diff --git a/commonspace/object/tree/objecttree/change.go b/commonspace/object/tree/objecttree/change.go index 48004794..d02753a7 100644 --- a/commonspace/object/tree/objecttree/change.go +++ b/commonspace/object/tree/objecttree/change.go @@ -30,6 +30,7 @@ type Change struct { DataType string IsSnapshot bool IsDerived bool + IsNew bool // iterator helpers visited bool diff --git a/commonspace/object/tree/objecttree/flusher.go b/commonspace/object/tree/objecttree/flusher.go new file mode 100644 index 00000000..e169a791 --- /dev/null +++ b/commonspace/object/tree/objecttree/flusher.go @@ -0,0 +1,47 @@ +package objecttree + +type Flusher interface { + MarkNewChange(ch *Change) + FlushAfterBuild(t *objectTree) error + Flush(t *objectTree) error +} + +type defaultFlusher struct { +} + +func (d *defaultFlusher) MarkNewChange(ch *Change) { +} + +func (d *defaultFlusher) FlushAfterBuild(t *objectTree) error { + t.tree.reduceTree() + return nil +} + +func (d *defaultFlusher) Flush(t *objectTree) error { + return nil +} + +func MarkNewChangeFlusher() Flusher { + return &newChangeFlusher{} +} + +type newChangeFlusher struct { + newChanges []*Change +} + +func (n *newChangeFlusher) MarkNewChange(ch *Change) { + ch.IsNew = true + n.newChanges = append(n.newChanges, ch) +} + +func (n *newChangeFlusher) FlushAfterBuild(t *objectTree) error { + return nil +} + +func (n *newChangeFlusher) Flush(t *objectTree) error { + for _, ch := range n.newChanges { + ch.IsNew = false + } + t.tree.reduceTree() + return nil +} diff --git a/commonspace/object/tree/objecttree/mock_objecttree/mock_objecttree.go b/commonspace/object/tree/objecttree/mock_objecttree/mock_objecttree.go index 85736c6f..aea9b3a0 100644 --- a/commonspace/object/tree/objecttree/mock_objecttree/mock_objecttree.go +++ b/commonspace/object/tree/objecttree/mock_objecttree/mock_objecttree.go @@ -73,6 +73,21 @@ func (mr *MockObjectTreeMockRecorder) AddContent(arg0, arg1 any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddContent", reflect.TypeOf((*MockObjectTree)(nil).AddContent), arg0, arg1) } +// AddContentWithValidator mocks base method. +func (m *MockObjectTree) AddContentWithValidator(arg0 context.Context, arg1 objecttree.SignableChangeContent, arg2 func(*treechangeproto.RawTreeChangeWithId) error) (objecttree.AddResult, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AddContentWithValidator", arg0, arg1, arg2) + ret0, _ := ret[0].(objecttree.AddResult) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// AddContentWithValidator indicates an expected call of AddContentWithValidator. +func (mr *MockObjectTreeMockRecorder) AddContentWithValidator(arg0, arg1, arg2 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddContentWithValidator", reflect.TypeOf((*MockObjectTree)(nil).AddContentWithValidator), arg0, arg1, arg2) +} + // AddRawChanges mocks base method. func (m *MockObjectTree) AddRawChanges(arg0 context.Context, arg1 objecttree.RawChangesPayload) (objecttree.AddResult, error) { m.ctrl.T.Helper() @@ -175,6 +190,20 @@ func (mr *MockObjectTreeMockRecorder) Delete() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockObjectTree)(nil).Delete)) } +// Flush mocks base method. +func (m *MockObjectTree) Flush() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Flush") + ret0, _ := ret[0].(error) + return ret0 +} + +// Flush indicates an expected call of Flush. +func (mr *MockObjectTreeMockRecorder) Flush() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Flush", reflect.TypeOf((*MockObjectTree)(nil).Flush)) +} + // GetChange mocks base method. func (m *MockObjectTree) GetChange(arg0 string) (*objecttree.Change, error) { m.ctrl.T.Helper() @@ -347,6 +376,18 @@ func (mr *MockObjectTreeMockRecorder) Root() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Root", reflect.TypeOf((*MockObjectTree)(nil).Root)) } +// SetFlusher mocks base method. +func (m *MockObjectTree) SetFlusher(arg0 objecttree.Flusher) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "SetFlusher", arg0) +} + +// SetFlusher indicates an expected call of SetFlusher. +func (mr *MockObjectTreeMockRecorder) SetFlusher(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetFlusher", reflect.TypeOf((*MockObjectTree)(nil).SetFlusher), arg0) +} + // SnapshotPath mocks base method. func (m *MockObjectTree) SnapshotPath() []string { m.ctrl.T.Helper() diff --git a/commonspace/object/tree/objecttree/objecttree.go b/commonspace/object/tree/objecttree/objecttree.go index 6533b2bf..ce19326a 100644 --- a/commonspace/object/tree/objecttree/objecttree.go +++ b/commonspace/object/tree/objecttree/objecttree.go @@ -84,6 +84,7 @@ type ObjectTree interface { Storage() treestorage.TreeStorage AddContent(ctx context.Context, content SignableChangeContent) (AddResult, error) + AddContentWithValidator(ctx context.Context, content SignableChangeContent, validate func(change *treechangeproto.RawTreeChangeWithId) error) (AddResult, error) AddRawChanges(ctx context.Context, changes RawChangesPayload) (AddResult, error) UnpackChange(raw *treechangeproto.RawTreeChangeWithId) (data []byte, err error) @@ -91,6 +92,8 @@ type ObjectTree interface { Delete() error Close() error + SetFlusher(flusher Flusher) + Flush() error TryClose(objectTTL time.Duration) (bool, error) } @@ -101,6 +104,7 @@ type objectTree struct { rawChangeLoader *rawChangeLoader treeBuilder *treeBuilder aclList list.AclList + flusher Flusher id string rawRoot *treechangeproto.RawTreeChangeWithId @@ -172,6 +176,10 @@ func (ot *objectTree) Header() *treechangeproto.RawTreeChangeWithId { return ot.rawRoot } +func (ot *objectTree) SetFlusher(flusher Flusher) { + ot.flusher = flusher +} + func (ot *objectTree) UnmarshalledHeader() *Change { return ot.root } @@ -203,6 +211,10 @@ func (ot *objectTree) logUseWhenUnlocked() { } func (ot *objectTree) AddContent(ctx context.Context, content SignableChangeContent) (res AddResult, err error) { + return ot.AddContentWithValidator(ctx, content, nil) +} + +func (ot *objectTree) AddContentWithValidator(ctx context.Context, content SignableChangeContent, validator func(change *treechangeproto.RawTreeChangeWithId) error) (res AddResult, err error) { if ot.isDeleted { err = ErrDeleted return @@ -222,6 +234,14 @@ func (ot *objectTree) AddContent(ctx context.Context, content SignableChangeCont // clearing tree, because we already saved everything in the last snapshot ot.tree = &Tree{} } + + if validator != nil { + err = validator(rawChange) + if err != nil { + return + } + } + err = ot.tree.AddMergedHead(objChange) if err != nil { panic(err) @@ -333,7 +353,10 @@ func (ot *objectTree) AddRawChanges(ctx context.Context, changesPayload RawChang } // reducing tree if we have new roots - ot.tree.reduceTree() + err = ot.flusher.FlushAfterBuild(ot) + if err != nil { + return + } // that means that we removed the ids while reducing if _, exists := ot.tree.attached[lastHeadId]; !exists { @@ -505,6 +528,13 @@ func (ot *objectTree) addRawChanges(ctx context.Context, changesPayload RawChang } } +func (ot *objectTree) Flush() error { + if ot.isDeleted { + return ErrDeleted + } + return ot.flusher.Flush(ot) +} + func (ot *objectTree) createAddResult(oldHeads []string, mode Mode, treeChangesAdded []*Change, rawChanges []*treechangeproto.RawTreeChangeWithId) (addResult AddResult, err error) { headsCopy := func() []string { newHeads := make([]string, 0, len(ot.tree.Heads())) @@ -518,7 +548,8 @@ func (ot *objectTree) createAddResult(oldHeads []string, mode Mode, treeChangesA getAddedChanges := func() (added []*treechangeproto.RawTreeChangeWithId, err error) { for _, idx := range ot.notSeenIdxBuf { rawChange := rawChanges[idx] - if _, exists := ot.tree.attached[rawChange.Id]; exists { + if ch, exists := ot.tree.attached[rawChange.Id]; exists { + ot.flusher.MarkNewChange(ch) added = append(added, rawChange) } } diff --git a/commonspace/object/tree/objecttree/objecttree_test.go b/commonspace/object/tree/objecttree/objecttree_test.go index e8d8d274..13b2cd4e 100644 --- a/commonspace/object/tree/objecttree/objecttree_test.go +++ b/commonspace/object/tree/objecttree/objecttree_test.go @@ -132,6 +132,7 @@ func genBuildFilterableTestableTree(filterFunc func(ch *Change) bool) func(treeS rawChangeLoader: loader, validator: &noOpTreeValidator{filterFunc: filterFunc}, aclList: aclList, + flusher: &defaultFlusher{}, } return buildObjectTree(deps) @@ -796,6 +797,76 @@ func TestObjectTree(t *testing.T) { } }) + t.Run("add new snapshot simple with newChangeFlusher", func(t *testing.T) { + ctx := prepareTreeContext(t, aclList) + treeStorage := ctx.treeStorage + changeCreator := ctx.changeCreator + objTree := ctx.objTree.(*objectTree) + objTree.flusher = &newChangeFlusher{} + + rawChanges := []*treechangeproto.RawTreeChangeWithId{ + changeCreator.CreateRaw("1", aclList.Head().Id, "0", false, "0"), + changeCreator.CreateRaw("2", aclList.Head().Id, "0", false, "1"), + changeCreator.CreateRaw("3", aclList.Head().Id, "0", true, "2"), + changeCreator.CreateRaw("4", aclList.Head().Id, "3", false, "3"), + } + payload := RawChangesPayload{ + NewHeads: []string{rawChanges[len(rawChanges)-1].Id}, + RawChanges: rawChanges, + } + + res, err := objTree.AddRawChanges(context.Background(), payload) + require.NoError(t, err, "adding changes should be without error") + + // check result + assert.Equal(t, []string{"0"}, res.OldHeads) + assert.Equal(t, []string{"4"}, res.Heads) + assert.Equal(t, len(rawChanges), len(res.Added)) + assert.Equal(t, Append, res.Mode) + + // check tree heads + assert.Equal(t, []string{"4"}, objTree.Heads()) + + // check tree iterate + var iterChangesId []string + err = objTree.IterateRoot(nil, func(change *Change) bool { + iterChangesId = append(iterChangesId, change.Id) + return true + }) + require.NoError(t, err, "iterate should be without error") + assert.Equal(t, []string{"0", "1", "2", "3", "4"}, iterChangesId) + // before Flush + assert.Equal(t, "0", objTree.Root().Id) + + // check storage + heads, _ := treeStorage.Heads() + assert.Equal(t, []string{"4"}, heads) + + for _, ch := range rawChanges { + treeCh, err := objTree.GetChange(ch.Id) + require.NoError(t, err) + require.True(t, treeCh.IsNew) + raw, err := treeStorage.GetRawChange(context.Background(), ch.Id) + assert.NoError(t, err, "storage should have all the changes") + assert.Equal(t, ch, raw, "the changes in the storage should be the same") + } + + err = objTree.Flush() + require.NoError(t, err) + + // after Flush + assert.Equal(t, "3", objTree.Root().Id) + for _, ch := range rawChanges { + treeCh, err := objTree.GetChange(ch.Id) + if ch.Id == "3" || ch.Id == "4" { + require.NoError(t, err) + require.False(t, treeCh.IsNew) + continue + } + require.Error(t, err) + } + }) + t.Run("snapshot path", func(t *testing.T) { ctx := prepareTreeContext(t, aclList) changeCreator := ctx.changeCreator diff --git a/commonspace/object/tree/objecttree/objecttreefactory.go b/commonspace/object/tree/objecttree/objecttreefactory.go index 63b18a86..545bd609 100644 --- a/commonspace/object/tree/objecttree/objecttreefactory.go +++ b/commonspace/object/tree/objecttree/objecttreefactory.go @@ -38,6 +38,7 @@ type objectTreeDeps struct { validator ObjectTreeValidator rawChangeLoader *rawChangeLoader aclList list.AclList + flusher Flusher } type BuildObjectTreeFunc = func(treeStorage treestorage.TreeStorage, aclList list.AclList) (ObjectTree, error) @@ -58,6 +59,7 @@ func verifiableTreeDeps( validator: newTreeValidator(false, false), rawChangeLoader: rawLoader, aclList: aclList, + flusher: &defaultFlusher{}, } } @@ -75,6 +77,7 @@ func emptyDataTreeDeps( validator: newTreeValidator(false, false), rawChangeLoader: loader, aclList: aclList, + flusher: &defaultFlusher{}, } } @@ -92,6 +95,7 @@ func nonVerifiableTreeDeps( validator: &noOpTreeValidator{}, rawChangeLoader: loader, aclList: aclList, + flusher: &defaultFlusher{}, } } @@ -117,6 +121,7 @@ func BuildTestableTree(treeStorage treestorage.TreeStorage, aclList list.AclList rawChangeLoader: loader, validator: &noOpTreeValidator{}, aclList: aclList, + flusher: &defaultFlusher{}, } return buildObjectTree(deps) @@ -135,6 +140,7 @@ func BuildEmptyDataTestableTree(treeStorage treestorage.TreeStorage, aclList lis rawChangeLoader: loader, validator: &noOpTreeValidator{}, aclList: aclList, + flusher: &defaultFlusher{}, } return buildObjectTree(deps) @@ -223,6 +229,7 @@ func buildObjectTree(deps objectTreeDeps) (ObjectTree, error) { difSnapshotBuf: make([]*treechangeproto.RawTreeChangeWithId, 0, 10), notSeenIdxBuf: make([]int, 0, 10), newSnapshotsBuf: make([]*Change, 0, 10), + flusher: deps.flusher, } err := objTree.rebuildFromStorage(nil, nil) @@ -259,6 +266,7 @@ func buildHistoryTree(deps objectTreeDeps, params HistoryTreeParams) (ht History difSnapshotBuf: make([]*treechangeproto.RawTreeChangeWithId, 0, 10), notSeenIdxBuf: make([]int, 0, 10), newSnapshotsBuf: make([]*Change, 0, 10), + flusher: deps.flusher, } hTree := &historyTree{objectTree: objTree} diff --git a/commonspace/object/tree/synctree/mock_synctree/mock_synctree.go b/commonspace/object/tree/synctree/mock_synctree/mock_synctree.go index 645a8f8b..fee2c652 100644 --- a/commonspace/object/tree/synctree/mock_synctree/mock_synctree.go +++ b/commonspace/object/tree/synctree/mock_synctree/mock_synctree.go @@ -82,6 +82,21 @@ func (mr *MockSyncTreeMockRecorder) AddContent(arg0, arg1 any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddContent", reflect.TypeOf((*MockSyncTree)(nil).AddContent), arg0, arg1) } +// AddContentWithValidator mocks base method. +func (m *MockSyncTree) AddContentWithValidator(arg0 context.Context, arg1 objecttree.SignableChangeContent, arg2 func(*treechangeproto.RawTreeChangeWithId) error) (objecttree.AddResult, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AddContentWithValidator", arg0, arg1, arg2) + ret0, _ := ret[0].(objecttree.AddResult) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// AddContentWithValidator indicates an expected call of AddContentWithValidator. +func (mr *MockSyncTreeMockRecorder) AddContentWithValidator(arg0, arg1, arg2 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddContentWithValidator", reflect.TypeOf((*MockSyncTree)(nil).AddContentWithValidator), arg0, arg1, arg2) +} + // AddRawChanges mocks base method. func (m *MockSyncTree) AddRawChanges(arg0 context.Context, arg1 objecttree.RawChangesPayload) (objecttree.AddResult, error) { m.ctrl.T.Helper() @@ -199,6 +214,20 @@ func (mr *MockSyncTreeMockRecorder) Delete() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockSyncTree)(nil).Delete)) } +// Flush mocks base method. +func (m *MockSyncTree) Flush() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Flush") + ret0, _ := ret[0].(error) + return ret0 +} + +// Flush indicates an expected call of Flush. +func (mr *MockSyncTreeMockRecorder) Flush() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Flush", reflect.TypeOf((*MockSyncTree)(nil).Flush)) +} + // GetChange mocks base method. func (m *MockSyncTree) GetChange(arg0 string) (*objecttree.Change, error) { m.ctrl.T.Helper() @@ -444,6 +473,18 @@ func (mr *MockSyncTreeMockRecorder) Root() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Root", reflect.TypeOf((*MockSyncTree)(nil).Root)) } +// SetFlusher mocks base method. +func (m *MockSyncTree) SetFlusher(arg0 objecttree.Flusher) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "SetFlusher", arg0) +} + +// SetFlusher indicates an expected call of SetFlusher. +func (mr *MockSyncTreeMockRecorder) SetFlusher(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetFlusher", reflect.TypeOf((*MockSyncTree)(nil).SetFlusher), arg0) +} + // SetListener mocks base method. func (m *MockSyncTree) SetListener(arg0 updatelistener.UpdateListener) { m.ctrl.T.Helper() diff --git a/commonspace/object/tree/synctree/synctree.go b/commonspace/object/tree/synctree/synctree.go index 015fd317..9929b559 100644 --- a/commonspace/object/tree/synctree/synctree.go +++ b/commonspace/object/tree/synctree/synctree.go @@ -13,6 +13,7 @@ import ( "github.com/anyproto/any-sync/commonspace/object/acl/list" "github.com/anyproto/any-sync/commonspace/object/tree/objecttree" "github.com/anyproto/any-sync/commonspace/object/tree/synctree/updatelistener" + "github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto" "github.com/anyproto/any-sync/commonspace/object/tree/treestorage" "github.com/anyproto/any-sync/commonspace/spacestorage" "github.com/anyproto/any-sync/commonspace/sync/syncdeps" @@ -166,10 +167,14 @@ func (s *syncTree) IterateRoot(convert objecttree.ChangeConvertFunc, iterate obj } func (s *syncTree) AddContent(ctx context.Context, content objecttree.SignableChangeContent) (res objecttree.AddResult, err error) { + return s.AddContentWithValidator(ctx, content, nil) +} + +func (s *syncTree) AddContentWithValidator(ctx context.Context, content objecttree.SignableChangeContent, validate func(change *treechangeproto.RawTreeChangeWithId) error) (res objecttree.AddResult, err error) { if err = s.checkAlive(); err != nil { return } - res, err = s.ObjectTree.AddContent(ctx, content) + res, err = s.ObjectTree.AddContentWithValidator(ctx, content, validate) if err != nil { return } @@ -247,6 +252,7 @@ func (s *syncTree) AddRawChanges(ctx context.Context, changesPayload objecttree. s.listener.Rebuild(s) } } + s.flush() if res.Mode != objecttree.Nothing { if s.notifiable != nil { s.notifiable.UpdateHeads(s.Id(), res.Heads) @@ -332,8 +338,16 @@ func (s *syncTree) SyncWithPeer(ctx context.Context, p peer.Peer) (err error) { func (s *syncTree) afterBuild() { if s.listener != nil { s.listener.Rebuild(s) + s.flush() } if s.notifiable != nil { s.notifiable.UpdateHeads(s.Id(), s.Heads()) } } + +func (s *syncTree) flush() { + err := s.Flush() + if err != nil { + log.Warn("flush error", zap.Error(err)) + } +} diff --git a/coordinator/coordinatorclient/mock_coordinatorclient/mock_coordinatorclient.go b/coordinator/coordinatorclient/mock_coordinatorclient/mock_coordinatorclient.go index 92166721..83fd8ed8 100644 --- a/coordinator/coordinatorclient/mock_coordinatorclient/mock_coordinatorclient.go +++ b/coordinator/coordinatorclient/mock_coordinatorclient/mock_coordinatorclient.go @@ -5,6 +5,7 @@ // // mockgen -destination mock_coordinatorclient/mock_coordinatorclient.go github.com/anyproto/any-sync/coordinator/coordinatorclient CoordinatorClient // + // Package mock_coordinatorclient is a generated GoMock package. package mock_coordinatorclient