1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-09 09:35:03 +09:00

Add merged head order

This commit is contained in:
mcrakhman 2024-11-17 20:35:25 +01:00
parent 046006d006
commit 77f7c3881a
No known key found for this signature in database
GPG key ID: DED12CFEF5B8396B
2 changed files with 23 additions and 17 deletions

View file

@ -39,7 +39,7 @@ type AddResultSummary int
type AddResult struct {
OldHeads []string
Heads []string
Added []*treechangeproto.RawTreeChangeWithId
Added []StorageChange
Mode Mode
}
@ -105,6 +105,7 @@ type ObjectTree interface {
type objectTree struct {
treeStorage treestorage.TreeStorage
storage Storage
changeBuilder ChangeBuilder
validator ObjectTreeValidator
rawChangeLoader *rawChangeLoader
@ -516,22 +517,22 @@ func (ot *objectTree) addChangesToTree(ctx context.Context, changesPayload RawCh
}
log := log.With(zap.String("treeId", ot.id))
if shouldRebuildFromStorage {
err = ot.rebuildFromStorage(headsToUse, ot.newChangesBuf)
err = ot.rebuildFromStorage(headsToUse, changesPayload.SnapshotPath, ot.newChangesBuf)
if err != nil {
log.Error("failed to rebuild with new heads", zap.Strings("headsToUse", headsToUse), zap.Error(err))
// rebuilding without new changes
rebuildErr := ot.rebuildFromStorage(nil, nil)
rebuildErr := ot.rebuildFromStorage(nil, nil, nil)
if rebuildErr != nil {
log.Error("failed to rebuild from storage", zap.Strings("heads", ot.Heads()), zap.Error(rebuildErr))
}
return
}
addResult, err = ot.createAddResult(prevHeadsCopy, Rebuild, nil, changesPayload.RawChanges)
addResult, err = ot.createAddResult(prevHeadsCopy, Rebuild, changesPayload.RawChanges)
if err != nil {
log.Error("failed to create add result", zap.Strings("headsToUse", headsToUse), zap.Error(err))
// that means that some unattached changes were somehow corrupted in memory
// this shouldn't happen but if that happens, then rebuilding from storage
rebuildErr := ot.rebuildFromStorage(nil, nil)
rebuildErr := ot.rebuildFromStorage(nil, nil, nil)
if rebuildErr != nil {
log.Error("failed to rebuild after add result", zap.Strings("heads", ot.Heads()), zap.Error(rebuildErr))
}
@ -558,12 +559,12 @@ func (ot *objectTree) addChangesToTree(ctx context.Context, changesPayload RawCh
err = fmt.Errorf("%w: %w", ErrHasInvalidChanges, err)
return
}
addResult, err = ot.createAddResult(prevHeadsCopy, mode, treeChangesAdded, changesPayload.RawChanges)
addResult, err = ot.createAddResult(prevHeadsCopy, mode, changesPayload.RawChanges)
if err != nil {
// that means that some unattached changes were somehow corrupted in memory
// this shouldn't happen but if that happens, then rebuilding from storage
rollback(treeChangesAdded)
rebuildErr := ot.rebuildFromStorage(nil, nil)
rebuildErr := ot.rebuildFromStorage(nil, nil, nil)
if rebuildErr != nil {
log.Error("failed to rebuild after add result (add to tree)", zap.Strings("heads", ot.Heads()), zap.Error(rebuildErr))
}
@ -573,7 +574,7 @@ func (ot *objectTree) addChangesToTree(ctx context.Context, changesPayload RawCh
}
}
func (ot *objectTree) createAddResult(oldHeads []string, mode Mode, treeChangesAdded []*Change, rawChanges []*treechangeproto.RawTreeChangeWithId) (addResult AddResult, err error) {
func (ot *objectTree) createAddResult(oldHeads []string, mode Mode, rawChanges []*treechangeproto.RawTreeChangeWithId) (addResult AddResult, err error) {
headsCopy := func() []string {
newHeads := make([]string, 0, len(ot.tree.Heads()))
newHeads = append(newHeads, ot.tree.Heads()...)
@ -583,24 +584,27 @@ func (ot *objectTree) createAddResult(oldHeads []string, mode Mode, treeChangesA
// returns changes that we added to the tree as attached this round
// they can include not only the changes that were added now,
// but also the changes that were previously in the tree
getAddedChanges := func() (added []*treechangeproto.RawTreeChangeWithId, err error) {
getAddedChanges := func() (added []StorageChange, err error) {
for _, idx := range ot.notSeenIdxBuf {
rawChange := rawChanges[idx]
if ch, exists := ot.tree.attached[rawChange.Id]; exists {
ot.flusher.MarkNewChange(ch)
added = append(added, rawChange)
added = append(added, StorageChange{
RawChange: rawChange.RawChange,
PrevIds: ch.PreviousIds,
Id: ch.Id,
SnapshotCounter: ch.SnapshotCounter,
SnapshotId: ch.SnapshotId,
OrderId: ch.OrderId,
ChangeSize: len(rawChange.RawChange),
})
}
}
return
}
var added []*treechangeproto.RawTreeChangeWithId
var added []StorageChange
added, err = getAddedChanges()
if !ot.treeBuilder.keepInMemoryData {
for _, ch := range treeChangesAdded {
ch.Data = nil
}
}
if err != nil {
return
}
@ -721,7 +725,7 @@ func (ot *objectTree) SnapshotPath() []string {
// TODO: think that the user may have not all of the snapshots locally
currentSnapshotId := ot.tree.RootId()
for currentSnapshotId != "" {
sn, err := ot.treeBuilder.loadChange(currentSnapshotId)
sn, err := ot.storage.Get(context.Background(), currentSnapshotId)
if err != nil {
break
}

View file

@ -86,6 +86,8 @@ func (t *Tree) AddMergedHead(c *Change) error {
return fmt.Errorf("this is not a new head")
}
}
last := t.attached[t.lastIteratedHeadId]
c.OrderId = lexId.Next(last.OrderId)
t.headIds = []string{c.Id}
t.lastIteratedHeadId = c.Id
return nil