1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-09 17:45:03 +09:00

Update treebuilder to restore tree

This commit is contained in:
mcrakhman 2024-08-15 12:46:20 +02:00
parent 2ba07d2b11
commit ccb2f5c7d4
No known key found for this signature in database
GPG key ID: DED12CFEF5B8396B
4 changed files with 95 additions and 14 deletions

View file

@ -82,8 +82,11 @@ func (tb *treeBuilder) build(heads []string, theirHeads []string, newChanges []*
// but then we have to be sure that invariant stays true
oldBreakpoint, err := tb.findBreakpoint(heads, true)
if err != nil {
// this should never error out, because otherwise we have broken data
return nil, fmt.Errorf("findBreakpoint error: %v", err)
log.Error("findBreakpoint error", zap.Error(err), zap.String("treeId", tb.treeStorage.Id()))
heads, oldBreakpoint, err = tb.restoreTree()
if err != nil {
return nil, fmt.Errorf("restoreTree error: %v", err)
}
}
if len(theirHeads) > 0 {
@ -188,6 +191,38 @@ func (tb *treeBuilder) loadChange(id string) (ch *Change, err error) {
return ch, nil
}
func (tb *treeBuilder) restoreTree() (newHeads []string, breakpoint string, err error) {
allIds, err := tb.treeStorage.GetAllChangeIds()
if err != nil {
return
}
rootCh, err := tb.loadChange(tb.treeStorage.Id())
if err != nil {
return
}
tr := &Tree{}
tr.AddFast(rootCh)
var changes []*Change
for _, id := range allIds {
if id == tb.treeStorage.Id() {
continue
}
ch, e := tb.loadChange(id)
if e != nil {
continue
}
changes = append(changes, ch)
}
tr.AddFast(changes...)
breakpoint, err = tb.findBreakpoint(tr.headIds, false)
if err != nil {
return
}
newHeads = tr.headIds
err = tb.treeStorage.SetHeads(newHeads)
return
}
func (tb *treeBuilder) findBreakpoint(heads []string, noError bool) (breakpoint string, err error) {
var (
ch *Change

View file

@ -16,7 +16,17 @@ type InMemoryTreeStorage struct {
Changes map[string]*treechangeproto.RawTreeChangeWithId
addErr error
sync.RWMutex
sync.Mutex
}
func (t *InMemoryTreeStorage) GetAllChangeIds() (chs []string, err error) {
t.Lock()
defer t.Unlock()
chs = make([]string, 0, len(t.Changes))
for id := range t.Changes {
chs = append(chs, id)
}
return
}
func (t *InMemoryTreeStorage) GetAppendRawChange(ctx context.Context, buf []byte, id string) (*treechangeproto.RawTreeChangeWithId, error) {
@ -28,8 +38,8 @@ func (t *InMemoryTreeStorage) SetReturnErrorOnAdd(err error) {
}
func (t *InMemoryTreeStorage) AddRawChangesSetHeads(changes []*treechangeproto.RawTreeChangeWithId, heads []string) error {
t.RLock()
defer t.RUnlock()
t.Lock()
defer t.Unlock()
if t.addErr != nil {
return t.addErr
}
@ -56,7 +66,6 @@ func NewInMemoryTreeStorage(
root: root,
heads: append([]string(nil), heads...),
Changes: allChanges,
RWMutex: sync.RWMutex{},
}, nil
}
@ -66,20 +75,20 @@ func (t *InMemoryTreeStorage) HasChange(ctx context.Context, id string) (bool, e
}
func (t *InMemoryTreeStorage) Id() string {
t.RLock()
defer t.RUnlock()
t.Lock()
defer t.Unlock()
return t.id
}
func (t *InMemoryTreeStorage) Root() (*treechangeproto.RawTreeChangeWithId, error) {
t.RLock()
defer t.RUnlock()
t.Lock()
defer t.Unlock()
return t.root, nil
}
func (t *InMemoryTreeStorage) Heads() ([]string, error) {
t.RLock()
defer t.RUnlock()
t.Lock()
defer t.Unlock()
return t.heads, nil
}
@ -107,8 +116,8 @@ func (t *InMemoryTreeStorage) AddRawChange(change *treechangeproto.RawTreeChange
}
func (t *InMemoryTreeStorage) GetRawChange(ctx context.Context, changeId string) (*treechangeproto.RawTreeChangeWithId, error) {
t.RLock()
defer t.RUnlock()
t.Lock()
defer t.Unlock()
if res, exists := t.Changes[changeId]; exists {
return res, nil
}
@ -116,6 +125,8 @@ func (t *InMemoryTreeStorage) GetRawChange(ctx context.Context, changeId string)
}
func (t *InMemoryTreeStorage) Delete() error {
t.Lock()
defer t.Unlock()
t.root = nil
t.Changes = nil
t.heads = nil
@ -123,6 +134,8 @@ func (t *InMemoryTreeStorage) Delete() error {
}
func (t *InMemoryTreeStorage) Copy() *InMemoryTreeStorage {
t.Lock()
defer t.Unlock()
var changes []*treechangeproto.RawTreeChangeWithId
for _, ch := range t.Changes {
changes = append(changes, ch)
@ -132,6 +145,8 @@ func (t *InMemoryTreeStorage) Copy() *InMemoryTreeStorage {
}
func (t *InMemoryTreeStorage) Equal(other *InMemoryTreeStorage) bool {
t.Lock()
defer t.Unlock()
if !slice.UnsortedEquals(t.heads, other.heads) {
return false
}

View file

@ -82,6 +82,36 @@ func (mr *MockTreeStorageMockRecorder) Delete() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockTreeStorage)(nil).Delete))
}
// GetAllChangeIds mocks base method.
func (m *MockTreeStorage) GetAllChangeIds() ([]string, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetAllChangeIds")
ret0, _ := ret[0].([]string)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// GetAllChangeIds indicates an expected call of GetAllChangeIds.
func (mr *MockTreeStorageMockRecorder) GetAllChangeIds() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAllChangeIds", reflect.TypeOf((*MockTreeStorage)(nil).GetAllChangeIds))
}
// GetAppendRawChange mocks base method.
func (m *MockTreeStorage) GetAppendRawChange(arg0 context.Context, arg1 []byte, arg2 string) (*treechangeproto.RawTreeChangeWithId, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetAppendRawChange", arg0, arg1, arg2)
ret0, _ := ret[0].(*treechangeproto.RawTreeChangeWithId)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// GetAppendRawChange indicates an expected call of GetAppendRawChange.
func (mr *MockTreeStorageMockRecorder) GetAppendRawChange(arg0, arg1, arg2 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAppendRawChange", reflect.TypeOf((*MockTreeStorage)(nil).GetAppendRawChange), arg0, arg1, arg2)
}
// GetRawChange mocks base method.
func (m *MockTreeStorage) GetRawChange(arg0 context.Context, arg1 string) (*treechangeproto.RawTreeChangeWithId, error) {
m.ctrl.T.Helper()

View file

@ -33,6 +33,7 @@ type TreeStorage interface {
SetHeads(heads []string) error
AddRawChange(change *treechangeproto.RawTreeChangeWithId) error
AddRawChangesSetHeads(changes []*treechangeproto.RawTreeChangeWithId, heads []string) error
GetAllChangeIds() ([]string, error)
GetRawChange(ctx context.Context, id string) (*treechangeproto.RawTreeChangeWithId, error)
GetAppendRawChange(ctx context.Context, buf []byte, id string) (*treechangeproto.RawTreeChangeWithId, error)