1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-08 05:57:03 +09:00

Merge remote-tracking branch 'origin/add-content-with-validator' into GO-3996-add-tree-transactions

# Conflicts:
#	commonspace/object/tree/objecttree/change.go
#	commonspace/object/tree/objecttree/objecttree.go
#	commonspace/object/tree/synctree/synctree_test.go
This commit is contained in:
mcrakhman 2024-08-29 16:17:50 +02:00
commit 3b2acf8021
No known key found for this signature in database
GPG key ID: DED12CFEF5B8396B
9 changed files with 258 additions and 3 deletions

View file

@ -30,6 +30,7 @@ type Change struct {
DataType string
IsSnapshot bool
IsDerived bool
IsNew bool
// iterator helpers
visited bool

View file

@ -0,0 +1,47 @@
package objecttree
type Flusher interface {
MarkNewChange(ch *Change)
FlushAfterBuild(t *objectTree) error
Flush(t *objectTree) error
}
type defaultFlusher struct {
}
func (d *defaultFlusher) MarkNewChange(ch *Change) {
}
func (d *defaultFlusher) FlushAfterBuild(t *objectTree) error {
t.tree.reduceTree()
return nil
}
func (d *defaultFlusher) Flush(t *objectTree) error {
return nil
}
func MarkNewChangeFlusher() Flusher {
return &newChangeFlusher{}
}
type newChangeFlusher struct {
newChanges []*Change
}
func (n *newChangeFlusher) MarkNewChange(ch *Change) {
ch.IsNew = true
n.newChanges = append(n.newChanges, ch)
}
func (n *newChangeFlusher) FlushAfterBuild(t *objectTree) error {
return nil
}
func (n *newChangeFlusher) Flush(t *objectTree) error {
for _, ch := range n.newChanges {
ch.IsNew = false
}
t.tree.reduceTree()
return nil
}

View file

@ -73,6 +73,21 @@ func (mr *MockObjectTreeMockRecorder) AddContent(arg0, arg1 any) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddContent", reflect.TypeOf((*MockObjectTree)(nil).AddContent), arg0, arg1)
}
// AddContentWithValidator mocks base method.
func (m *MockObjectTree) AddContentWithValidator(arg0 context.Context, arg1 objecttree.SignableChangeContent, arg2 func(*treechangeproto.RawTreeChangeWithId) error) (objecttree.AddResult, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "AddContentWithValidator", arg0, arg1, arg2)
ret0, _ := ret[0].(objecttree.AddResult)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// AddContentWithValidator indicates an expected call of AddContentWithValidator.
func (mr *MockObjectTreeMockRecorder) AddContentWithValidator(arg0, arg1, arg2 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddContentWithValidator", reflect.TypeOf((*MockObjectTree)(nil).AddContentWithValidator), arg0, arg1, arg2)
}
// AddRawChanges mocks base method.
func (m *MockObjectTree) AddRawChanges(arg0 context.Context, arg1 objecttree.RawChangesPayload) (objecttree.AddResult, error) {
m.ctrl.T.Helper()
@ -175,6 +190,20 @@ func (mr *MockObjectTreeMockRecorder) Delete() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockObjectTree)(nil).Delete))
}
// Flush mocks base method.
func (m *MockObjectTree) Flush() error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Flush")
ret0, _ := ret[0].(error)
return ret0
}
// Flush indicates an expected call of Flush.
func (mr *MockObjectTreeMockRecorder) Flush() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Flush", reflect.TypeOf((*MockObjectTree)(nil).Flush))
}
// GetChange mocks base method.
func (m *MockObjectTree) GetChange(arg0 string) (*objecttree.Change, error) {
m.ctrl.T.Helper()
@ -347,6 +376,18 @@ func (mr *MockObjectTreeMockRecorder) Root() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Root", reflect.TypeOf((*MockObjectTree)(nil).Root))
}
// SetFlusher mocks base method.
func (m *MockObjectTree) SetFlusher(arg0 objecttree.Flusher) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "SetFlusher", arg0)
}
// SetFlusher indicates an expected call of SetFlusher.
func (mr *MockObjectTreeMockRecorder) SetFlusher(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetFlusher", reflect.TypeOf((*MockObjectTree)(nil).SetFlusher), arg0)
}
// SnapshotPath mocks base method.
func (m *MockObjectTree) SnapshotPath() []string {
m.ctrl.T.Helper()

View file

@ -84,6 +84,7 @@ type ObjectTree interface {
Storage() treestorage.TreeStorage
AddContent(ctx context.Context, content SignableChangeContent) (AddResult, error)
AddContentWithValidator(ctx context.Context, content SignableChangeContent, validate func(change *treechangeproto.RawTreeChangeWithId) error) (AddResult, error)
AddRawChanges(ctx context.Context, changes RawChangesPayload) (AddResult, error)
UnpackChange(raw *treechangeproto.RawTreeChangeWithId) (data []byte, err error)
@ -91,6 +92,8 @@ type ObjectTree interface {
Delete() error
Close() error
SetFlusher(flusher Flusher)
Flush() error
TryClose(objectTTL time.Duration) (bool, error)
}
@ -101,6 +104,7 @@ type objectTree struct {
rawChangeLoader *rawChangeLoader
treeBuilder *treeBuilder
aclList list.AclList
flusher Flusher
id string
rawRoot *treechangeproto.RawTreeChangeWithId
@ -172,6 +176,10 @@ func (ot *objectTree) Header() *treechangeproto.RawTreeChangeWithId {
return ot.rawRoot
}
func (ot *objectTree) SetFlusher(flusher Flusher) {
ot.flusher = flusher
}
func (ot *objectTree) UnmarshalledHeader() *Change {
return ot.root
}
@ -203,6 +211,10 @@ func (ot *objectTree) logUseWhenUnlocked() {
}
func (ot *objectTree) AddContent(ctx context.Context, content SignableChangeContent) (res AddResult, err error) {
return ot.AddContentWithValidator(ctx, content, nil)
}
func (ot *objectTree) AddContentWithValidator(ctx context.Context, content SignableChangeContent, validator func(change *treechangeproto.RawTreeChangeWithId) error) (res AddResult, err error) {
if ot.isDeleted {
err = ErrDeleted
return
@ -222,6 +234,14 @@ func (ot *objectTree) AddContent(ctx context.Context, content SignableChangeCont
// clearing tree, because we already saved everything in the last snapshot
ot.tree = &Tree{}
}
if validator != nil {
err = validator(rawChange)
if err != nil {
return
}
}
err = ot.tree.AddMergedHead(objChange)
if err != nil {
panic(err)
@ -333,7 +353,10 @@ func (ot *objectTree) AddRawChanges(ctx context.Context, changesPayload RawChang
}
// reducing tree if we have new roots
ot.tree.reduceTree()
err = ot.flusher.FlushAfterBuild(ot)
if err != nil {
return
}
// that means that we removed the ids while reducing
if _, exists := ot.tree.attached[lastHeadId]; !exists {
@ -505,6 +528,13 @@ func (ot *objectTree) addRawChanges(ctx context.Context, changesPayload RawChang
}
}
func (ot *objectTree) Flush() error {
if ot.isDeleted {
return ErrDeleted
}
return ot.flusher.Flush(ot)
}
func (ot *objectTree) createAddResult(oldHeads []string, mode Mode, treeChangesAdded []*Change, rawChanges []*treechangeproto.RawTreeChangeWithId) (addResult AddResult, err error) {
headsCopy := func() []string {
newHeads := make([]string, 0, len(ot.tree.Heads()))
@ -518,7 +548,8 @@ func (ot *objectTree) createAddResult(oldHeads []string, mode Mode, treeChangesA
getAddedChanges := func() (added []*treechangeproto.RawTreeChangeWithId, err error) {
for _, idx := range ot.notSeenIdxBuf {
rawChange := rawChanges[idx]
if _, exists := ot.tree.attached[rawChange.Id]; exists {
if ch, exists := ot.tree.attached[rawChange.Id]; exists {
ot.flusher.MarkNewChange(ch)
added = append(added, rawChange)
}
}

View file

@ -132,6 +132,7 @@ func genBuildFilterableTestableTree(filterFunc func(ch *Change) bool) func(treeS
rawChangeLoader: loader,
validator: &noOpTreeValidator{filterFunc: filterFunc},
aclList: aclList,
flusher: &defaultFlusher{},
}
return buildObjectTree(deps)
@ -796,6 +797,76 @@ func TestObjectTree(t *testing.T) {
}
})
t.Run("add new snapshot simple with newChangeFlusher", func(t *testing.T) {
ctx := prepareTreeContext(t, aclList)
treeStorage := ctx.treeStorage
changeCreator := ctx.changeCreator
objTree := ctx.objTree.(*objectTree)
objTree.flusher = &newChangeFlusher{}
rawChanges := []*treechangeproto.RawTreeChangeWithId{
changeCreator.CreateRaw("1", aclList.Head().Id, "0", false, "0"),
changeCreator.CreateRaw("2", aclList.Head().Id, "0", false, "1"),
changeCreator.CreateRaw("3", aclList.Head().Id, "0", true, "2"),
changeCreator.CreateRaw("4", aclList.Head().Id, "3", false, "3"),
}
payload := RawChangesPayload{
NewHeads: []string{rawChanges[len(rawChanges)-1].Id},
RawChanges: rawChanges,
}
res, err := objTree.AddRawChanges(context.Background(), payload)
require.NoError(t, err, "adding changes should be without error")
// check result
assert.Equal(t, []string{"0"}, res.OldHeads)
assert.Equal(t, []string{"4"}, res.Heads)
assert.Equal(t, len(rawChanges), len(res.Added))
assert.Equal(t, Append, res.Mode)
// check tree heads
assert.Equal(t, []string{"4"}, objTree.Heads())
// check tree iterate
var iterChangesId []string
err = objTree.IterateRoot(nil, func(change *Change) bool {
iterChangesId = append(iterChangesId, change.Id)
return true
})
require.NoError(t, err, "iterate should be without error")
assert.Equal(t, []string{"0", "1", "2", "3", "4"}, iterChangesId)
// before Flush
assert.Equal(t, "0", objTree.Root().Id)
// check storage
heads, _ := treeStorage.Heads()
assert.Equal(t, []string{"4"}, heads)
for _, ch := range rawChanges {
treeCh, err := objTree.GetChange(ch.Id)
require.NoError(t, err)
require.True(t, treeCh.IsNew)
raw, err := treeStorage.GetRawChange(context.Background(), ch.Id)
assert.NoError(t, err, "storage should have all the changes")
assert.Equal(t, ch, raw, "the changes in the storage should be the same")
}
err = objTree.Flush()
require.NoError(t, err)
// after Flush
assert.Equal(t, "3", objTree.Root().Id)
for _, ch := range rawChanges {
treeCh, err := objTree.GetChange(ch.Id)
if ch.Id == "3" || ch.Id == "4" {
require.NoError(t, err)
require.False(t, treeCh.IsNew)
continue
}
require.Error(t, err)
}
})
t.Run("snapshot path", func(t *testing.T) {
ctx := prepareTreeContext(t, aclList)
changeCreator := ctx.changeCreator

View file

@ -38,6 +38,7 @@ type objectTreeDeps struct {
validator ObjectTreeValidator
rawChangeLoader *rawChangeLoader
aclList list.AclList
flusher Flusher
}
type BuildObjectTreeFunc = func(treeStorage treestorage.TreeStorage, aclList list.AclList) (ObjectTree, error)
@ -58,6 +59,7 @@ func verifiableTreeDeps(
validator: newTreeValidator(false, false),
rawChangeLoader: rawLoader,
aclList: aclList,
flusher: &defaultFlusher{},
}
}
@ -75,6 +77,7 @@ func emptyDataTreeDeps(
validator: newTreeValidator(false, false),
rawChangeLoader: loader,
aclList: aclList,
flusher: &defaultFlusher{},
}
}
@ -92,6 +95,7 @@ func nonVerifiableTreeDeps(
validator: &noOpTreeValidator{},
rawChangeLoader: loader,
aclList: aclList,
flusher: &defaultFlusher{},
}
}
@ -117,6 +121,7 @@ func BuildTestableTree(treeStorage treestorage.TreeStorage, aclList list.AclList
rawChangeLoader: loader,
validator: &noOpTreeValidator{},
aclList: aclList,
flusher: &defaultFlusher{},
}
return buildObjectTree(deps)
@ -135,6 +140,7 @@ func BuildEmptyDataTestableTree(treeStorage treestorage.TreeStorage, aclList lis
rawChangeLoader: loader,
validator: &noOpTreeValidator{},
aclList: aclList,
flusher: &defaultFlusher{},
}
return buildObjectTree(deps)
@ -223,6 +229,7 @@ func buildObjectTree(deps objectTreeDeps) (ObjectTree, error) {
difSnapshotBuf: make([]*treechangeproto.RawTreeChangeWithId, 0, 10),
notSeenIdxBuf: make([]int, 0, 10),
newSnapshotsBuf: make([]*Change, 0, 10),
flusher: deps.flusher,
}
err := objTree.rebuildFromStorage(nil, nil)
@ -259,6 +266,7 @@ func buildHistoryTree(deps objectTreeDeps, params HistoryTreeParams) (ht History
difSnapshotBuf: make([]*treechangeproto.RawTreeChangeWithId, 0, 10),
notSeenIdxBuf: make([]int, 0, 10),
newSnapshotsBuf: make([]*Change, 0, 10),
flusher: deps.flusher,
}
hTree := &historyTree{objectTree: objTree}

View file

@ -82,6 +82,21 @@ func (mr *MockSyncTreeMockRecorder) AddContent(arg0, arg1 any) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddContent", reflect.TypeOf((*MockSyncTree)(nil).AddContent), arg0, arg1)
}
// AddContentWithValidator mocks base method.
func (m *MockSyncTree) AddContentWithValidator(arg0 context.Context, arg1 objecttree.SignableChangeContent, arg2 func(*treechangeproto.RawTreeChangeWithId) error) (objecttree.AddResult, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "AddContentWithValidator", arg0, arg1, arg2)
ret0, _ := ret[0].(objecttree.AddResult)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// AddContentWithValidator indicates an expected call of AddContentWithValidator.
func (mr *MockSyncTreeMockRecorder) AddContentWithValidator(arg0, arg1, arg2 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddContentWithValidator", reflect.TypeOf((*MockSyncTree)(nil).AddContentWithValidator), arg0, arg1, arg2)
}
// AddRawChanges mocks base method.
func (m *MockSyncTree) AddRawChanges(arg0 context.Context, arg1 objecttree.RawChangesPayload) (objecttree.AddResult, error) {
m.ctrl.T.Helper()
@ -199,6 +214,20 @@ func (mr *MockSyncTreeMockRecorder) Delete() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockSyncTree)(nil).Delete))
}
// Flush mocks base method.
func (m *MockSyncTree) Flush() error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Flush")
ret0, _ := ret[0].(error)
return ret0
}
// Flush indicates an expected call of Flush.
func (mr *MockSyncTreeMockRecorder) Flush() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Flush", reflect.TypeOf((*MockSyncTree)(nil).Flush))
}
// GetChange mocks base method.
func (m *MockSyncTree) GetChange(arg0 string) (*objecttree.Change, error) {
m.ctrl.T.Helper()
@ -444,6 +473,18 @@ func (mr *MockSyncTreeMockRecorder) Root() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Root", reflect.TypeOf((*MockSyncTree)(nil).Root))
}
// SetFlusher mocks base method.
func (m *MockSyncTree) SetFlusher(arg0 objecttree.Flusher) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "SetFlusher", arg0)
}
// SetFlusher indicates an expected call of SetFlusher.
func (mr *MockSyncTreeMockRecorder) SetFlusher(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetFlusher", reflect.TypeOf((*MockSyncTree)(nil).SetFlusher), arg0)
}
// SetListener mocks base method.
func (m *MockSyncTree) SetListener(arg0 updatelistener.UpdateListener) {
m.ctrl.T.Helper()

View file

@ -13,6 +13,7 @@ import (
"github.com/anyproto/any-sync/commonspace/object/acl/list"
"github.com/anyproto/any-sync/commonspace/object/tree/objecttree"
"github.com/anyproto/any-sync/commonspace/object/tree/synctree/updatelistener"
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anyproto/any-sync/commonspace/object/tree/treestorage"
"github.com/anyproto/any-sync/commonspace/spacestorage"
"github.com/anyproto/any-sync/commonspace/sync/syncdeps"
@ -166,10 +167,14 @@ func (s *syncTree) IterateRoot(convert objecttree.ChangeConvertFunc, iterate obj
}
func (s *syncTree) AddContent(ctx context.Context, content objecttree.SignableChangeContent) (res objecttree.AddResult, err error) {
return s.AddContentWithValidator(ctx, content, nil)
}
func (s *syncTree) AddContentWithValidator(ctx context.Context, content objecttree.SignableChangeContent, validate func(change *treechangeproto.RawTreeChangeWithId) error) (res objecttree.AddResult, err error) {
if err = s.checkAlive(); err != nil {
return
}
res, err = s.ObjectTree.AddContent(ctx, content)
res, err = s.ObjectTree.AddContentWithValidator(ctx, content, validate)
if err != nil {
return
}
@ -247,6 +252,7 @@ func (s *syncTree) AddRawChanges(ctx context.Context, changesPayload objecttree.
s.listener.Rebuild(s)
}
}
s.flush()
if res.Mode != objecttree.Nothing {
if s.notifiable != nil {
s.notifiable.UpdateHeads(s.Id(), res.Heads)
@ -332,8 +338,16 @@ func (s *syncTree) SyncWithPeer(ctx context.Context, p peer.Peer) (err error) {
func (s *syncTree) afterBuild() {
if s.listener != nil {
s.listener.Rebuild(s)
s.flush()
}
if s.notifiable != nil {
s.notifiable.UpdateHeads(s.Id(), s.Heads())
}
}
func (s *syncTree) flush() {
err := s.Flush()
if err != nil {
log.Warn("flush error", zap.Error(err))
}
}

View file

@ -5,6 +5,7 @@
//
// mockgen -destination mock_coordinatorclient/mock_coordinatorclient.go github.com/anyproto/any-sync/coordinator/coordinatorclient CoordinatorClient
//
// Package mock_coordinatorclient is a generated GoMock package.
package mock_coordinatorclient