From d9aad302f27e98ed1255ddf66e7754b8e88155fd Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Sun, 19 May 2024 14:11:07 +0200 Subject: [PATCH] Fix invalid change and add logs --- .../object/tree/objecttree/objecttree.go | 27 ++++--- .../object/tree/objecttree/objecttree_test.go | 79 +++++++++++++++++++ .../tree/objecttree/objecttreevalidator.go | 25 +++--- util/debug/stack.go | 28 +++++++ util/debug/stack_test.go | 38 +++++++++ 5 files changed, 177 insertions(+), 20 deletions(-) create mode 100644 util/debug/stack.go create mode 100644 util/debug/stack_test.go diff --git a/commonspace/object/tree/objecttree/objecttree.go b/commonspace/object/tree/objecttree/objecttree.go index 7eeeac62..544e5c2b 100644 --- a/commonspace/object/tree/objecttree/objecttree.go +++ b/commonspace/object/tree/objecttree/objecttree.go @@ -8,6 +8,8 @@ import ( "sync" "time" + "github.com/anyproto/any-sync/util/debug" + "go.uber.org/zap" "github.com/anyproto/any-sync/commonspace/object/acl/list" @@ -17,14 +19,6 @@ import ( "github.com/anyproto/any-sync/util/slice" ) -type RWLocker interface { - sync.Locker - RLock() - RUnlock() - TryRLock() bool - TryLock() bool -} - var ( ErrHasInvalidChanges = errors.New("the change is invalid") ErrNoCommonSnapshot = errors.New("trees doesn't have a common snapshot") @@ -32,6 +26,7 @@ var ( ErrMissingKey = errors.New("missing current read key") ErrDerived = errors.New("expect >= 2 changes in derived tree") ErrDeleted = errors.New("object tree is deleted") + ErrNoAclHead = errors.New("no acl head") ) type AddResultSummary int @@ -53,7 +48,7 @@ type ChangeIterateFunc = func(change *Change) bool type ChangeConvertFunc = func(change *Change, decrypted []byte) (any, error) type ReadableObjectTree interface { - RWLocker + sync.Locker Id() string Header() *treechangeproto.RawTreeChangeWithId @@ -118,7 +113,7 @@ type objectTree struct { snapshotPath []string - sync.RWMutex + sync.Mutex } func (ot *objectTree) rebuildFromStorage(theirHeads []string, newChanges []*Change) (err error) { @@ -193,11 +188,20 @@ func (ot *objectTree) GetChange(id string) (*Change, error) { return nil, ErrNoChangeInTree } +func (ot *objectTree) logUseWhenUnlocked() { + // this is needed to check when we use the tree not under the lock + if ot.TryLock() { + log.With(zap.String("treeId", ot.id), zap.String("stack", debug.StackCompact(true))).Error("use tree when unlocked") + ot.Unlock() + } +} + func (ot *objectTree) AddContent(ctx context.Context, content SignableChangeContent) (res AddResult, err error) { if ot.isDeleted { err = ErrDeleted return } + ot.logUseWhenUnlocked() payload, err := ot.prepareBuilderContent(content) if err != nil { return @@ -315,6 +319,7 @@ func (ot *objectTree) AddRawChanges(ctx context.Context, changesPayload RawChang err = ErrDeleted return } + ot.logUseWhenUnlocked() lastHeadId := ot.tree.lastIteratedHeadId addResult, err = ot.addRawChanges(ctx, changesPayload) if err != nil { @@ -395,7 +400,7 @@ func (ot *objectTree) addRawChanges(ctx context.Context, changesPayload RawChang headsToUse = changesPayload.NewHeads ) // if our validator provides filtering mechanism then we use it - filteredHeads, ot.newChangesBuf, ot.newSnapshotsBuf, ot.notSeenIdxBuf = ot.validator.FilterChanges(ot.aclList, changesPayload.NewHeads, ot.newChangesBuf, ot.newSnapshotsBuf, ot.notSeenIdxBuf) + filteredHeads, ot.newChangesBuf, ot.newSnapshotsBuf, ot.notSeenIdxBuf = ot.validator.FilterChanges(ot.aclList, ot.newChangesBuf, ot.newSnapshotsBuf, ot.notSeenIdxBuf) if filteredHeads { // if we filtered some of the heads, then we don't know which heads to use headsToUse = []string{} diff --git a/commonspace/object/tree/objecttree/objecttree_test.go b/commonspace/object/tree/objecttree/objecttree_test.go index 024ed1fe..c280c6aa 100644 --- a/commonspace/object/tree/objecttree/objecttree_test.go +++ b/commonspace/object/tree/objecttree/objecttree_test.go @@ -15,6 +15,7 @@ import ( "github.com/anyproto/any-sync/commonspace/object/accountdata" "github.com/anyproto/any-sync/commonspace/object/acl/list" + "github.com/anyproto/any-sync/commonspace/object/acl/liststorage" "github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto" "github.com/anyproto/any-sync/commonspace/object/tree/treestorage" ) @@ -241,6 +242,84 @@ func TestObjectTree(t *testing.T) { require.NoError(t, err) }) + t.Run("filter changes when no aclHeadId", func(t *testing.T) { + exec := list.NewAclExecutor("spaceId") + type cmdErr struct { + cmd string + err error + } + cmds := []cmdErr{ + {"a.init::a", nil}, + {"a.invite::invId", nil}, + {"b.join::invId", nil}, + {"a.approve::b,r", nil}, + } + for _, cmd := range cmds { + err := exec.Execute(cmd.cmd) + require.Equal(t, cmd.err, err, cmd) + } + aAccount := exec.ActualAccounts()["a"] + bAccount := exec.ActualAccounts()["b"] + root, err := CreateObjectTreeRoot(ObjectTreeCreatePayload{ + PrivKey: aAccount.Keys.SignKey, + ChangeType: "changeType", + ChangePayload: nil, + SpaceId: "spaceId", + IsEncrypted: true, + }, aAccount.Acl) + require.NoError(t, err) + aStore, _ := treestorage.NewInMemoryTreeStorage(root, []string{root.Id}, []*treechangeproto.RawTreeChangeWithId{root}) + aTree, err := BuildKeyFilterableObjectTree(aStore, aAccount.Acl) + require.NoError(t, err) + _, err = aTree.AddContent(ctx, SignableChangeContent{ + Data: []byte("some"), + Key: aAccount.Keys.SignKey, + IsSnapshot: false, + IsEncrypted: true, + DataType: mockDataType, + }) + require.NoError(t, err) + bStore := aTree.Storage().(*treestorage.InMemoryTreeStorage).Copy() + // copying old version of storage + prevAclRecs, err := bAccount.Acl.RecordsAfter(ctx, "") + require.NoError(t, err) + storage, err := liststorage.NewInMemoryAclListStorage(prevAclRecs[0].Id, prevAclRecs) + require.NoError(t, err) + acl, err := list.BuildAclListWithIdentity(bAccount.Keys, storage, list.NoOpAcceptorVerifier{}) + require.NoError(t, err) + // creating tree with old storage which doesn't have a new invite record + bTree, err := BuildKeyFilterableObjectTree(bStore, acl) + require.NoError(t, err) + err = exec.Execute("a.invite::inv1Id") + require.NoError(t, err) + res, err := aTree.AddContent(ctx, SignableChangeContent{ + Data: []byte("some"), + Key: aAccount.Keys.SignKey, + IsSnapshot: false, + IsEncrypted: true, + DataType: mockDataType, + }) + unexpectedId := res.Added[0].Id + require.NoError(t, err) + var collectedChanges []*Change + err = aTree.IterateRoot(func(change *Change, decrypted []byte) (any, error) { + return nil, nil + }, func(change *Change) bool { + collectedChanges = append(collectedChanges, change) + return true + }) + require.NoError(t, err) + bObjTree := bTree.(*objectTree) + // this is just a random slice, so the func works + indexes := []int{1, 2, 3, 4, 5} + // checking that we filter the changes + filtered, filteredChanges, _, _ := bObjTree.validator.FilterChanges(bObjTree.aclList, collectedChanges, nil, indexes) + require.True(t, filtered) + for _, ch := range filteredChanges { + require.NotEqual(t, unexpectedId, ch.Id) + } + }) + t.Run("add content", func(t *testing.T) { root, err := CreateObjectTreeRoot(ObjectTreeCreatePayload{ PrivKey: keys.SignKey, diff --git a/commonspace/object/tree/objecttree/objecttreevalidator.go b/commonspace/object/tree/objecttree/objecttreevalidator.go index fc6dc6ee..27a5e215 100644 --- a/commonspace/object/tree/objecttree/objecttreevalidator.go +++ b/commonspace/object/tree/objecttree/objecttreevalidator.go @@ -16,7 +16,7 @@ type ObjectTreeValidator interface { ValidateFullTree(tree *Tree, aclList list.AclList) error // ValidateNewChanges should always be entered while holding a read lock on AclList ValidateNewChanges(tree *Tree, aclList list.AclList, newChanges []*Change) error - FilterChanges(aclList list.AclList, heads []string, changes []*Change, snapshots []*Change, indexes []int) (filteredHeads bool, filtered, filteredSnapshots []*Change, newIndexes []int) + FilterChanges(aclList list.AclList, changes []*Change, snapshots []*Change, indexes []int) (filteredHeads bool, filtered, filteredSnapshots []*Change, newIndexes []int) } type noOpTreeValidator struct { @@ -31,7 +31,7 @@ func (n *noOpTreeValidator) ValidateNewChanges(tree *Tree, aclList list.AclList, return nil } -func (n *noOpTreeValidator) FilterChanges(aclList list.AclList, heads []string, changes []*Change, snapshots []*Change, indexes []int) (filteredHeads bool, filtered, filteredSnapshots []*Change, newIndexes []int) { +func (n *noOpTreeValidator) FilterChanges(aclList list.AclList, changes []*Change, snapshots []*Change, indexes []int) (filteredHeads bool, filtered, filteredSnapshots []*Change, newIndexes []int) { if n.filterFunc == nil { return false, changes, snapshots, indexes } @@ -80,7 +80,7 @@ func (v *objectTreeValidator) ValidateNewChanges(tree *Tree, aclList list.AclLis return } -func (v *objectTreeValidator) FilterChanges(aclList list.AclList, heads []string, changes []*Change, snapshots []*Change, indexes []int) (filteredHeads bool, filtered, filteredSnapshots []*Change, newIndexes []int) { +func (v *objectTreeValidator) FilterChanges(aclList list.AclList, changes []*Change, snapshots []*Change, indexes []int) (filteredHeads bool, filtered, filteredSnapshots []*Change, newIndexes []int) { if !v.shouldFilter { return false, changes, snapshots, indexes } @@ -88,18 +88,25 @@ func (v *objectTreeValidator) FilterChanges(aclList list.AclList, heads []string defer aclList.RUnlock() state := aclList.AclState() for idx, c := range changes { - // only taking changes which we can read - if keys, exists := state.Keys()[c.ReadKeyId]; exists && keys.ReadKey != nil { + // this has to be a root + if c.PreviousIds == nil { + newIndexes = append(newIndexes, indexes[idx]) + filtered = append(filtered, c) + filteredSnapshots = append(filteredSnapshots, c) + continue + } + // only taking changes which we can read and for which we have acl heads + if keys, exists := state.Keys()[c.ReadKeyId]; aclList.HasHead(c.AclHeadId) && exists && keys.ReadKey != nil { newIndexes = append(newIndexes, indexes[idx]) filtered = append(filtered, c) if c.IsSnapshot { filteredSnapshots = append(filteredSnapshots, c) } - } else { - // if we filtered at least one change this can be the change between heads and other changes - // thus we cannot use heads - filteredHeads = true + continue } + // if we filtered at least one change this can be the change between heads and other changes + // thus we cannot use heads + filteredHeads = true } return } diff --git a/util/debug/stack.go b/util/debug/stack.go new file mode 100644 index 00000000..a7940649 --- /dev/null +++ b/util/debug/stack.go @@ -0,0 +1,28 @@ +package debug + +import ( + "bytes" + "compress/gzip" + "encoding/base64" + "runtime" +) + +func StackCompact(allGoroutines bool) string { + var buf bytes.Buffer + gz := gzip.NewWriter(&buf) + _, _ = gz.Write(Stack(allGoroutines)) + _ = gz.Close() + + return base64.StdEncoding.EncodeToString(buf.Bytes()) +} + +func Stack(allGoroutines bool) []byte { + buf := make([]byte, 1024) + for { + n := runtime.Stack(buf, allGoroutines) + if n < len(buf) { + return buf[:n] + } + buf = make([]byte, 2*len(buf)) + } +} diff --git a/util/debug/stack_test.go b/util/debug/stack_test.go new file mode 100644 index 00000000..ccb4634f --- /dev/null +++ b/util/debug/stack_test.go @@ -0,0 +1,38 @@ +package debug + +import ( + "bytes" + "compress/gzip" + "encoding/base64" + "strings" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestStack(t *testing.T) { + stack := Stack(true) + require.True(t, strings.Contains(string(stack), "main.main")) +} + +func TestStackCompact(t *testing.T) { + stack := StackCompact(true) + decoded, err := base64.StdEncoding.DecodeString(string(stack)) + require.NoError(t, err) + rd, err := gzip.NewReader(bytes.NewReader(decoded)) + require.NoError(t, err) + var ( + buf = make([]byte, 1024) + res []byte + ) + for { + n, err := rd.Read(buf) + if n > 0 { + res = append(res, buf[:n]...) + } + if err != nil { + break + } + } + require.True(t, strings.Contains(string(res), "main.main")) +}