1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-08 05:57:03 +09:00

Load iterator and snapshot checks

This commit is contained in:
mcrakhman 2024-11-18 21:07:46 +01:00
parent 314f6b035d
commit eca8e37707
No known key found for this signature in database
GPG key ID: DED12CFEF5B8396B
5 changed files with 108 additions and 142 deletions

View file

@ -16,16 +16,17 @@ var (
// Change is an abstract type for all types of changes
type Change struct {
Next []*Change
PreviousIds []string
Previous []*Change
AclHeadId string
Id string
SnapshotId string
Timestamp int64
ReadKeyId string
Identity crypto.PubKey
Data []byte
Next []*Change
PreviousIds []string
Previous []*Change
AclHeadId string
Id string
SnapshotId string
Timestamp int64
ReadKeyId string
Identity crypto.PubKey
Data []byte
// TODO: add call one time comment
Model interface{}
Signature []byte
DataType string

View file

@ -1,6 +1,9 @@
package objecttree
import (
"context"
"fmt"
"golang.org/x/exp/slices"
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
@ -19,12 +22,15 @@ type IteratorBatch struct {
}
type loadIterator struct {
storage Storage
builder ChangeBuilder
loader *rawChangeLoader
cache map[string]rawCacheEntry
idStack []string
heads []string
lastHeads []string
snapshotPath []string
orderId string
root *Change
lastChange *Change
iter *iterator
@ -40,22 +46,24 @@ func newLoadIterator(loader *rawChangeLoader, snapshotPath []string) *loadIterat
}
func (l *loadIterator) NextBatch(maxSize int) (batch IteratorBatch, err error) {
l.iter = newIterator()
defer freeIterator(l.iter)
curSize := 0
batch.Root = l.loader.Root()
batch.Heads = l.lastHeads
batch.SnapshotPath = l.snapshotPath
var curSize int
if l.isExhausted {
return
}
l.isExhausted = true
l.iter.iterateSkip(l.root, l.lastChange, true, func(c *Change) (isContinue bool) {
l.lastChange = c
rawEntry := l.cache[c.Id]
err = l.storage.GetAfterOrder(context.Background(), l.orderId, func(ctx context.Context, c StorageChange) (shouldContinue bool) {
l.orderId = c.OrderId
rawEntry, ok := l.cache[c.Id]
// if there are no such entry in cache continue
if !ok {
return true
}
if rawEntry.removed {
batch.Heads = slice.DiscardFromSlice(batch.Heads, func(s string) bool {
return slices.Contains(c.PreviousIds, s)
return slices.Contains(c.PrevIds, s)
})
if !slices.Contains(batch.Heads, c.Id) {
batch.Heads = append(batch.Heads, c.Id)
@ -68,38 +76,57 @@ func (l *loadIterator) NextBatch(maxSize int) (batch IteratorBatch, err error) {
}
curSize += rawEntry.size
var rawCh *treechangeproto.RawTreeChangeWithId
rawCh, err = l.loader.loadRaw(c.Id)
if err != nil {
return false
}
batch.Batch = append(batch.Batch, rawCh)
batch.Batch = append(batch.Batch, &treechangeproto.RawTreeChangeWithId{
RawChange: c.RawChange,
Id: c.Id,
})
batch.Heads = slice.DiscardFromSlice(batch.Heads, func(s string) bool {
return slices.Contains(c.PreviousIds, s)
return slices.Contains(c.PrevIds, s)
})
if !slices.Contains(batch.Heads, c.Id) {
batch.Heads = append(batch.Heads, c.Id)
}
return true
})
if err != nil {
return
}
l.lastHeads = batch.Heads
return
}
func (l *loadIterator) load(commonSnapshot string, heads, breakpoints []string) (err error) {
ctx := context.Background()
cs, err := l.storage.Get(ctx, commonSnapshot)
if err != nil {
return
}
rawCh := &treechangeproto.RawTreeChangeWithId{}
err = l.storage.GetAfterOrder(ctx, cs.OrderId, func(ctx context.Context, change StorageChange) (shouldContinue bool) {
rawCh.Id = change.Id
rawCh.RawChange = change.RawChange
var ch *Change
ch, err = l.builder.UnmarshallReduced(rawCh)
if err != nil {
return false
}
l.cache[change.Id] = rawCacheEntry{
change: ch,
size: len(change.RawChange),
}
return true
})
if err != nil {
return
}
existingBreakpoints := make([]string, 0, len(breakpoints))
for _, b := range breakpoints {
_, err := l.loader.loadAppendEntry(b)
if err != nil {
_, ok := l.cache[b]
if !ok {
continue
}
existingBreakpoints = append(existingBreakpoints, b)
}
loadedCs, err := l.loader.loadAppendEntry(commonSnapshot)
if err != nil {
return err
}
l.cache[commonSnapshot] = loadedCs
l.heads = heads
dfs := func(
@ -121,12 +148,9 @@ func (l *loadIterator) load(commonSnapshot string, heads, breakpoints []string)
continue
}
if !exists {
entry, err = l.loader.loadAppendEntry(id)
if err != nil {
return
}
// this should not happen
return fmt.Errorf("entry %s not found in cache", id)
}
// setting the counter when we visit
entry = visit(entry)
l.cache[id] = entry
@ -141,19 +165,6 @@ func (l *loadIterator) load(commonSnapshot string, heads, breakpoints []string)
return nil
}
l.idStack = append(l.idStack, heads...)
// load everything
err = dfs(commonSnapshot, heads,
func(_ rawCacheEntry, mapExists bool) bool {
return !mapExists
},
func(entry rawCacheEntry) rawCacheEntry {
entry.position = 0
return entry
})
if err != nil {
return
}
// marking some changes as removed, not to send to anybody
err = dfs(commonSnapshot, existingBreakpoints,
func(entry rawCacheEntry, mapExists bool) bool {
@ -167,40 +178,9 @@ func (l *loadIterator) load(commonSnapshot string, heads, breakpoints []string)
if err != nil {
return
}
// setting next for all changes in batch
err = dfs(commonSnapshot, heads,
func(entry rawCacheEntry, mapExists bool) bool {
return mapExists && !entry.nextSet
},
func(entry rawCacheEntry) rawCacheEntry {
for _, id := range entry.change.PreviousIds {
prevEntry, exists := l.cache[id]
if !exists {
continue
}
prev := prevEntry.change
if len(prev.Next) == 0 || prev.Next[len(prev.Next)-1].Id <= entry.change.Id {
prev.Next = append(prev.Next, entry.change)
} else {
insertIdx := 0
for idx, el := range prev.Next {
if el.Id >= entry.change.Id {
insertIdx = idx
break
}
}
prev.Next = append(prev.Next[:insertIdx+1], prev.Next[insertIdx:]...)
prev.Next[insertIdx] = entry.change
}
}
entry.nextSet = true
return entry
})
if err != nil {
return
}
l.root = loadedCs.change
l.root = l.cache[commonSnapshot].change
l.orderId = l.root.OrderId
l.lastHeads = []string{l.root.Id}
l.lastChange = loadedCs.change
l.lastChange = l.root
return nil
}

View file

@ -84,7 +84,6 @@ type ObjectTree interface {
ReadableObjectTree
SnapshotPath() []string
ChangesAfterCommonSnapshot(snapshotPath, heads []string) ([]*treechangeproto.RawTreeChangeWithId, error)
ChangesAfterCommonSnapshotLoader(snapshotPath, heads []string) (LoadIterator, error)
Storage() treestorage.TreeStorage
@ -758,29 +757,6 @@ func (ot *objectTree) SnapshotPath() []string {
return path
}
func (ot *objectTree) ChangesAfterCommonSnapshot(theirPath, theirHeads []string) ([]*treechangeproto.RawTreeChangeWithId, error) {
if ot.isDeleted {
return nil, ErrDeleted
}
var (
needFullDocument = len(theirPath) == 0
ourPath = ot.SnapshotPath()
// by default returning everything we have from start
commonSnapshot = ourPath[len(ourPath)-1]
err error
)
// if this is non-empty request
if !needFullDocument {
commonSnapshot, err = commonSnapshotForTwoPaths(ourPath, theirPath)
if err != nil {
return nil, err
}
}
return ot.rawChangeLoader.Load(commonSnapshot, ot.tree, theirHeads)
}
func (ot *objectTree) ChangesAfterCommonSnapshotLoader(theirPath, theirHeads []string) (LoadIterator, error) {
if ot.isDeleted {
return nil, ErrDeleted

View file

@ -21,12 +21,10 @@ type rawChangeLoader struct {
}
type rawCacheEntry struct {
change *Change
rawChange *treechangeproto.RawTreeChangeWithId
position int
removed bool
nextSet bool
size int
change *Change
removed bool
nextSet bool
size int
}
func newStorageLoader(treeStorage treestorage.TreeStorage, changeBuilder ChangeBuilder) *rawChangeLoader {

View file

@ -8,6 +8,7 @@ import (
"github.com/anyproto/lexid"
"go.uber.org/zap"
"golang.org/x/exp/slices"
)
type Mode int
@ -224,45 +225,52 @@ func (t *Tree) add(c *Change) (attached bool) {
sort.Strings(c.PreviousIds)
}
// attaching only if all prev ids are attached
attached = true
for _, pid := range c.PreviousIds {
if _, ok := t.attached[pid]; ok {
continue
}
attached = false
// updating wait list for either unseen or unAttached changes
wl := t.waitList[pid]
wl = append(wl, c.Id)
t.waitList[pid] = wl
}
if attached {
attach, remove := t.canAttachOrRemove(c, true)
if attach {
t.attach(c, true)
} else {
} else if !remove {
t.unAttached[c.Id] = c
}
return
}
func (t *Tree) canAttach(c *Change) (attach bool) {
func (t *Tree) canAttachOrRemove(c *Change, addToWait bool) (attach, remove bool) {
if c == nil {
return false
return false, false
}
attach = true
for _, id := range c.PreviousIds {
if _, exists := t.attached[id]; !exists {
attach = false
break
prevSnapshots := make([]string, 0, len(c.PreviousIds))
for _, pid := range c.PreviousIds {
if prev, ok := t.attached[pid]; ok {
if prev.IsSnapshot && len(c.PreviousIds) == 1 {
prevSnapshots = append(prevSnapshots, prev.Id)
} else {
prevSnapshots = append(prevSnapshots, prev.SnapshotId)
}
continue
}
attach = false
if addToWait {
// updating wait list for either unseen or unAttached changes
wl := t.waitList[pid]
wl = append(wl, c.Id)
t.waitList[pid] = wl
}
}
if attach {
// we should also have snapshot of attached change inside tree
_, ok := t.attached[c.SnapshotId]
if !ok {
log.Error("snapshot not found in tree", zap.String("id", c.Id), zap.String("snapshot", c.SnapshotId))
attach = false
}
if !attach {
return
}
return
// we should also have snapshot of attached change inside tree
sn, ok := t.attached[c.SnapshotId]
if !ok {
log.Error("snapshot not found in tree", zap.String("id", c.Id), zap.String("snapshot", c.SnapshotId))
return false, true
}
if !slices.Contains(prevSnapshots, sn.SnapshotId) {
log.Error("change has different snapshot than its prev ids", zap.String("id", c.Id), zap.String("snapshot", c.SnapshotId))
return false, true
}
return true, false
}
func (t *Tree) attach(c *Change, newEl bool) {
@ -271,7 +279,7 @@ func (t *Tree) attach(c *Change, newEl bool) {
if !newEl {
delete(t.unAttached, c.Id)
}
if c.IsSnapshot && c.SnapshotCounter == 0 {
if c.SnapshotCounter == 0 {
t.possibleRoots = append(t.possibleRoots, c)
c.SnapshotCounter = t.attached[c.SnapshotId].SnapshotCounter + 1
}
@ -305,8 +313,11 @@ func (t *Tree) attach(c *Change, newEl bool) {
// next can only be in unAttached, because if next is attached then previous (we) are attached
// which is obviously not true, because we are attaching previous only now
next := t.unAttached[wid]
if t.canAttach(next) {
attach, remove := t.canAttachOrRemove(next, false)
if attach {
t.attach(next, false)
} else if remove {
delete(t.unAttached, next.Id)
}
// if we can't attach next that means that some other change will trigger attachment later,
// so we don't care about those changes