diff --git a/commonspace/object/tree/objecttree/change.go b/commonspace/object/tree/objecttree/change.go index c1f0f91b..6e2307e5 100644 --- a/commonspace/object/tree/objecttree/change.go +++ b/commonspace/object/tree/objecttree/change.go @@ -36,6 +36,9 @@ type Change struct { OrderId string SnapshotCounter int + // using this on build stage + rawChange *treechangeproto.RawTreeChangeWithId + // iterator helpers visited bool branchesFinished bool diff --git a/commonspace/object/tree/objecttree/objecttree.go b/commonspace/object/tree/objecttree/objecttree.go index 909cbd0a..e25b89f8 100644 --- a/commonspace/object/tree/objecttree/objecttree.go +++ b/commonspace/object/tree/objecttree/objecttree.go @@ -130,14 +130,13 @@ type objectTree struct { difSnapshotBuf []*treechangeproto.RawTreeChangeWithId newChangesBuf []*Change newSnapshotsBuf []*Change - notSeenIdxBuf []int snapshotPath []string sync.Mutex } -func (ot *objectTree) rebuildFromStorage(theirHeads, theirSnapshotPath []string, newChanges []*Change) (err error) { +func (ot *objectTree) rebuildFromStorage(theirHeads, theirSnapshotPath []string, newChanges []*Change) (added []*Change, err error) { var ( ourPath []string oldTree = ot.tree @@ -146,10 +145,10 @@ func (ot *objectTree) rebuildFromStorage(theirHeads, theirSnapshotPath []string, // TODO: add error handling ourPath, err = ot.SnapshotPath() if err != nil { - return fmt.Errorf("rebuild from storage: %w", err) + return nil, fmt.Errorf("rebuild from storage: %w", err) } } - ot.tree, err = ot.treeBuilder.Build(treeBuilderOpts{ + ot.tree, added, err = ot.treeBuilder.buildWithAdded(treeBuilderOpts{ theirHeads: theirHeads, ourSnapshotPath: ourPath, theirSnapshotPath: theirSnapshotPath, @@ -178,7 +177,7 @@ func (ot *objectTree) rebuildFromStorage(theirHeads, theirSnapshotPath []string, // it is a good question whether we need to validate everything // because maybe we can trust the stuff that is already in the storage - return ot.validateTree(nil) + return added, ot.validateTree(nil) } func (ot *objectTree) Id() string { @@ -406,7 +405,7 @@ func (ot *objectTree) AddRawChangesWithUpdater(ctx context.Context, changes RawC } rollback := func() { - rebuildErr := ot.rebuildFromStorage(nil, nil, nil) + _, rebuildErr := ot.rebuildFromStorage(nil, nil, nil) if rebuildErr != nil { log.Error("failed to rebuild after adding to storage", zap.Strings("heads", ot.Heads()), zap.Error(rebuildErr)) } @@ -436,7 +435,6 @@ func (ot *objectTree) AddRawChanges(ctx context.Context, changesPayload RawChang func (ot *objectTree) addChangesToTree(ctx context.Context, changesPayload RawChangesPayload) (addResult AddResult, err error) { // resetting buffers ot.newChangesBuf = ot.newChangesBuf[:0] - ot.notSeenIdxBuf = ot.notSeenIdxBuf[:0] ot.difSnapshotBuf = ot.difSnapshotBuf[:0] ot.newSnapshotsBuf = ot.newSnapshotsBuf[:0] @@ -453,7 +451,7 @@ func (ot *objectTree) addChangesToTree(ctx context.Context, changesPayload RawCh ) // filtering changes, verifying and unmarshalling them - for idx, ch := range changesPayload.RawChanges { + for _, ch := range changesPayload.RawChanges { // not unmarshalling the changes if they were already added either as unattached or attached if _, exists := ot.tree.attached[ch.Id]; exists { continue @@ -468,16 +466,16 @@ func (ot *objectTree) addChangesToTree(ctx context.Context, changesPayload RawCh return } } + change.rawChange = ch if change.IsSnapshot { ot.newSnapshotsBuf = append(ot.newSnapshotsBuf, change) } ot.newChangesBuf = append(ot.newChangesBuf, change) - ot.notSeenIdxBuf = append(ot.notSeenIdxBuf, idx) } // if no new changes, then returning - if len(ot.notSeenIdxBuf) == 0 { + if len(ot.newChangesBuf) == 0 { addResult = AddResult{ OldHeads: prevHeadsCopy, Heads: prevHeadsCopy, @@ -491,7 +489,7 @@ func (ot *objectTree) addChangesToTree(ctx context.Context, changesPayload RawCh headsToUse = changesPayload.NewHeads ) // if our validator provides filtering mechanism then we use it - filteredHeads, ot.newChangesBuf, ot.newSnapshotsBuf, ot.notSeenIdxBuf = ot.validator.FilterChanges(ot.aclList, ot.newChangesBuf, ot.newSnapshotsBuf, ot.notSeenIdxBuf) + filteredHeads, ot.newChangesBuf, ot.newSnapshotsBuf = ot.validator.FilterChanges(ot.aclList, ot.newChangesBuf, ot.newSnapshotsBuf) if filteredHeads { // if we filtered some of the heads, then we don't know which heads to use headsToUse = []string{} @@ -553,22 +551,23 @@ func (ot *objectTree) addChangesToTree(ctx context.Context, changesPayload RawCh } log := log.With(zap.String("treeId", ot.id)) if shouldRebuildFromStorage { - err = ot.rebuildFromStorage(headsToUse, changesPayload.SnapshotPath, ot.newChangesBuf) + var added []*Change + added, err = ot.rebuildFromStorage(headsToUse, changesPayload.SnapshotPath, ot.newChangesBuf) if err != nil { log.Error("failed to rebuild with new heads", zap.Strings("headsToUse", headsToUse), zap.Error(err)) // rebuilding without new changes - rebuildErr := ot.rebuildFromStorage(nil, nil, nil) + _, rebuildErr := ot.rebuildFromStorage(nil, nil, nil) if rebuildErr != nil { log.Error("failed to rebuild from storage", zap.Strings("heads", ot.Heads()), zap.Error(rebuildErr)) } return } - addResult, err = ot.createAddResult(prevHeadsCopy, Rebuild, changesPayload.RawChanges) + addResult, err = ot.createAddResult(prevHeadsCopy, Rebuild, added) if err != nil { log.Error("failed to create add result", zap.Strings("headsToUse", headsToUse), zap.Error(err)) // that means that some unattached changes were somehow corrupted in memory // this shouldn't happen but if that happens, then rebuilding from storage - rebuildErr := ot.rebuildFromStorage(nil, nil, nil) + _, rebuildErr := ot.rebuildFromStorage(nil, nil, nil) if rebuildErr != nil { log.Error("failed to rebuild after add result", zap.Strings("heads", ot.Heads()), zap.Error(rebuildErr)) } @@ -595,12 +594,12 @@ func (ot *objectTree) addChangesToTree(ctx context.Context, changesPayload RawCh err = fmt.Errorf("%w: %w", ErrHasInvalidChanges, err) return } - addResult, err = ot.createAddResult(prevHeadsCopy, mode, changesPayload.RawChanges) + addResult, err = ot.createAddResult(prevHeadsCopy, mode, treeChangesAdded) if err != nil { // that means that some unattached changes were somehow corrupted in memory // this shouldn't happen but if that happens, then rebuilding from storage rollback(treeChangesAdded) - rebuildErr := ot.rebuildFromStorage(nil, nil, nil) + _, rebuildErr := ot.rebuildFromStorage(nil, nil, nil) if rebuildErr != nil { log.Error("failed to rebuild after add result (add to tree)", zap.Strings("heads", ot.Heads()), zap.Error(rebuildErr)) } @@ -610,43 +609,27 @@ func (ot *objectTree) addChangesToTree(ctx context.Context, changesPayload RawCh } } -func (ot *objectTree) createAddResult(oldHeads []string, mode Mode, rawChanges []*treechangeproto.RawTreeChangeWithId) (addResult AddResult, err error) { - headsCopy := func() []string { - newHeads := make([]string, 0, len(ot.tree.Heads())) - newHeads = append(newHeads, ot.tree.Heads()...) - return newHeads - } - - // returns changes that we added to the tree as attached this round - // they can include not only the changes that were added now, - // but also the changes that were previously in the tree - getAddedChanges := func() (added []StorageChange, err error) { - for _, idx := range ot.notSeenIdxBuf { - rawChange := rawChanges[idx] - if ch, exists := ot.tree.attached[rawChange.Id]; exists { - ot.flusher.MarkNewChange(ch) - added = append(added, StorageChange{ - RawChange: rawChange.RawChange, - PrevIds: ch.PreviousIds, - Id: ch.Id, - SnapshotCounter: ch.SnapshotCounter, - SnapshotId: ch.SnapshotId, - OrderId: ch.OrderId, - ChangeSize: len(rawChange.RawChange), - }) - } - } - return - } - +func (ot *objectTree) createAddResult(oldHeads []string, mode Mode, changes []*Change) (addResult AddResult, err error) { + newHeads := make([]string, 0, len(ot.tree.Heads())) + newHeads = append(newHeads, ot.tree.Heads()...) var added []StorageChange - added, err = getAddedChanges() - if err != nil { - return + for _, ch := range changes { + rawChange := ch.rawChange + ot.flusher.MarkNewChange(ch) + added = append(added, StorageChange{ + RawChange: rawChange.RawChange, + PrevIds: ch.PreviousIds, + Id: ch.Id, + SnapshotCounter: ch.SnapshotCounter, + SnapshotId: ch.SnapshotId, + OrderId: ch.OrderId, + ChangeSize: len(rawChange.RawChange), + }) + ch.rawChange = nil } addResult = AddResult{ OldHeads: oldHeads, - Heads: headsCopy(), + Heads: newHeads, Added: added, Mode: mode, } diff --git a/commonspace/object/tree/objecttree/objecttree_test.go b/commonspace/object/tree/objecttree/objecttree_test.go index d379086b..bee8dc0d 100644 --- a/commonspace/object/tree/objecttree/objecttree_test.go +++ b/commonspace/object/tree/objecttree/objecttree_test.go @@ -488,10 +488,8 @@ func TestObjectTree(t *testing.T) { }) require.NoError(t, err) bObjTree := bTree.(*objectTree) - // this is just a random slice, so the func works - indexes := []int{1, 2, 3, 4, 5} // checking that we filter the changes - filtered, filteredChanges, _, _ := bObjTree.validator.FilterChanges(bObjTree.aclList, collectedChanges, nil, indexes) + filtered, filteredChanges, _ := bObjTree.validator.FilterChanges(bObjTree.aclList, collectedChanges, nil) require.True(t, filtered) for _, ch := range filteredChanges { require.NotEqual(t, unexpectedId, ch.Id) @@ -930,6 +928,86 @@ func TestObjectTree(t *testing.T) { } }) + t.Run("test fix incorrect changes added", func(t *testing.T) { + treeCtx := prepareTreeContext(t, aclList) + treeStorage := treeCtx.treeStorage + changeCreator := treeCtx.changeCreator + objTree := treeCtx.objTree + + rawChangesFirst := []*treechangeproto.RawTreeChangeWithId{ + changeCreator.CreateRoot("0", aclList.Head().Id), + changeCreator.CreateRaw("1", aclList.Head().Id, "0", true, "0"), + } + rawChangesSecond := []*treechangeproto.RawTreeChangeWithId{ + changeCreator.CreateRoot("0", aclList.Head().Id), + changeCreator.CreateRaw("2", aclList.Head().Id, "0", true, "0"), + } + payloadFirst := RawChangesPayload{ + NewHeads: []string{"1"}, + RawChanges: rawChangesFirst, + } + payloadSecond := RawChangesPayload{ + NewHeads: []string{"2"}, + RawChanges: rawChangesSecond, + } + + res, err := objTree.AddRawChanges(context.Background(), payloadFirst) + require.NoError(t, err, "adding changes should be without error") + require.Equal(t, []string{"1"}, res.Heads) + + res, err = objTree.AddRawChanges(context.Background(), payloadSecond) + require.NoError(t, err, "adding changes should be without error") + require.Equal(t, []string{"1", "2"}, res.Heads) + + for _, ch := range append(rawChangesFirst, rawChangesSecond...) { + raw, err := treeStorage.Get(context.Background(), ch.Id) + assert.NoError(t, err, "storage should have all the changes") + assert.Equal(t, ch.Id, raw.RawTreeChangeWithId().Id, "the changes in the storage should be the same") + assert.Equal(t, ch.RawChange, raw.RawTreeChangeWithId().RawChange, "the changes in the storage should be the same") + } + }) + + t.Run("test fix incorrect changes added with snapshot path", func(t *testing.T) { + treeCtx := prepareTreeContext(t, aclList) + treeStorage := treeCtx.treeStorage + changeCreator := treeCtx.changeCreator + objTree := treeCtx.objTree + + rawChangesFirst := []*treechangeproto.RawTreeChangeWithId{ + changeCreator.CreateRoot("0", aclList.Head().Id), + changeCreator.CreateRaw("1", aclList.Head().Id, "0", true, "0"), + } + rawChangesSecond := []*treechangeproto.RawTreeChangeWithId{ + changeCreator.CreateRoot("0", aclList.Head().Id), + changeCreator.CreateRaw("2", aclList.Head().Id, "0", true, "0"), + } + payloadFirst := RawChangesPayload{ + NewHeads: []string{"1"}, + RawChanges: rawChangesFirst, + SnapshotPath: []string{"0", "1"}, + } + payloadSecond := RawChangesPayload{ + NewHeads: []string{"2"}, + RawChanges: rawChangesSecond, + SnapshotPath: []string{"0", "2"}, + } + + res, err := objTree.AddRawChanges(context.Background(), payloadFirst) + require.NoError(t, err, "adding changes should be without error") + require.Equal(t, []string{"1"}, res.Heads) + + res, err = objTree.AddRawChanges(context.Background(), payloadSecond) + require.NoError(t, err, "adding changes should be without error") + require.Equal(t, []string{"1", "2"}, res.Heads) + + for _, ch := range append(rawChangesFirst, rawChangesSecond...) { + raw, err := treeStorage.Get(context.Background(), ch.Id) + assert.NoError(t, err, "storage should have all the changes") + assert.Equal(t, ch.Id, raw.RawTreeChangeWithId().Id, "the changes in the storage should be the same") + assert.Equal(t, ch.RawChange, raw.RawTreeChangeWithId().RawChange, "the changes in the storage should be the same") + } + }) + t.Run("add with rollback", func(t *testing.T) { ctx := prepareTreeContext(t, aclList) changeCreator := ctx.changeCreator diff --git a/commonspace/object/tree/objecttree/objecttreefactory.go b/commonspace/object/tree/objecttree/objecttreefactory.go index 6b84d602..ecfb313f 100644 --- a/commonspace/object/tree/objecttree/objecttreefactory.go +++ b/commonspace/object/tree/objecttree/objecttreefactory.go @@ -250,12 +250,11 @@ func buildObjectTree(deps objectTreeDeps) (ObjectTree, error) { keys: make(map[string]crypto.SymKey), newChangesBuf: make([]*Change, 0, 10), difSnapshotBuf: make([]*treechangeproto.RawTreeChangeWithId, 0, 10), - notSeenIdxBuf: make([]int, 0, 10), newSnapshotsBuf: make([]*Change, 0, 10), flusher: deps.flusher, } - err := objTree.rebuildFromStorage(nil, nil, nil) + _, err := objTree.rebuildFromStorage(nil, nil, nil) if err != nil { return nil, fmt.Errorf("failed to rebuild from storage: %w", err) } @@ -288,7 +287,6 @@ func buildHistoryTree(deps objectTreeDeps, params HistoryTreeParams) (ht History keys: make(map[string]crypto.SymKey), newChangesBuf: make([]*Change, 0, 10), difSnapshotBuf: make([]*treechangeproto.RawTreeChangeWithId, 0, 10), - notSeenIdxBuf: make([]int, 0, 10), newSnapshotsBuf: make([]*Change, 0, 10), flusher: deps.flusher, } diff --git a/commonspace/object/tree/objecttree/objecttreevalidator.go b/commonspace/object/tree/objecttree/objecttreevalidator.go index 805d45eb..75e00d6f 100644 --- a/commonspace/object/tree/objecttree/objecttreevalidator.go +++ b/commonspace/object/tree/objecttree/objecttreevalidator.go @@ -35,7 +35,7 @@ type ObjectTreeValidator interface { ValidateFullTree(tree *Tree, aclList list.AclList) error // ValidateNewChanges should always be entered while holding a read lock on AclList ValidateNewChanges(tree *Tree, aclList list.AclList, newChanges []*Change) error - FilterChanges(aclList list.AclList, changes []*Change, snapshots []*Change, indexes []int) (filteredHeads bool, filtered, filteredSnapshots []*Change, newIndexes []int) + FilterChanges(aclList list.AclList, changes []*Change, snapshots []*Change) (filteredHeads bool, filtered, filteredSnapshots []*Change) } type noOpTreeValidator struct { @@ -57,14 +57,13 @@ func (n *noOpTreeValidator) ValidateNewChanges(tree *Tree, aclList list.AclList, return nil } -func (n *noOpTreeValidator) FilterChanges(aclList list.AclList, changes []*Change, snapshots []*Change, indexes []int) (filteredHeads bool, filtered, filteredSnapshots []*Change, newIndexes []int) { +func (n *noOpTreeValidator) FilterChanges(aclList list.AclList, changes []*Change, snapshots []*Change) (filteredHeads bool, filtered, filteredSnapshots []*Change) { if n.filterFunc == nil { - return false, changes, snapshots, indexes + return false, changes, snapshots } - for idx, c := range changes { + for _, c := range changes { // only taking changes which we can read if n.filterFunc(c) { - newIndexes = append(newIndexes, indexes[idx]) filtered = append(filtered, c) if c.IsSnapshot { filteredSnapshots = append(filteredSnapshots, c) @@ -106,24 +105,22 @@ func (v *objectTreeValidator) ValidateNewChanges(tree *Tree, aclList list.AclLis return } -func (v *objectTreeValidator) FilterChanges(aclList list.AclList, changes []*Change, snapshots []*Change, indexes []int) (filteredHeads bool, filtered, filteredSnapshots []*Change, newIndexes []int) { +func (v *objectTreeValidator) FilterChanges(aclList list.AclList, changes []*Change, snapshots []*Change) (filteredHeads bool, filtered, filteredSnapshots []*Change) { if !v.shouldFilter { - return false, changes, snapshots, indexes + return false, changes, snapshots } aclList.RLock() defer aclList.RUnlock() state := aclList.AclState() - for idx, c := range changes { + for _, c := range changes { // this has to be a root if c.PreviousIds == nil { - newIndexes = append(newIndexes, indexes[idx]) filtered = append(filtered, c) filteredSnapshots = append(filteredSnapshots, c) continue } // only taking changes which we can read and for which we have acl heads if keys, exists := state.Keys()[c.ReadKeyId]; aclList.HasHead(c.AclHeadId) && exists && keys.ReadKey != nil { - newIndexes = append(newIndexes, indexes[idx]) filtered = append(filtered, c) if c.IsSnapshot { filteredSnapshots = append(filteredSnapshots, c) diff --git a/commonspace/object/tree/objecttree/treebuilder.go b/commonspace/object/tree/objecttree/treebuilder.go index b9e87c34..64e58e71 100644 --- a/commonspace/object/tree/objecttree/treebuilder.go +++ b/commonspace/object/tree/objecttree/treebuilder.go @@ -47,10 +47,6 @@ func newTreeBuilder(storage Storage, builder ChangeBuilder) *treeBuilder { } } -func (tb *treeBuilder) Build(opts treeBuilderOpts) (*Tree, error) { - return tb.build(opts) -} - func (tb *treeBuilder) BuildFull() (*Tree, error) { return tb.build(treeBuilderOpts{full: true}) } @@ -61,7 +57,7 @@ var ( totalLowest atomic.Int32 ) -func (tb *treeBuilder) build(opts treeBuilderOpts) (tr *Tree, err error) { +func (tb *treeBuilder) buildWithAdded(opts treeBuilderOpts) (*Tree, []*Change, error) { cache := make(map[string]*Change) tb.ctx = context.Background() for _, ch := range opts.newChanges { @@ -74,12 +70,12 @@ func (tb *treeBuilder) build(opts treeBuilderOpts) (tr *Tree, err error) { if opts.useHeadsSnapshot { maxOrder, lowest, err := tb.lowestSnapshots(nil, opts.ourHeads, "") if err != nil { - return nil, err + return nil, nil, err } if len(lowest) != 1 { snapshot, err = tb.commonSnapshot(lowest) if err != nil { - return nil, err + return nil, nil, err } } else { snapshot = lowest[0] @@ -94,28 +90,29 @@ func (tb *treeBuilder) build(opts treeBuilderOpts) (tr *Tree, err error) { if len(opts.ourSnapshotPath) == 0 { common, err := tb.storage.CommonSnapshot(tb.ctx) if err != nil { - return nil, err + return nil, nil, err } snapshot = common } else { our := opts.ourSnapshotPath[0] _, lowest, err := tb.lowestSnapshots(cache, opts.theirHeads, our) if err != nil { - return nil, err + return nil, nil, err } if len(lowest) != 1 { snapshot, err = tb.commonSnapshot(lowest) if err != nil { - return nil, err + return nil, nil, err } } else { snapshot = lowest[0] } } } else { + var err error snapshot, err = commonSnapshotForTwoPaths(opts.ourSnapshotPath, opts.theirSnapshotPath) if err != nil { - return nil, err + return nil, nil, err } } } else { @@ -124,10 +121,13 @@ func (tb *treeBuilder) build(opts treeBuilderOpts) (tr *Tree, err error) { totalSnapshots.Store(totalSnapshots.Load() + 1) snapshotCh, err := tb.storage.Get(tb.ctx, snapshot) if err != nil { - return nil, fmt.Errorf("failed to get common snapshot %s: %w", snapshot, err) + return nil, nil, fmt.Errorf("failed to get common snapshot %s: %w", snapshot, err) } rawChange := &treechangeproto.RawTreeChangeWithId{} - var changes []*Change + var ( + changes = make([]*Change, 0, 10) + newChanges = make([]*Change, 0, 10) + ) err = tb.storage.GetAfterOrder(tb.ctx, snapshotCh.OrderId, func(ctx context.Context, storageChange StorageChange) (shouldContinue bool, err error) { if order != "" && storageChange.OrderId > order { return false, nil @@ -141,21 +141,44 @@ func (tb *treeBuilder) build(opts treeBuilderOpts) (tr *Tree, err error) { ch.OrderId = storageChange.OrderId ch.SnapshotCounter = storageChange.SnapshotCounter changes = append(changes, ch) + if _, contains := cache[ch.Id]; contains { + delete(cache, ch.Id) + } return true, nil }) if err != nil { - return nil, fmt.Errorf("failed to get changes after order: %w", err) + return nil, nil, fmt.Errorf("failed to get changes after order: %w", err) + } + // getting the filtered new changes, we know that we don't have them in storage + for _, change := range cache { + newChanges = append(newChanges, change) } if len(changes) == 0 { - return nil, ErrEmpty + return nil, nil, ErrEmpty } - tr = &Tree{} - changes = append(changes, opts.newChanges...) - tr.AddFast(changes...) + tr := &Tree{} + changes = append(changes, newChanges...) + added := tr.AddFast(changes...) if opts.useHeadsSnapshot { + // this is important for history, because by default we get everything after the snapshot tr.LeaveOnlyBefore(opts.ourHeads) } - return tr, nil + if len(newChanges) > 0 { + newChanges = newChanges[:0] + for _, change := range added { + // only those that are both added and are new we deem newChanges + if _, contains := cache[change.Id]; contains { + newChanges = append(newChanges, change) + } + } + return tr, newChanges, nil + } + return tr, nil, nil +} + +func (tb *treeBuilder) build(opts treeBuilderOpts) (tr *Tree, err error) { + tr, _, err = tb.buildWithAdded(opts) + return } func (tb *treeBuilder) lowestSnapshots(cache map[string]*Change, heads []string, ourSnapshot string) (maxOrder string, snapshots []string, err error) { @@ -202,7 +225,7 @@ func (tb *treeBuilder) lowestSnapshots(cache map[string]*Change, heads []string, current = append(current, next...) next = next[:0] for _, id := range current { - if ch, ok := cache[id]; ok { + if ch, ok := cache[id]; ok && ch.SnapshotId != "" { if ch.visited { continue }