1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-07 21:47:02 +09:00

Merge pull request #461 from anyproto/GO-5739-fix-old-changes-apply

GO-5739: Fix applying changes when there are duplicates
This commit is contained in:
Sergey Cherepanov 2025-06-02 12:27:38 +02:00 committed by GitHub
commit e047a5b2b8
Signed by: github
GPG key ID: B5690EEEBB952194
6 changed files with 168 additions and 86 deletions

View file

@ -36,6 +36,9 @@ type Change struct {
OrderId string
SnapshotCounter int
// using this on build stage
rawChange *treechangeproto.RawTreeChangeWithId
// iterator helpers
visited bool
branchesFinished bool

View file

@ -130,14 +130,13 @@ type objectTree struct {
difSnapshotBuf []*treechangeproto.RawTreeChangeWithId
newChangesBuf []*Change
newSnapshotsBuf []*Change
notSeenIdxBuf []int
snapshotPath []string
sync.Mutex
}
func (ot *objectTree) rebuildFromStorage(theirHeads, theirSnapshotPath []string, newChanges []*Change) (err error) {
func (ot *objectTree) rebuildFromStorage(theirHeads, theirSnapshotPath []string, newChanges []*Change) (added []*Change, err error) {
var (
ourPath []string
oldTree = ot.tree
@ -146,10 +145,10 @@ func (ot *objectTree) rebuildFromStorage(theirHeads, theirSnapshotPath []string,
// TODO: add error handling
ourPath, err = ot.SnapshotPath()
if err != nil {
return fmt.Errorf("rebuild from storage: %w", err)
return nil, fmt.Errorf("rebuild from storage: %w", err)
}
}
ot.tree, err = ot.treeBuilder.Build(treeBuilderOpts{
ot.tree, added, err = ot.treeBuilder.buildWithAdded(treeBuilderOpts{
theirHeads: theirHeads,
ourSnapshotPath: ourPath,
theirSnapshotPath: theirSnapshotPath,
@ -178,7 +177,7 @@ func (ot *objectTree) rebuildFromStorage(theirHeads, theirSnapshotPath []string,
// it is a good question whether we need to validate everything
// because maybe we can trust the stuff that is already in the storage
return ot.validateTree(nil)
return added, ot.validateTree(nil)
}
func (ot *objectTree) Id() string {
@ -406,7 +405,7 @@ func (ot *objectTree) AddRawChangesWithUpdater(ctx context.Context, changes RawC
}
rollback := func() {
rebuildErr := ot.rebuildFromStorage(nil, nil, nil)
_, rebuildErr := ot.rebuildFromStorage(nil, nil, nil)
if rebuildErr != nil {
log.Error("failed to rebuild after adding to storage", zap.Strings("heads", ot.Heads()), zap.Error(rebuildErr))
}
@ -436,7 +435,6 @@ func (ot *objectTree) AddRawChanges(ctx context.Context, changesPayload RawChang
func (ot *objectTree) addChangesToTree(ctx context.Context, changesPayload RawChangesPayload) (addResult AddResult, err error) {
// resetting buffers
ot.newChangesBuf = ot.newChangesBuf[:0]
ot.notSeenIdxBuf = ot.notSeenIdxBuf[:0]
ot.difSnapshotBuf = ot.difSnapshotBuf[:0]
ot.newSnapshotsBuf = ot.newSnapshotsBuf[:0]
@ -453,7 +451,7 @@ func (ot *objectTree) addChangesToTree(ctx context.Context, changesPayload RawCh
)
// filtering changes, verifying and unmarshalling them
for idx, ch := range changesPayload.RawChanges {
for _, ch := range changesPayload.RawChanges {
// not unmarshalling the changes if they were already added either as unattached or attached
if _, exists := ot.tree.attached[ch.Id]; exists {
continue
@ -468,16 +466,16 @@ func (ot *objectTree) addChangesToTree(ctx context.Context, changesPayload RawCh
return
}
}
change.rawChange = ch
if change.IsSnapshot {
ot.newSnapshotsBuf = append(ot.newSnapshotsBuf, change)
}
ot.newChangesBuf = append(ot.newChangesBuf, change)
ot.notSeenIdxBuf = append(ot.notSeenIdxBuf, idx)
}
// if no new changes, then returning
if len(ot.notSeenIdxBuf) == 0 {
if len(ot.newChangesBuf) == 0 {
addResult = AddResult{
OldHeads: prevHeadsCopy,
Heads: prevHeadsCopy,
@ -491,7 +489,7 @@ func (ot *objectTree) addChangesToTree(ctx context.Context, changesPayload RawCh
headsToUse = changesPayload.NewHeads
)
// if our validator provides filtering mechanism then we use it
filteredHeads, ot.newChangesBuf, ot.newSnapshotsBuf, ot.notSeenIdxBuf = ot.validator.FilterChanges(ot.aclList, ot.newChangesBuf, ot.newSnapshotsBuf, ot.notSeenIdxBuf)
filteredHeads, ot.newChangesBuf, ot.newSnapshotsBuf = ot.validator.FilterChanges(ot.aclList, ot.newChangesBuf, ot.newSnapshotsBuf)
if filteredHeads {
// if we filtered some of the heads, then we don't know which heads to use
headsToUse = []string{}
@ -553,22 +551,23 @@ func (ot *objectTree) addChangesToTree(ctx context.Context, changesPayload RawCh
}
log := log.With(zap.String("treeId", ot.id))
if shouldRebuildFromStorage {
err = ot.rebuildFromStorage(headsToUse, changesPayload.SnapshotPath, ot.newChangesBuf)
var added []*Change
added, err = ot.rebuildFromStorage(headsToUse, changesPayload.SnapshotPath, ot.newChangesBuf)
if err != nil {
log.Error("failed to rebuild with new heads", zap.Strings("headsToUse", headsToUse), zap.Error(err))
// rebuilding without new changes
rebuildErr := ot.rebuildFromStorage(nil, nil, nil)
_, rebuildErr := ot.rebuildFromStorage(nil, nil, nil)
if rebuildErr != nil {
log.Error("failed to rebuild from storage", zap.Strings("heads", ot.Heads()), zap.Error(rebuildErr))
}
return
}
addResult, err = ot.createAddResult(prevHeadsCopy, Rebuild, changesPayload.RawChanges)
addResult, err = ot.createAddResult(prevHeadsCopy, Rebuild, added)
if err != nil {
log.Error("failed to create add result", zap.Strings("headsToUse", headsToUse), zap.Error(err))
// that means that some unattached changes were somehow corrupted in memory
// this shouldn't happen but if that happens, then rebuilding from storage
rebuildErr := ot.rebuildFromStorage(nil, nil, nil)
_, rebuildErr := ot.rebuildFromStorage(nil, nil, nil)
if rebuildErr != nil {
log.Error("failed to rebuild after add result", zap.Strings("heads", ot.Heads()), zap.Error(rebuildErr))
}
@ -595,12 +594,12 @@ func (ot *objectTree) addChangesToTree(ctx context.Context, changesPayload RawCh
err = fmt.Errorf("%w: %w", ErrHasInvalidChanges, err)
return
}
addResult, err = ot.createAddResult(prevHeadsCopy, mode, changesPayload.RawChanges)
addResult, err = ot.createAddResult(prevHeadsCopy, mode, treeChangesAdded)
if err != nil {
// that means that some unattached changes were somehow corrupted in memory
// this shouldn't happen but if that happens, then rebuilding from storage
rollback(treeChangesAdded)
rebuildErr := ot.rebuildFromStorage(nil, nil, nil)
_, rebuildErr := ot.rebuildFromStorage(nil, nil, nil)
if rebuildErr != nil {
log.Error("failed to rebuild after add result (add to tree)", zap.Strings("heads", ot.Heads()), zap.Error(rebuildErr))
}
@ -610,43 +609,27 @@ func (ot *objectTree) addChangesToTree(ctx context.Context, changesPayload RawCh
}
}
func (ot *objectTree) createAddResult(oldHeads []string, mode Mode, rawChanges []*treechangeproto.RawTreeChangeWithId) (addResult AddResult, err error) {
headsCopy := func() []string {
newHeads := make([]string, 0, len(ot.tree.Heads()))
newHeads = append(newHeads, ot.tree.Heads()...)
return newHeads
}
// returns changes that we added to the tree as attached this round
// they can include not only the changes that were added now,
// but also the changes that were previously in the tree
getAddedChanges := func() (added []StorageChange, err error) {
for _, idx := range ot.notSeenIdxBuf {
rawChange := rawChanges[idx]
if ch, exists := ot.tree.attached[rawChange.Id]; exists {
ot.flusher.MarkNewChange(ch)
added = append(added, StorageChange{
RawChange: rawChange.RawChange,
PrevIds: ch.PreviousIds,
Id: ch.Id,
SnapshotCounter: ch.SnapshotCounter,
SnapshotId: ch.SnapshotId,
OrderId: ch.OrderId,
ChangeSize: len(rawChange.RawChange),
})
}
}
return
}
func (ot *objectTree) createAddResult(oldHeads []string, mode Mode, changes []*Change) (addResult AddResult, err error) {
newHeads := make([]string, 0, len(ot.tree.Heads()))
newHeads = append(newHeads, ot.tree.Heads()...)
var added []StorageChange
added, err = getAddedChanges()
if err != nil {
return
for _, ch := range changes {
rawChange := ch.rawChange
ot.flusher.MarkNewChange(ch)
added = append(added, StorageChange{
RawChange: rawChange.RawChange,
PrevIds: ch.PreviousIds,
Id: ch.Id,
SnapshotCounter: ch.SnapshotCounter,
SnapshotId: ch.SnapshotId,
OrderId: ch.OrderId,
ChangeSize: len(rawChange.RawChange),
})
ch.rawChange = nil
}
addResult = AddResult{
OldHeads: oldHeads,
Heads: headsCopy(),
Heads: newHeads,
Added: added,
Mode: mode,
}

View file

@ -488,10 +488,8 @@ func TestObjectTree(t *testing.T) {
})
require.NoError(t, err)
bObjTree := bTree.(*objectTree)
// this is just a random slice, so the func works
indexes := []int{1, 2, 3, 4, 5}
// checking that we filter the changes
filtered, filteredChanges, _, _ := bObjTree.validator.FilterChanges(bObjTree.aclList, collectedChanges, nil, indexes)
filtered, filteredChanges, _ := bObjTree.validator.FilterChanges(bObjTree.aclList, collectedChanges, nil)
require.True(t, filtered)
for _, ch := range filteredChanges {
require.NotEqual(t, unexpectedId, ch.Id)
@ -930,6 +928,86 @@ func TestObjectTree(t *testing.T) {
}
})
t.Run("test fix incorrect changes added", func(t *testing.T) {
treeCtx := prepareTreeContext(t, aclList)
treeStorage := treeCtx.treeStorage
changeCreator := treeCtx.changeCreator
objTree := treeCtx.objTree
rawChangesFirst := []*treechangeproto.RawTreeChangeWithId{
changeCreator.CreateRoot("0", aclList.Head().Id),
changeCreator.CreateRaw("1", aclList.Head().Id, "0", true, "0"),
}
rawChangesSecond := []*treechangeproto.RawTreeChangeWithId{
changeCreator.CreateRoot("0", aclList.Head().Id),
changeCreator.CreateRaw("2", aclList.Head().Id, "0", true, "0"),
}
payloadFirst := RawChangesPayload{
NewHeads: []string{"1"},
RawChanges: rawChangesFirst,
}
payloadSecond := RawChangesPayload{
NewHeads: []string{"2"},
RawChanges: rawChangesSecond,
}
res, err := objTree.AddRawChanges(context.Background(), payloadFirst)
require.NoError(t, err, "adding changes should be without error")
require.Equal(t, []string{"1"}, res.Heads)
res, err = objTree.AddRawChanges(context.Background(), payloadSecond)
require.NoError(t, err, "adding changes should be without error")
require.Equal(t, []string{"1", "2"}, res.Heads)
for _, ch := range append(rawChangesFirst, rawChangesSecond...) {
raw, err := treeStorage.Get(context.Background(), ch.Id)
assert.NoError(t, err, "storage should have all the changes")
assert.Equal(t, ch.Id, raw.RawTreeChangeWithId().Id, "the changes in the storage should be the same")
assert.Equal(t, ch.RawChange, raw.RawTreeChangeWithId().RawChange, "the changes in the storage should be the same")
}
})
t.Run("test fix incorrect changes added with snapshot path", func(t *testing.T) {
treeCtx := prepareTreeContext(t, aclList)
treeStorage := treeCtx.treeStorage
changeCreator := treeCtx.changeCreator
objTree := treeCtx.objTree
rawChangesFirst := []*treechangeproto.RawTreeChangeWithId{
changeCreator.CreateRoot("0", aclList.Head().Id),
changeCreator.CreateRaw("1", aclList.Head().Id, "0", true, "0"),
}
rawChangesSecond := []*treechangeproto.RawTreeChangeWithId{
changeCreator.CreateRoot("0", aclList.Head().Id),
changeCreator.CreateRaw("2", aclList.Head().Id, "0", true, "0"),
}
payloadFirst := RawChangesPayload{
NewHeads: []string{"1"},
RawChanges: rawChangesFirst,
SnapshotPath: []string{"0", "1"},
}
payloadSecond := RawChangesPayload{
NewHeads: []string{"2"},
RawChanges: rawChangesSecond,
SnapshotPath: []string{"0", "2"},
}
res, err := objTree.AddRawChanges(context.Background(), payloadFirst)
require.NoError(t, err, "adding changes should be without error")
require.Equal(t, []string{"1"}, res.Heads)
res, err = objTree.AddRawChanges(context.Background(), payloadSecond)
require.NoError(t, err, "adding changes should be without error")
require.Equal(t, []string{"1", "2"}, res.Heads)
for _, ch := range append(rawChangesFirst, rawChangesSecond...) {
raw, err := treeStorage.Get(context.Background(), ch.Id)
assert.NoError(t, err, "storage should have all the changes")
assert.Equal(t, ch.Id, raw.RawTreeChangeWithId().Id, "the changes in the storage should be the same")
assert.Equal(t, ch.RawChange, raw.RawTreeChangeWithId().RawChange, "the changes in the storage should be the same")
}
})
t.Run("add with rollback", func(t *testing.T) {
ctx := prepareTreeContext(t, aclList)
changeCreator := ctx.changeCreator

View file

@ -250,12 +250,11 @@ func buildObjectTree(deps objectTreeDeps) (ObjectTree, error) {
keys: make(map[string]crypto.SymKey),
newChangesBuf: make([]*Change, 0, 10),
difSnapshotBuf: make([]*treechangeproto.RawTreeChangeWithId, 0, 10),
notSeenIdxBuf: make([]int, 0, 10),
newSnapshotsBuf: make([]*Change, 0, 10),
flusher: deps.flusher,
}
err := objTree.rebuildFromStorage(nil, nil, nil)
_, err := objTree.rebuildFromStorage(nil, nil, nil)
if err != nil {
return nil, fmt.Errorf("failed to rebuild from storage: %w", err)
}
@ -288,7 +287,6 @@ func buildHistoryTree(deps objectTreeDeps, params HistoryTreeParams) (ht History
keys: make(map[string]crypto.SymKey),
newChangesBuf: make([]*Change, 0, 10),
difSnapshotBuf: make([]*treechangeproto.RawTreeChangeWithId, 0, 10),
notSeenIdxBuf: make([]int, 0, 10),
newSnapshotsBuf: make([]*Change, 0, 10),
flusher: deps.flusher,
}

View file

@ -35,7 +35,7 @@ type ObjectTreeValidator interface {
ValidateFullTree(tree *Tree, aclList list.AclList) error
// ValidateNewChanges should always be entered while holding a read lock on AclList
ValidateNewChanges(tree *Tree, aclList list.AclList, newChanges []*Change) error
FilterChanges(aclList list.AclList, changes []*Change, snapshots []*Change, indexes []int) (filteredHeads bool, filtered, filteredSnapshots []*Change, newIndexes []int)
FilterChanges(aclList list.AclList, changes []*Change, snapshots []*Change) (filteredHeads bool, filtered, filteredSnapshots []*Change)
}
type noOpTreeValidator struct {
@ -57,14 +57,13 @@ func (n *noOpTreeValidator) ValidateNewChanges(tree *Tree, aclList list.AclList,
return nil
}
func (n *noOpTreeValidator) FilterChanges(aclList list.AclList, changes []*Change, snapshots []*Change, indexes []int) (filteredHeads bool, filtered, filteredSnapshots []*Change, newIndexes []int) {
func (n *noOpTreeValidator) FilterChanges(aclList list.AclList, changes []*Change, snapshots []*Change) (filteredHeads bool, filtered, filteredSnapshots []*Change) {
if n.filterFunc == nil {
return false, changes, snapshots, indexes
return false, changes, snapshots
}
for idx, c := range changes {
for _, c := range changes {
// only taking changes which we can read
if n.filterFunc(c) {
newIndexes = append(newIndexes, indexes[idx])
filtered = append(filtered, c)
if c.IsSnapshot {
filteredSnapshots = append(filteredSnapshots, c)
@ -106,24 +105,22 @@ func (v *objectTreeValidator) ValidateNewChanges(tree *Tree, aclList list.AclLis
return
}
func (v *objectTreeValidator) FilterChanges(aclList list.AclList, changes []*Change, snapshots []*Change, indexes []int) (filteredHeads bool, filtered, filteredSnapshots []*Change, newIndexes []int) {
func (v *objectTreeValidator) FilterChanges(aclList list.AclList, changes []*Change, snapshots []*Change) (filteredHeads bool, filtered, filteredSnapshots []*Change) {
if !v.shouldFilter {
return false, changes, snapshots, indexes
return false, changes, snapshots
}
aclList.RLock()
defer aclList.RUnlock()
state := aclList.AclState()
for idx, c := range changes {
for _, c := range changes {
// this has to be a root
if c.PreviousIds == nil {
newIndexes = append(newIndexes, indexes[idx])
filtered = append(filtered, c)
filteredSnapshots = append(filteredSnapshots, c)
continue
}
// only taking changes which we can read and for which we have acl heads
if keys, exists := state.Keys()[c.ReadKeyId]; aclList.HasHead(c.AclHeadId) && exists && keys.ReadKey != nil {
newIndexes = append(newIndexes, indexes[idx])
filtered = append(filtered, c)
if c.IsSnapshot {
filteredSnapshots = append(filteredSnapshots, c)

View file

@ -47,10 +47,6 @@ func newTreeBuilder(storage Storage, builder ChangeBuilder) *treeBuilder {
}
}
func (tb *treeBuilder) Build(opts treeBuilderOpts) (*Tree, error) {
return tb.build(opts)
}
func (tb *treeBuilder) BuildFull() (*Tree, error) {
return tb.build(treeBuilderOpts{full: true})
}
@ -61,7 +57,7 @@ var (
totalLowest atomic.Int32
)
func (tb *treeBuilder) build(opts treeBuilderOpts) (tr *Tree, err error) {
func (tb *treeBuilder) buildWithAdded(opts treeBuilderOpts) (*Tree, []*Change, error) {
cache := make(map[string]*Change)
tb.ctx = context.Background()
for _, ch := range opts.newChanges {
@ -74,12 +70,12 @@ func (tb *treeBuilder) build(opts treeBuilderOpts) (tr *Tree, err error) {
if opts.useHeadsSnapshot {
maxOrder, lowest, err := tb.lowestSnapshots(nil, opts.ourHeads, "")
if err != nil {
return nil, err
return nil, nil, err
}
if len(lowest) != 1 {
snapshot, err = tb.commonSnapshot(lowest)
if err != nil {
return nil, err
return nil, nil, err
}
} else {
snapshot = lowest[0]
@ -94,28 +90,29 @@ func (tb *treeBuilder) build(opts treeBuilderOpts) (tr *Tree, err error) {
if len(opts.ourSnapshotPath) == 0 {
common, err := tb.storage.CommonSnapshot(tb.ctx)
if err != nil {
return nil, err
return nil, nil, err
}
snapshot = common
} else {
our := opts.ourSnapshotPath[0]
_, lowest, err := tb.lowestSnapshots(cache, opts.theirHeads, our)
if err != nil {
return nil, err
return nil, nil, err
}
if len(lowest) != 1 {
snapshot, err = tb.commonSnapshot(lowest)
if err != nil {
return nil, err
return nil, nil, err
}
} else {
snapshot = lowest[0]
}
}
} else {
var err error
snapshot, err = commonSnapshotForTwoPaths(opts.ourSnapshotPath, opts.theirSnapshotPath)
if err != nil {
return nil, err
return nil, nil, err
}
}
} else {
@ -124,10 +121,13 @@ func (tb *treeBuilder) build(opts treeBuilderOpts) (tr *Tree, err error) {
totalSnapshots.Store(totalSnapshots.Load() + 1)
snapshotCh, err := tb.storage.Get(tb.ctx, snapshot)
if err != nil {
return nil, fmt.Errorf("failed to get common snapshot %s: %w", snapshot, err)
return nil, nil, fmt.Errorf("failed to get common snapshot %s: %w", snapshot, err)
}
rawChange := &treechangeproto.RawTreeChangeWithId{}
var changes []*Change
var (
changes = make([]*Change, 0, 10)
newChanges = make([]*Change, 0, 10)
)
err = tb.storage.GetAfterOrder(tb.ctx, snapshotCh.OrderId, func(ctx context.Context, storageChange StorageChange) (shouldContinue bool, err error) {
if order != "" && storageChange.OrderId > order {
return false, nil
@ -141,21 +141,44 @@ func (tb *treeBuilder) build(opts treeBuilderOpts) (tr *Tree, err error) {
ch.OrderId = storageChange.OrderId
ch.SnapshotCounter = storageChange.SnapshotCounter
changes = append(changes, ch)
if _, contains := cache[ch.Id]; contains {
delete(cache, ch.Id)
}
return true, nil
})
if err != nil {
return nil, fmt.Errorf("failed to get changes after order: %w", err)
return nil, nil, fmt.Errorf("failed to get changes after order: %w", err)
}
// getting the filtered new changes, we know that we don't have them in storage
for _, change := range cache {
newChanges = append(newChanges, change)
}
if len(changes) == 0 {
return nil, ErrEmpty
return nil, nil, ErrEmpty
}
tr = &Tree{}
changes = append(changes, opts.newChanges...)
tr.AddFast(changes...)
tr := &Tree{}
changes = append(changes, newChanges...)
added := tr.AddFast(changes...)
if opts.useHeadsSnapshot {
// this is important for history, because by default we get everything after the snapshot
tr.LeaveOnlyBefore(opts.ourHeads)
}
return tr, nil
if len(newChanges) > 0 {
newChanges = newChanges[:0]
for _, change := range added {
// only those that are both added and are new we deem newChanges
if _, contains := cache[change.Id]; contains {
newChanges = append(newChanges, change)
}
}
return tr, newChanges, nil
}
return tr, nil, nil
}
func (tb *treeBuilder) build(opts treeBuilderOpts) (tr *Tree, err error) {
tr, _, err = tb.buildWithAdded(opts)
return
}
func (tb *treeBuilder) lowestSnapshots(cache map[string]*Change, heads []string, ourSnapshot string) (maxOrder string, snapshots []string, err error) {
@ -202,7 +225,7 @@ func (tb *treeBuilder) lowestSnapshots(cache map[string]*Change, heads []string,
current = append(current, next...)
next = next[:0]
for _, id := range current {
if ch, ok := cache[id]; ok {
if ch, ok := cache[id]; ok && ch.SnapshotId != "" {
if ch.visited {
continue
}