1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-08 05:57:03 +09:00

Merge pull request #337 from anyproto/GO-4433-optimize-tree-reduce

GO-4433: Optimize tree reduce/validation
This commit is contained in:
Mikhail Rakhmanov 2024-11-05 17:14:09 +01:00 committed by GitHub
commit b82af41535
Signed by: github
GPG key ID: B5690EEEBB952194
12 changed files with 221 additions and 70 deletions

View file

@ -18,6 +18,7 @@ var (
type Change struct {
Next []*Change
PreviousIds []string
Previous []*Change
AclHeadId string
Id string
SnapshotId string

View file

@ -57,7 +57,9 @@ func (l *loadIterator) NextBatch(maxSize int) (batch IteratorBatch, err error) {
batch.Heads = slice.DiscardFromSlice(batch.Heads, func(s string) bool {
return slices.Contains(c.PreviousIds, s)
})
batch.Heads = append(batch.Heads, c.Id)
if !slices.Contains(batch.Heads, c.Id) {
batch.Heads = append(batch.Heads, c.Id)
}
return true
}
if curSize+rawEntry.size >= maxSize && len(batch.Batch) != 0 {
@ -75,7 +77,9 @@ func (l *loadIterator) NextBatch(maxSize int) (batch IteratorBatch, err error) {
batch.Heads = slice.DiscardFromSlice(batch.Heads, func(s string) bool {
return slices.Contains(c.PreviousIds, s)
})
batch.Heads = append(batch.Heads, c.Id)
if !slices.Contains(batch.Heads, c.Id) {
batch.Heads = append(batch.Heads, c.Id)
}
return true
})
l.lastHeads = batch.Heads

View file

@ -323,13 +323,13 @@ func TestObjectTree(t *testing.T) {
bStore = aTree.Storage().(*treestorage.InMemoryTreeStorage).Copy()
root, _ = bStore.Root()
heads, _ := bStore.Heads()
filteredPayload, err := ValidateFilterRawTree(treestorage.TreeStorageCreatePayload{
newTree, err := ValidateFilterRawTree(treestorage.TreeStorageCreatePayload{
RootRawChange: root,
Changes: bStore.AllChanges(),
Heads: heads,
}, bAccount.Acl)
}, InMemoryStorageCreator{}, bAccount.Acl)
require.NoError(t, err)
require.Equal(t, 2, len(filteredPayload.Changes))
require.Equal(t, 2, len(newTree.Storage().(*treestorage.InMemoryTreeStorage).AllChanges()))
err = aTree.IterateRoot(func(change *Change, decrypted []byte) (any, error) {
return nil, nil
}, func(change *Change) bool {
@ -497,6 +497,7 @@ func TestObjectTree(t *testing.T) {
store, _ := treestorage.NewInMemoryTreeStorage(root, []string{root.Id}, []*treechangeproto.RawTreeChangeWithId{root})
oTree, err := BuildObjectTree(store, aclList)
require.NoError(t, err)
emptyDataTreeDeps = nonVerifiableTreeDeps
err = ValidateRawTree(treestorage.TreeStorageCreatePayload{
RootRawChange: oTree.Header(),
Heads: []string{root.Id},
@ -516,6 +517,7 @@ func TestObjectTree(t *testing.T) {
store, _ := treestorage.NewInMemoryTreeStorage(root, []string{root.Id}, []*treechangeproto.RawTreeChangeWithId{root})
oTree, err := BuildObjectTree(store, aclList)
require.NoError(t, err)
emptyDataTreeDeps = nonVerifiableTreeDeps
err = ValidateRawTree(treestorage.TreeStorageCreatePayload{
RootRawChange: oTree.Header(),
Heads: []string{root.Id},
@ -558,6 +560,7 @@ func TestObjectTree(t *testing.T) {
})
require.NoError(t, err)
allChanges := oTree.Storage().(*treestorage.InMemoryTreeStorage).AllChanges()
emptyDataTreeDeps = nonVerifiableTreeDeps
err = ValidateRawTree(treestorage.TreeStorageCreatePayload{
RootRawChange: oTree.Header(),
Heads: []string{oTree.Heads()[0]},
@ -587,6 +590,7 @@ func TestObjectTree(t *testing.T) {
}, aclList)
require.NoError(t, err)
store, _ := treestorage.NewInMemoryTreeStorage(root, []string{root.Id}, []*treechangeproto.RawTreeChangeWithId{root})
emptyDataTreeDeps = nonVerifiableTreeDeps
oTree, err := BuildObjectTree(store, aclList)
require.NoError(t, err)
_, err = oTree.AddContent(ctx, SignableChangeContent{
@ -629,7 +633,7 @@ func TestObjectTree(t *testing.T) {
changeCreator.CreateRaw("2", aclList.Head().Id, "0", false, "1"),
changeCreator.CreateRaw("3", aclList.Head().Id, "0", true, "2"),
}
defaultObjectTreeDeps = nonVerifiableTreeDeps
emptyDataTreeDeps = nonVerifiableTreeDeps
err := ValidateRawTree(treestorage.TreeStorageCreatePayload{
RootRawChange: ctx.objTree.Header(),
Heads: []string{"3"},
@ -1476,7 +1480,7 @@ func TestObjectTree(t *testing.T) {
changeCreator.CreateRaw("2", aclList.Head().Id, "0", false, "1"),
changeCreator.CreateRaw("3", aclList.Head().Id, "0", true, "2"),
}
defaultObjectTreeDeps = nonVerifiableTreeDeps
emptyDataTreeDeps = nonVerifiableTreeDeps
err := ValidateRawTree(treestorage.TreeStorageCreatePayload{
RootRawChange: ctx.objTree.Header(),
Heads: []string{"3"},
@ -1493,7 +1497,7 @@ func TestObjectTree(t *testing.T) {
ctx.objTree.Header(),
changeCreator.CreateRaw("3", aclList.Head().Id, "0", true, "2"),
}
defaultObjectTreeDeps = nonVerifiableTreeDeps
emptyDataTreeDeps = nonVerifiableTreeDeps
err := ValidateRawTree(treestorage.TreeStorageCreatePayload{
RootRawChange: ctx.objTree.Header(),
Heads: []string{"3"},

View file

@ -63,7 +63,9 @@ func verifiableTreeDeps(
}
}
func emptyDataTreeDeps(
var emptyDataTreeDeps = verifiableEmptyDataTreeDeps
func verifiableEmptyDataTreeDeps(
rootChange *treechangeproto.RawTreeChangeWithId,
treeStorage treestorage.TreeStorage,
aclList list.AclList) objectTreeDeps {
@ -156,6 +158,16 @@ func BuildKeyFilterableObjectTree(treeStorage treestorage.TreeStorage, aclList l
return buildObjectTree(deps)
}
func BuildEmptyDataKeyFilterableObjectTree(treeStorage treestorage.TreeStorage, aclList list.AclList) (ObjectTree, error) {
rootChange, err := treeStorage.Root()
if err != nil {
return nil, err
}
deps := emptyDataTreeDeps(rootChange, treeStorage, aclList)
deps.validator = newTreeValidator(true, true)
return buildObjectTree(deps)
}
func BuildObjectTree(treeStorage treestorage.TreeStorage, aclList list.AclList) (ObjectTree, error) {
rootChange, err := treeStorage.Root()
if err != nil {

View file

@ -9,7 +9,17 @@ import (
"github.com/anyproto/any-sync/util/slice"
)
type ValidatorFunc func(payload treestorage.TreeStorageCreatePayload, buildFunc BuildObjectTreeFunc, aclList list.AclList) (retPayload treestorage.TreeStorageCreatePayload, err error)
type TreeStorageCreator interface {
CreateTreeStorage(payload treestorage.TreeStorageCreatePayload) (treestorage.TreeStorage, error)
}
type InMemoryStorageCreator struct{}
func (i InMemoryStorageCreator) CreateTreeStorage(payload treestorage.TreeStorageCreatePayload) (treestorage.TreeStorage, error) {
return treestorage.NewInMemoryTreeStorage(payload.RootRawChange, payload.Heads, payload.Changes)
}
type ValidatorFunc func(payload treestorage.TreeStorageCreatePayload, storageCreator TreeStorageCreator, aclList list.AclList) (ret ObjectTree, err error)
type ObjectTreeValidator interface {
// ValidateFullTree should always be entered while holding a read lock on AclList
@ -160,12 +170,15 @@ func (v *objectTreeValidator) validateChange(tree *Tree, aclList list.AclList, c
return
}
func ValidateRawTreeBuildFunc(payload treestorage.TreeStorageCreatePayload, buildFunc BuildObjectTreeFunc, aclList list.AclList) (newPayload treestorage.TreeStorageCreatePayload, err error) {
treeStorage, err := treestorage.NewInMemoryTreeStorage(payload.RootRawChange, []string{payload.RootRawChange.Id}, nil)
func ValidateRawTreeDefault(payload treestorage.TreeStorageCreatePayload, storageCreator TreeStorageCreator, aclList list.AclList) (objTree ObjectTree, err error) {
treeStorage, err := storageCreator.CreateTreeStorage(treestorage.TreeStorageCreatePayload{
RootRawChange: payload.RootRawChange,
Heads: []string{payload.RootRawChange.Id},
})
if err != nil {
return
}
tree, err := buildFunc(treeStorage, aclList)
tree, err := BuildEmptyDataObjectTree(treeStorage, aclList)
if err != nil {
return
}
@ -179,33 +192,36 @@ func ValidateRawTreeBuildFunc(payload treestorage.TreeStorageCreatePayload, buil
return
}
if !slice.UnsortedEquals(res.Heads, payload.Heads) {
return payload, fmt.Errorf("heads mismatch: %v != %v, %w", res.Heads, payload.Heads, ErrHasInvalidChanges)
return nil, fmt.Errorf("heads mismatch: %v != %v, %w", res.Heads, payload.Heads, ErrHasInvalidChanges)
}
// if tree has only one change we still should check if the snapshot id is same as root
if IsEmptyDerivedTree(tree) {
return payload, ErrDerived
return nil, ErrDerived
}
return payload, nil
return tree, nil
}
func ValidateFilterRawTree(payload treestorage.TreeStorageCreatePayload, aclList list.AclList) (retPayload treestorage.TreeStorageCreatePayload, err error) {
func ValidateFilterRawTree(payload treestorage.TreeStorageCreatePayload, storageCreator TreeStorageCreator, aclList list.AclList) (objTree ObjectTree, err error) {
aclList.RLock()
if !aclList.AclState().HadReadPermissions(aclList.AclState().Identity()) {
aclList.RUnlock()
return payload, list.ErrNoReadKey
return nil, list.ErrNoReadKey
}
aclList.RUnlock()
treeStorage, err := treestorage.NewInMemoryTreeStorage(payload.RootRawChange, []string{payload.RootRawChange.Id}, nil)
treeStorage, err := storageCreator.CreateTreeStorage(treestorage.TreeStorageCreatePayload{
RootRawChange: payload.RootRawChange,
Heads: []string{payload.RootRawChange.Id},
})
if err != nil {
return
}
tree, err := BuildKeyFilterableObjectTree(treeStorage, aclList)
tree, err := BuildEmptyDataKeyFilterableObjectTree(treeStorage, aclList)
if err != nil {
return
}
tree.Lock()
defer tree.Unlock()
res, err := tree.AddRawChanges(context.Background(), RawChangesPayload{
_, err = tree.AddRawChanges(context.Background(), RawChangesPayload{
NewHeads: payload.Heads,
RawChanges: payload.Changes,
})
@ -213,16 +229,12 @@ func ValidateFilterRawTree(payload treestorage.TreeStorageCreatePayload, aclList
return
}
if IsEmptyTree(tree) {
return payload, ErrNoChangeInTree
return nil, ErrNoChangeInTree
}
return treestorage.TreeStorageCreatePayload{
RootRawChange: payload.RootRawChange,
Heads: res.Heads,
Changes: treeStorage.(*treestorage.InMemoryTreeStorage).AllChanges(),
}, nil
return tree, nil
}
func ValidateRawTree(payload treestorage.TreeStorageCreatePayload, aclList list.AclList) (err error) {
_, err = ValidateRawTreeBuildFunc(payload, BuildObjectTree, aclList)
_, err = ValidateRawTreeDefault(payload, InMemoryStorageCreator{}, aclList)
return
}

View file

@ -266,6 +266,7 @@ func (t *Tree) attach(c *Change, newEl bool) {
for _, id := range c.PreviousIds {
// prev id must already be attached if we attach this id, so we don't need to check if it exists
prev := t.attached[id]
c.Previous = append(c.Previous, prev)
// appending c to next changes of all previous changes
if len(prev.Next) == 0 || prev.Next[len(prev.Next)-1].Id <= c.Id {
prev.Next = append(prev.Next, c)
@ -341,10 +342,8 @@ func (t *Tree) dfsPrev(stack []*Change, breakpoints []string, visit func(ch *Cha
ch.visited = true
t.visitedBuf = append(t.visitedBuf, ch)
for _, prevId := range ch.PreviousIds {
prevCh, exists := t.attached[prevId]
// here the only time it wouldn't exist if we are at the tree root
if exists && !prevCh.visited {
for _, prevCh := range ch.Previous {
if !prevCh.visited {
stack = append(stack, prevCh)
}
}

View file

@ -200,6 +200,31 @@ func TestTree_CheckRootReduce(t *testing.T) {
assert.Equal(t, []string{"10", "last"}, res)
})
})
t.Run("snapshots in line", func(t *testing.T) {
tr := new(Tree)
tr.Add(
newSnapshot("0", ""),
newSnapshot("0.1", "0", "0"),
newSnapshot("0.2", "0", "0"),
newSnapshot("1", "0", "0.1", "0.2"),
newSnapshot("2", "1", "1"),
newSnapshot("3", "2", "2"),
)
t.Run("check root", func(t *testing.T) {
total := tr.checkRoot(tr.attached["3"])
assert.Equal(t, 0, total)
})
t.Run("reduce", func(t *testing.T) {
tr.reduceTree()
assert.Equal(t, "3", tr.RootId())
var res []string
tr.IterateSkip(tr.RootId(), func(c *Change) (isContinue bool) {
res = append(res, c.Id)
return true
})
assert.Equal(t, []string{"3"}, res)
})
})
t.Run("check root many", func(t *testing.T) {
tr := new(Tree)
tr.Add(

View file

@ -1,6 +1,10 @@
package objecttree
import "math"
import (
"math"
"github.com/anyproto/any-sync/util/slice"
)
// clearPossibleRoots force removes any snapshots which can further be deemed as roots
func (t *Tree) clearPossibleRoots() {
@ -70,6 +74,20 @@ func (t *Tree) reduceTree() (res bool) {
minRoot *Change
minTotal = math.MaxInt
)
for _, r := range t.possibleRoots {
// if this is snapshot and next is also snapshot, then we don't need to take this one into account
if len(r.Next) == 1 && r.Next[0].IsSnapshot {
r.visited = true
}
}
t.possibleRoots = slice.DiscardFromSlice(t.possibleRoots, func(change *Change) bool {
if change.visited {
change.visited = false
return true
}
return false
})
// TODO: this can be further optimized by iterating the tree and checking the roots from top to bottom
// checking if we can reduce tree to other root
for _, root := range t.possibleRoots {

View file

@ -3,19 +3,25 @@ package synctree
import (
"context"
"github.com/anyproto/any-sync/commonspace/object/tree/objecttree"
"github.com/anyproto/any-sync/commonspace/object/tree/synctree/response"
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anyproto/any-sync/commonspace/object/tree/treestorage"
"github.com/anyproto/any-sync/commonspace/sync/syncdeps"
)
type fullResponseCollector struct {
heads []string
root *treechangeproto.RawTreeChangeWithId
changes []*treechangeproto.RawTreeChangeWithId
deps BuildDeps
heads []string
root *treechangeproto.RawTreeChangeWithId
changes []*treechangeproto.RawTreeChangeWithId
objectTree objecttree.ObjectTree
}
func newFullResponseCollector() *fullResponseCollector {
return &fullResponseCollector{}
func newFullResponseCollector(deps BuildDeps) *fullResponseCollector {
return &fullResponseCollector{
deps: deps,
}
}
func (r *fullResponseCollector) CollectResponse(ctx context.Context, peerId, objectId string, resp syncdeps.Response) error {
@ -23,9 +29,30 @@ func (r *fullResponseCollector) CollectResponse(ctx context.Context, peerId, obj
if !ok {
return ErrUnexpectedResponseType
}
r.heads = treeResp.Heads
r.root = treeResp.Root
r.changes = append(r.changes, treeResp.Changes...)
if r.objectTree == nil {
createPayload := treestorage.TreeStorageCreatePayload{
RootRawChange: treeResp.Root,
Changes: treeResp.Changes,
Heads: treeResp.Heads,
}
validator := r.deps.ValidateObjectTree
if validator == nil {
validator = objecttree.ValidateRawTreeDefault
}
objTree, err := validator(createPayload, r.deps.SpaceStorage, r.deps.AclList)
if err != nil {
return err
}
r.objectTree = objTree
return nil
}
_, err := r.objectTree.AddRawChanges(ctx, objecttree.RawChangesPayload{
NewHeads: treeResp.Heads,
RawChanges: treeResp.Changes,
})
if err != nil {
return err
}
return nil
}

View file

@ -0,0 +1,70 @@
package synctree
import (
"testing"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"github.com/anyproto/any-sync/commonspace/object/acl/list"
"github.com/anyproto/any-sync/commonspace/object/tree/objecttree"
"github.com/anyproto/any-sync/commonspace/object/tree/objecttree/mock_objecttree"
"github.com/anyproto/any-sync/commonspace/object/tree/synctree/response"
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anyproto/any-sync/commonspace/object/tree/treestorage"
)
func TestFullResponseCollector_CollectResponse(t *testing.T) {
t.Run("no object tree", func(t *testing.T) {
ctrl := gomock.NewController(t)
objTree := mock_objecttree.NewMockObjectTree(ctrl)
defer ctrl.Finish()
testPayload := treestorage.TreeStorageCreatePayload{
RootRawChange: &treechangeproto.RawTreeChangeWithId{Id: "root"},
Changes: []*treechangeproto.RawTreeChangeWithId{{Id: "change"}},
Heads: []string{"head"},
}
resp := &response.Response{
SpaceId: "spaceId",
ObjectId: "objectId",
Heads: testPayload.Heads,
Changes: testPayload.Changes,
Root: testPayload.RootRawChange,
}
var validator objecttree.ValidatorFunc = func(payload treestorage.TreeStorageCreatePayload, storageCreator objecttree.TreeStorageCreator, aclList list.AclList) (ret objecttree.ObjectTree, err error) {
require.Equal(t, testPayload, payload)
return objTree, nil
}
coll := newFullResponseCollector(BuildDeps{
ValidateObjectTree: validator,
})
err := coll.CollectResponse(nil, "peerId", "objectId", resp)
require.NoError(t, err)
require.Equal(t, objTree, coll.objectTree)
})
t.Run("object tree exists", func(t *testing.T) {
ctrl := gomock.NewController(t)
objTree := mock_objecttree.NewMockObjectTree(ctrl)
defer ctrl.Finish()
testPayload := treestorage.TreeStorageCreatePayload{
RootRawChange: &treechangeproto.RawTreeChangeWithId{Id: "root"},
Changes: []*treechangeproto.RawTreeChangeWithId{{Id: "change"}},
Heads: []string{"head"},
}
resp := &response.Response{
SpaceId: "spaceId",
ObjectId: "objectId",
Heads: testPayload.Heads,
Changes: testPayload.Changes,
Root: testPayload.RootRawChange,
}
coll := newFullResponseCollector(BuildDeps{})
coll.objectTree = objTree
objTree.EXPECT().AddRawChanges(nil, objecttree.RawChangesPayload{
NewHeads: testPayload.Heads,
RawChanges: testPayload.Changes,
}).Return(objecttree.AddResult{}, nil)
err := coll.CollectResponse(nil, "peerId", "objectId", resp)
require.NoError(t, err)
})
}

View file

@ -4,9 +4,6 @@ import (
"context"
"errors"
"go.uber.org/zap"
"github.com/anyproto/any-sync/commonspace/object/tree/objecttree"
"github.com/anyproto/any-sync/commonspace/object/tree/treestorage"
"github.com/anyproto/any-sync/net/peer"
)
@ -52,7 +49,7 @@ func (t treeRemoteGetter) getPeers(ctx context.Context) (peerIds []string, err e
}
func (t treeRemoteGetter) treeRequest(ctx context.Context, peerId string) (collector *fullResponseCollector, err error) {
collector = createCollector()
collector = createCollector(t.deps)
req := t.deps.SyncClient.CreateNewTreeRequest(peerId, t.treeId)
err = t.deps.SyncClient.SendTreeRequest(ctx, req, collector)
if err != nil {
@ -90,23 +87,5 @@ func (t treeRemoteGetter) getTree(ctx context.Context) (treeStorage treestorage.
return
}
payload := treestorage.TreeStorageCreatePayload{
RootRawChange: collector.root,
Changes: collector.changes,
Heads: collector.heads,
}
validatorFunc := t.deps.ValidateObjectTree
if validatorFunc == nil {
validatorFunc = objecttree.ValidateRawTreeBuildFunc
}
// basically building tree with in-memory storage and validating that it was without errors
log.With(zap.String("id", t.treeId)).DebugCtx(ctx, "validating tree")
newPayload, err := validatorFunc(payload, t.deps.BuildObjectTree, t.deps.AclList)
if err != nil {
return
}
// now we are sure that we can save it to the storage
treeStorage, err = t.deps.SpaceStorage.CreateTreeStorage(newPayload)
return
return collector.objectTree.Storage(), peerId, nil
}

View file

@ -55,8 +55,8 @@ func TestTreeRemoteGetter(t *testing.T) {
fx := newTreeRemoteGetterFixture(t)
defer fx.stop()
mockPeer := mock_peer.NewMockPeer(fx.ctrl)
coll := newFullResponseCollector()
createCollector = func() *fullResponseCollector {
coll := newFullResponseCollector(BuildDeps{})
createCollector = func(deps BuildDeps) *fullResponseCollector {
return coll
}
tCtx := peer.CtxWithPeerId(ctx, "*")
@ -74,8 +74,8 @@ func TestTreeRemoteGetter(t *testing.T) {
fx := newTreeRemoteGetterFixture(t)
defer fx.stop()
mockPeer := mock_peer.NewMockPeer(fx.ctrl)
coll := newFullResponseCollector()
createCollector = func() *fullResponseCollector {
coll := newFullResponseCollector(BuildDeps{})
createCollector = func(deps BuildDeps) *fullResponseCollector {
return coll
}
tCtx := peer.CtxWithPeerId(ctx, "*")
@ -91,8 +91,8 @@ func TestTreeRemoteGetter(t *testing.T) {
fx := newTreeRemoteGetterFixture(t)
defer fx.stop()
mockPeer := mock_peer.NewMockPeer(fx.ctrl)
coll := newFullResponseCollector()
createCollector = func() *fullResponseCollector {
coll := newFullResponseCollector(BuildDeps{})
createCollector = func(deps BuildDeps) *fullResponseCollector {
return coll
}
tCtx := peer.CtxWithPeerId(ctx, "*")