1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-10 01:51:11 +09:00

Load iterator and tree builder changes, remove raw loader

This commit is contained in:
mcrakhman 2024-11-18 21:28:21 +01:00
parent eca8e37707
commit 6052649d7b
No known key found for this signature in database
GPG key ID: DED12CFEF5B8396B
5 changed files with 72 additions and 361 deletions

View file

@ -10,6 +10,13 @@ import (
"github.com/anyproto/any-sync/util/slice"
)
type rawCacheEntry struct {
change *Change
removed bool
nextSet bool
size int
}
type LoadIterator interface {
NextBatch(maxSize int) (batch IteratorBatch, err error)
}
@ -24,30 +31,28 @@ type IteratorBatch struct {
type loadIterator struct {
storage Storage
builder ChangeBuilder
loader *rawChangeLoader
cache map[string]rawCacheEntry
idStack []string
heads []string
lastHeads []string
snapshotPath []string
orderId string
root *Change
lastChange *Change
iter *iterator
root *treechangeproto.RawTreeChangeWithId
isExhausted bool
}
func newLoadIterator(loader *rawChangeLoader, snapshotPath []string) *loadIterator {
func newLoadIterator(root *treechangeproto.RawTreeChangeWithId, snapshotPath []string, storage Storage, builder ChangeBuilder) *loadIterator {
return &loadIterator{
loader: loader,
storage: storage,
builder: builder,
cache: make(map[string]rawCacheEntry),
snapshotPath: snapshotPath,
root: root,
}
}
func (l *loadIterator) NextBatch(maxSize int) (batch IteratorBatch, err error) {
batch.Root = l.loader.Root()
batch.Heads = l.lastHeads
batch.Root = l.root
batch.SnapshotPath = l.snapshotPath
var curSize int
if l.isExhausted {
@ -178,9 +183,7 @@ func (l *loadIterator) load(commonSnapshot string, heads, breakpoints []string)
if err != nil {
return
}
l.root = l.cache[commonSnapshot].change
l.orderId = l.root.OrderId
l.lastHeads = []string{l.root.Id}
l.lastChange = l.root
l.orderId = cs.OrderId
l.lastHeads = []string{cs.Id}
return nil
}

View file

@ -133,8 +133,13 @@ type objectTree struct {
}
func (ot *objectTree) rebuildFromStorage(theirHeads, theirSnapshotPath []string, newChanges []*Change) (err error) {
oldTree := ot.tree
ourPath := ot.SnapshotPath()
var (
ourPath []string
oldTree = ot.tree
)
if ot.tree != nil {
ourPath = ot.SnapshotPath()
}
ot.tree, err = ot.treeBuilder.Build(treeBuilderOpts{
full: false,
theirHeads: theirHeads,
@ -777,7 +782,7 @@ func (ot *objectTree) ChangesAfterCommonSnapshotLoader(theirPath, theirHeads []s
}
}
iter := newLoadIterator(ot.rawChangeLoader, ourPath)
iter := newLoadIterator(ot.rawRoot, ourPath, ot.storage, ot.changeBuilder)
err = iter.load(commonSnapshot, ot.tree.headIds, theirHeads)
if err != nil {
return nil, err

View file

@ -50,16 +50,14 @@ func verifiableTreeDeps(
treeStorage treestorage.TreeStorage,
aclList list.AclList) objectTreeDeps {
changeBuilder := NewChangeBuilder(crypto.NewKeyStorage(), rootChange)
rawLoader := newRawChangeLoader(treeStorage, changeBuilder)
treeBuilder := newTreeBuilder(true, treeStorage, changeBuilder, rawLoader)
return objectTreeDeps{
changeBuilder: changeBuilder,
treeBuilder: treeBuilder,
treeStorage: treeStorage,
validator: newTreeValidator(false, false),
rawChangeLoader: rawLoader,
aclList: aclList,
flusher: &defaultFlusher{},
changeBuilder: changeBuilder,
treeBuilder: treeBuilder,
treeStorage: treeStorage,
validator: newTreeValidator(false, false),
aclList: aclList,
flusher: &defaultFlusher{},
}
}
@ -70,16 +68,14 @@ func verifiableEmptyDataTreeDeps(
treeStorage treestorage.TreeStorage,
aclList list.AclList) objectTreeDeps {
changeBuilder := NewEmptyDataChangeBuilder(crypto.NewKeyStorage(), rootChange)
loader := newStorageLoader(treeStorage, changeBuilder)
treeBuilder := newTreeBuilder(false, treeStorage, changeBuilder, loader)
return objectTreeDeps{
changeBuilder: changeBuilder,
treeBuilder: treeBuilder,
treeStorage: treeStorage,
validator: newTreeValidator(false, false),
rawChangeLoader: loader,
aclList: aclList,
flusher: &defaultFlusher{},
changeBuilder: changeBuilder,
treeBuilder: treeBuilder,
treeStorage: treeStorage,
validator: newTreeValidator(false, false),
aclList: aclList,
flusher: &defaultFlusher{},
}
}
@ -88,16 +84,14 @@ func nonVerifiableTreeDeps(
treeStorage treestorage.TreeStorage,
aclList list.AclList) objectTreeDeps {
changeBuilder := &nonVerifiableChangeBuilder{NewChangeBuilder(newMockKeyStorage(), rootChange)}
loader := newRawChangeLoader(treeStorage, changeBuilder)
treeBuilder := newTreeBuilder(true, treeStorage, changeBuilder, loader)
return objectTreeDeps{
changeBuilder: changeBuilder,
treeBuilder: treeBuilder,
treeStorage: treeStorage,
validator: &noOpTreeValidator{},
rawChangeLoader: loader,
aclList: aclList,
flusher: &defaultFlusher{},
changeBuilder: changeBuilder,
treeBuilder: treeBuilder,
treeStorage: treeStorage,
validator: &noOpTreeValidator{},
aclList: aclList,
flusher: &defaultFlusher{},
}
}
@ -115,15 +109,13 @@ func BuildTestableTree(treeStorage treestorage.TreeStorage, aclList list.AclList
changeBuilder := &nonVerifiableChangeBuilder{
ChangeBuilder: NewChangeBuilder(newMockKeyStorage(), root),
}
loader := newRawChangeLoader(treeStorage, changeBuilder)
deps := objectTreeDeps{
changeBuilder: changeBuilder,
treeBuilder: newTreeBuilder(true, treeStorage, changeBuilder, loader),
treeStorage: treeStorage,
rawChangeLoader: loader,
validator: &noOpTreeValidator{},
aclList: aclList,
flusher: &defaultFlusher{},
changeBuilder: changeBuilder,
treeBuilder: newTreeBuilder(true, treeStorage, changeBuilder, loader),
treeStorage: treeStorage,
validator: &noOpTreeValidator{},
aclList: aclList,
flusher: &defaultFlusher{},
}
return buildObjectTree(deps)
@ -134,15 +126,13 @@ func BuildEmptyDataTestableTree(treeStorage treestorage.TreeStorage, aclList lis
changeBuilder := &nonVerifiableChangeBuilder{
ChangeBuilder: NewEmptyDataChangeBuilder(newMockKeyStorage(), root),
}
loader := newStorageLoader(treeStorage, changeBuilder)
deps := objectTreeDeps{
changeBuilder: changeBuilder,
treeBuilder: newTreeBuilder(false, treeStorage, changeBuilder, loader),
treeStorage: treeStorage,
rawChangeLoader: loader,
validator: &noOpTreeValidator{},
aclList: aclList,
flusher: &defaultFlusher{},
changeBuilder: changeBuilder,
treeBuilder: newTreeBuilder(false, treeStorage, changeBuilder, loader),
treeStorage: treeStorage,
validator: &noOpTreeValidator{},
aclList: aclList,
flusher: &defaultFlusher{},
}
return buildObjectTree(deps)

View file

@ -1,296 +0,0 @@
package objecttree
import (
"context"
"time"
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anyproto/any-sync/commonspace/object/tree/treestorage"
"github.com/anyproto/any-sync/util/slice"
)
type rawChangeLoader struct {
treeStorage treestorage.TreeStorage
changeBuilder ChangeBuilder
alwaysFromStorage bool
buf []byte
// buffers
idStack []string
cache map[string]rawCacheEntry
}
type rawCacheEntry struct {
change *Change
removed bool
nextSet bool
size int
}
func newStorageLoader(treeStorage treestorage.TreeStorage, changeBuilder ChangeBuilder) *rawChangeLoader {
loader := newRawChangeLoader(treeStorage, changeBuilder)
loader.alwaysFromStorage = true
return loader
}
func newRawChangeLoader(treeStorage treestorage.TreeStorage, changeBuilder ChangeBuilder) *rawChangeLoader {
return &rawChangeLoader{
treeStorage: treeStorage,
changeBuilder: changeBuilder,
}
}
func (r *rawChangeLoader) Load(commonSnapshot string, t *Tree, breakpoints []string) ([]*treechangeproto.RawTreeChangeWithId, error) {
if commonSnapshot == t.root.Id && !r.alwaysFromStorage {
return r.loadFromTree(t, breakpoints)
} else {
return r.loadFromStorage(commonSnapshot, t.Heads(), breakpoints)
}
}
func (r *rawChangeLoader) loadFromTree(t *Tree, breakpoints []string) ([]*treechangeproto.RawTreeChangeWithId, error) {
var stack []*Change
for _, h := range t.headIds {
stack = append(stack, t.attached[h])
}
convert := func(chs []*Change) (rawChanges []*treechangeproto.RawTreeChangeWithId, err error) {
for _, ch := range chs {
var raw *treechangeproto.RawTreeChangeWithId
raw, err = r.changeBuilder.Marshall(ch)
if err != nil {
return
}
rawChanges = append(rawChanges, raw)
}
return
}
// getting all changes that we visit
var results []*Change
rootVisited := false
t.dfsPrev(
stack,
breakpoints,
func(ch *Change) bool {
results = append(results, ch)
return true
},
func(visited []*Change) {
if t.root.visited {
rootVisited = true
}
},
)
// if we stopped at breakpoints or there are no breakpoints
if !rootVisited || len(breakpoints) == 0 {
// in this case we will add root if there are no breakpoints
return convert(results)
}
// now starting from breakpoints
stack = stack[:0]
for _, h := range breakpoints {
if c, exists := t.attached[h]; exists {
stack = append(stack, c)
}
}
// doing another dfs to get all changes before breakpoints, we need to exclude them from results
// if we don't have some breakpoints we will just ignore them
t.dfsPrev(
stack,
[]string{},
func(ch *Change) bool {
return true
},
func(visited []*Change) {
results = slice.DiscardFromSlice(results, func(change *Change) bool {
return change.visited
})
},
)
// otherwise we want to exclude everything that wasn't in breakpoints
return convert(results)
}
func (r *rawChangeLoader) loadFromStorage(commonSnapshot string, heads, breakpoints []string) ([]*treechangeproto.RawTreeChangeWithId, error) {
// resetting cache
r.cache = make(map[string]rawCacheEntry)
defer func() {
r.cache = nil
}()
existingBreakpoints := make([]string, 0, len(breakpoints))
for _, b := range breakpoints {
entry, err := r.loadEntry(b)
if err != nil {
continue
}
r.cache[b] = entry
existingBreakpoints = append(existingBreakpoints, b)
}
r.cache[commonSnapshot] = rawCacheEntry{position: -1}
dfs := func(
commonSnapshot string,
heads []string,
shouldVisit func(entry rawCacheEntry, mapExists bool) bool,
visit func(entry rawCacheEntry) rawCacheEntry) bool {
// resetting stack
r.idStack = r.idStack[:0]
r.idStack = append(r.idStack, heads...)
commonSnapshotVisited := false
var err error
for len(r.idStack) > 0 {
id := r.idStack[len(r.idStack)-1]
r.idStack = r.idStack[:len(r.idStack)-1]
entry, exists := r.cache[id]
if !shouldVisit(entry, exists) {
continue
}
if id == commonSnapshot {
commonSnapshotVisited = true
continue
}
if !exists {
entry, err = r.loadEntry(id)
if err != nil {
continue
}
}
// setting the counter when we visit
entry = visit(entry)
r.cache[id] = entry
for _, prev := range entry.change.PreviousIds {
if prev == commonSnapshot {
commonSnapshotVisited = true
break
}
prevEntry, exists := r.cache[prev]
if !shouldVisit(prevEntry, exists) {
continue
}
r.idStack = append(r.idStack, prev)
}
}
return commonSnapshotVisited
}
// preparing first pass
r.idStack = append(r.idStack, heads...)
var buffer []*treechangeproto.RawTreeChangeWithId
rootVisited := dfs(commonSnapshot, heads,
func(_ rawCacheEntry, mapExists bool) bool {
return !mapExists
},
func(entry rawCacheEntry) rawCacheEntry {
buffer = append(buffer, entry.rawChange)
entry.position = len(buffer) - 1
return entry
})
// checking if we stopped at breakpoints
if !rootVisited {
return buffer, nil
}
// if there are no breakpoints then we should load root also
if len(breakpoints) == 0 {
common, err := r.loadEntry(commonSnapshot)
if err != nil {
return nil, err
}
buffer = append(buffer, common.rawChange)
return buffer, nil
}
// marking all visited as nil
dfs(commonSnapshot, existingBreakpoints,
func(entry rawCacheEntry, mapExists bool) bool {
// only going through already loaded changes
return mapExists && !entry.removed
},
func(entry rawCacheEntry) rawCacheEntry {
entry.removed = true
if entry.position != -1 {
buffer[entry.position] = nil
}
return entry
})
// discarding visited
buffer = slice.DiscardFromSlice(buffer, func(change *treechangeproto.RawTreeChangeWithId) bool {
return change == nil
})
return buffer, nil
}
func (r *rawChangeLoader) loadEntry(id string) (entry rawCacheEntry, err error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
rawChange, err := r.treeStorage.GetRawChange(ctx, id)
if err != nil {
return
}
change, err := r.changeBuilder.Unmarshall(rawChange, false)
if err != nil {
return
}
entry = rawCacheEntry{
change: change,
rawChange: rawChange,
position: -1,
size: len(rawChange.RawChange),
}
return
}
func (r *rawChangeLoader) loadRaw(id string) (ch *treechangeproto.RawTreeChangeWithId, err error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
return r.treeStorage.GetRawChange(ctx, id)
}
func (r *rawChangeLoader) loadAppendRaw(id string) (ch *treechangeproto.RawTreeChangeWithId, err error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
return r.treeStorage.GetAppendRawChange(ctx, r.buf[:0], id)
}
func (r *rawChangeLoader) loadAppendEntry(id string) (entry rawCacheEntry, err error) {
rawChange, err := r.loadAppendRaw(id)
if err != nil {
return
}
size := len(rawChange.RawChange)
r.buf = rawChange.RawChange
change, err := r.changeBuilder.UnmarshallReduced(rawChange)
if err != nil {
return
}
entry = rawCacheEntry{
change: change,
position: -1,
size: size,
}
return
}
func (r *rawChangeLoader) Root() *treechangeproto.RawTreeChangeWithId {
root, _ := r.treeStorage.Root()
return root
}

View file

@ -64,15 +64,23 @@ func (tb *treeBuilder) build(opts treeBuilderOpts) (tr *Tree, err error) {
var snapshot string
if !opts.full {
if len(opts.theirSnapshotPath) == 0 {
our := opts.ourSnapshotPath[len(opts.ourSnapshotPath)-1]
lowest := tb.lowestSnapshots(cache, opts.theirHeads, our)
if len(lowest) != 1 {
snapshot, err = tb.commonSnapshot(lowest)
if len(opts.ourSnapshotPath) == 0 {
common, err := tb.storage.CommonSnapshot()
if err != nil {
return nil, err
}
snapshot = common
} else {
snapshot = lowest[0]
our := opts.ourSnapshotPath[len(opts.ourSnapshotPath)-1]
lowest := tb.lowestSnapshots(cache, opts.theirHeads, our)
if len(lowest) != 1 {
snapshot, err = tb.commonSnapshot(lowest)
if err != nil {
return nil, err
}
} else {
snapshot = lowest[0]
}
}
} else {
snapshot, err = commonSnapshotForTwoPaths(opts.ourSnapshotPath, opts.theirSnapshotPath)
@ -81,7 +89,7 @@ func (tb *treeBuilder) build(opts treeBuilderOpts) (tr *Tree, err error) {
}
}
} else {
snapshot = tb.treeStorage.Id()
snapshot = tb.storage.Id()
}
snapshotCh, err := tb.storage.Get(tb.ctx, snapshot)
if err != nil {
@ -89,13 +97,15 @@ func (tb *treeBuilder) build(opts treeBuilderOpts) (tr *Tree, err error) {
}
rawChange := &treechangeproto.RawTreeChangeWithId{}
var changes []*Change
err = tb.storage.GetAfterOrder(tb.ctx, snapshotCh.OrderId, func(ctx context.Context, change StorageChange) (shouldContinue bool) {
rawChange.Id = change.Id
rawChange.RawChange = change.RawChange
err = tb.storage.GetAfterOrder(tb.ctx, snapshotCh.OrderId, func(ctx context.Context, storageChange StorageChange) (shouldContinue bool) {
rawChange.Id = storageChange.Id
rawChange.RawChange = storageChange.RawChange
ch, err := tb.builder.Unmarshall(rawChange, false)
if err != nil {
return false
}
ch.OrderId = storageChange.OrderId
ch.SnapshotCounter = storageChange.SnapshotCounter
changes = append(changes, ch)
return true
})
@ -103,7 +113,6 @@ func (tb *treeBuilder) build(opts treeBuilderOpts) (tr *Tree, err error) {
return nil, err
}
tr = &Tree{}
// TODO: enrich data with order ids
changes = append(changes, opts.newChanges...)
tr.AddFast(changes...)
return tr, nil