mirror of
https://github.com/anyproto/any-sync.git
synced 2025-06-09 17:45:03 +09:00
Not verify during migration
This commit is contained in:
parent
53815df4b6
commit
3a6c0c1cae
3 changed files with 68 additions and 23 deletions
|
@ -79,6 +79,22 @@ func verifiableEmptyDataTreeDeps(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func nonVerifiableEmptyDataTreeDeps(
|
||||||
|
rootChange *treechangeproto.RawTreeChangeWithId,
|
||||||
|
storage Storage,
|
||||||
|
aclList list.AclList) objectTreeDeps {
|
||||||
|
changeBuilder := &nonVerifiableChangeBuilder{NewEmptyDataChangeBuilder(crypto.NewKeyStorage(), rootChange)}
|
||||||
|
treeBuilder := newTreeBuilder(storage, changeBuilder)
|
||||||
|
return objectTreeDeps{
|
||||||
|
changeBuilder: changeBuilder,
|
||||||
|
treeBuilder: treeBuilder,
|
||||||
|
storage: storage,
|
||||||
|
validator: newTreeValidator(false, false),
|
||||||
|
aclList: aclList,
|
||||||
|
flusher: &defaultFlusher{},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func nonVerifiableTreeDeps(
|
func nonVerifiableTreeDeps(
|
||||||
rootChange *treechangeproto.RawTreeChangeWithId,
|
rootChange *treechangeproto.RawTreeChangeWithId,
|
||||||
storage Storage,
|
storage Storage,
|
||||||
|
@ -138,6 +154,15 @@ func BuildEmptyDataTestableTree(storage Storage, aclList list.AclList) (ObjectTr
|
||||||
return buildObjectTree(deps)
|
return buildObjectTree(deps)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func BuildMigratableObjectTree(storage Storage, aclList list.AclList) (ObjectTree, error) {
|
||||||
|
rootChange, err := storage.Root(context.Background())
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
deps := nonVerifiableEmptyDataTreeDeps(rootChange.RawTreeChangeWithId(), storage, aclList)
|
||||||
|
return buildObjectTree(deps)
|
||||||
|
}
|
||||||
|
|
||||||
func BuildKeyFilterableObjectTree(storage Storage, aclList list.AclList) (ObjectTree, error) {
|
func BuildKeyFilterableObjectTree(storage Storage, aclList list.AclList) (ObjectTree, error) {
|
||||||
rootChange, err := storage.Root(context.Background())
|
rootChange, err := storage.Root(context.Background())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
package migration
|
package objecttree
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
@ -9,46 +9,65 @@ import (
|
||||||
|
|
||||||
"github.com/anyproto/any-sync/commonspace/headsync/headstorage"
|
"github.com/anyproto/any-sync/commonspace/headsync/headstorage"
|
||||||
"github.com/anyproto/any-sync/commonspace/object/acl/list"
|
"github.com/anyproto/any-sync/commonspace/object/acl/list"
|
||||||
"github.com/anyproto/any-sync/commonspace/object/tree/objecttree"
|
|
||||||
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
|
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
|
||||||
"github.com/anyproto/any-sync/commonspace/object/tree/treestorage"
|
"github.com/anyproto/any-sync/commonspace/object/tree/treestorage"
|
||||||
"github.com/anyproto/any-sync/commonspace/spacestorage/oldstorage"
|
|
||||||
"github.com/anyproto/any-sync/util/crypto"
|
"github.com/anyproto/any-sync/util/crypto"
|
||||||
"github.com/anyproto/any-sync/util/slice"
|
"github.com/anyproto/any-sync/util/slice"
|
||||||
)
|
)
|
||||||
|
|
||||||
type treeMigrator struct {
|
type treeStorage interface {
|
||||||
|
Id() string
|
||||||
|
Root() (*treechangeproto.RawTreeChangeWithId, error)
|
||||||
|
Heads() ([]string, error)
|
||||||
|
SetHeads(heads []string) error
|
||||||
|
AddRawChange(change *treechangeproto.RawTreeChangeWithId) error
|
||||||
|
AddRawChangesSetHeads(changes []*treechangeproto.RawTreeChangeWithId, heads []string) error
|
||||||
|
GetAllChangeIds() ([]string, error)
|
||||||
|
|
||||||
|
GetRawChange(ctx context.Context, id string) (*treechangeproto.RawTreeChangeWithId, error)
|
||||||
|
GetAppendRawChange(ctx context.Context, buf []byte, id string) (*treechangeproto.RawTreeChangeWithId, error)
|
||||||
|
HasChange(ctx context.Context, id string) (bool, error)
|
||||||
|
Delete() error
|
||||||
|
}
|
||||||
|
|
||||||
|
type changesIterator interface {
|
||||||
|
GetAllChanges() ([]*treechangeproto.RawTreeChangeWithId, error)
|
||||||
|
IterateChanges(proc func(id string, rawChange []byte) error) error
|
||||||
|
}
|
||||||
|
|
||||||
|
type TreeMigrator struct {
|
||||||
idStack []string
|
idStack []string
|
||||||
cache map[string]*objecttree.Change
|
cache map[string]*Change
|
||||||
storage oldstorage.TreeStorage
|
storage treeStorage
|
||||||
builder objecttree.ChangeBuilder
|
builder ChangeBuilder
|
||||||
allChanges []*treechangeproto.RawTreeChangeWithId
|
allChanges []*treechangeproto.RawTreeChangeWithId
|
||||||
|
|
||||||
keyStorage crypto.KeyStorage
|
keyStorage crypto.KeyStorage
|
||||||
aclList list.AclList
|
aclList list.AclList
|
||||||
}
|
}
|
||||||
|
|
||||||
func newTreeMigrator(keyStorage crypto.KeyStorage, aclList list.AclList) *treeMigrator {
|
func NewTreeMigrator(keyStorage crypto.KeyStorage, aclList list.AclList) *TreeMigrator {
|
||||||
return &treeMigrator{
|
return &TreeMigrator{
|
||||||
keyStorage: keyStorage,
|
keyStorage: keyStorage,
|
||||||
aclList: aclList,
|
aclList: aclList,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tm *treeMigrator) migrateTreeStorage(ctx context.Context, storage oldstorage.TreeStorage, headStorage headstorage.HeadStorage, store anystore.DB) error {
|
func (tm *TreeMigrator) MigrateTreeStorage(ctx context.Context, storage treeStorage, headStorage headstorage.HeadStorage, store anystore.DB) error {
|
||||||
rootChange, err := storage.Root()
|
rootChange, err := storage.Root()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
tm.allChanges = []*treechangeproto.RawTreeChangeWithId{rootChange}
|
tm.allChanges = []*treechangeproto.RawTreeChangeWithId{rootChange}
|
||||||
tm.storage = storage
|
tm.storage = storage
|
||||||
tm.cache = make(map[string]*objecttree.Change)
|
tm.cache = make(map[string]*Change)
|
||||||
tm.builder = objecttree.NewChangeBuilder(tm.keyStorage, rootChange)
|
tm.builder = NewChangeBuilder(tm.keyStorage, rootChange)
|
||||||
|
tm.builder = &nonVerifiableChangeBuilder{tm.builder}
|
||||||
heads, err := storage.Heads()
|
heads, err := storage.Heads()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("migration: failed to get heads: %w", err)
|
return fmt.Errorf("migration: failed to get heads: %w", err)
|
||||||
}
|
}
|
||||||
if iterStore, ok := storage.(oldstorage.ChangesIterator); ok {
|
if iterStore, ok := storage.(changesIterator); ok {
|
||||||
tm.allChanges, err = iterStore.GetAllChanges()
|
tm.allChanges, err = iterStore.GetAllChanges()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("migration: failed to get all changes: %w", err)
|
return fmt.Errorf("migration: failed to get all changes: %w", err)
|
||||||
|
@ -56,21 +75,21 @@ func (tm *treeMigrator) migrateTreeStorage(ctx context.Context, storage oldstora
|
||||||
} else {
|
} else {
|
||||||
tm.dfs(ctx, heads, rootChange.Id)
|
tm.dfs(ctx, heads, rootChange.Id)
|
||||||
}
|
}
|
||||||
newStorage, err := objecttree.CreateStorage(ctx, rootChange, headStorage, store)
|
newStorage, err := CreateStorage(ctx, rootChange, headStorage, store)
|
||||||
if err != nil && !errors.Is(err, treestorage.ErrTreeExists) {
|
if err != nil && !errors.Is(err, treestorage.ErrTreeExists) {
|
||||||
return fmt.Errorf("migration: failed to create new storage: %w", err)
|
return fmt.Errorf("migration: failed to create new storage: %w", err)
|
||||||
}
|
}
|
||||||
if errors.Is(err, treestorage.ErrTreeExists) {
|
if errors.Is(err, treestorage.ErrTreeExists) {
|
||||||
newStorage, err = objecttree.NewStorage(ctx, rootChange.Id, headStorage, store)
|
newStorage, err = NewStorage(ctx, rootChange.Id, headStorage, store)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("migration: failed to start old storage: %w", err)
|
return fmt.Errorf("migration: failed to start old storage: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
objTree, err := objecttree.BuildObjectTree(newStorage, tm.aclList)
|
objTree, err := BuildMigratableObjectTree(newStorage, tm.aclList)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("migration: failed to build object tree: %w", err)
|
return fmt.Errorf("migration: failed to build object tree: %w", err)
|
||||||
}
|
}
|
||||||
addPayload := objecttree.RawChangesPayload{
|
addPayload := RawChangesPayload{
|
||||||
NewHeads: heads,
|
NewHeads: heads,
|
||||||
RawChanges: tm.allChanges,
|
RawChanges: tm.allChanges,
|
||||||
SnapshotPath: []string{rootChange.Id},
|
SnapshotPath: []string{rootChange.Id},
|
||||||
|
@ -87,7 +106,7 @@ func (tm *treeMigrator) migrateTreeStorage(ctx context.Context, storage oldstora
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tm *treeMigrator) dfs(ctx context.Context, heads []string, breakpoint string) {
|
func (tm *TreeMigrator) dfs(ctx context.Context, heads []string, breakpoint string) {
|
||||||
tm.idStack = tm.idStack[:0]
|
tm.idStack = tm.idStack[:0]
|
||||||
uniqMap := map[string]struct{}{breakpoint: {}}
|
uniqMap := map[string]struct{}{breakpoint: {}}
|
||||||
tm.idStack = append(tm.idStack, heads...)
|
tm.idStack = append(tm.idStack, heads...)
|
||||||
|
@ -114,7 +133,7 @@ func (tm *treeMigrator) dfs(ctx context.Context, heads []string, breakpoint stri
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tm *treeMigrator) loadChange(ctx context.Context, id string) (ch *objecttree.Change, err error) {
|
func (tm *TreeMigrator) loadChange(ctx context.Context, id string) (ch *Change, err error) {
|
||||||
if ch, ok := tm.cache[id]; ok {
|
if ch, ok := tm.cache[id]; ok {
|
||||||
return ch, nil
|
return ch, nil
|
||||||
}
|
}
|
|
@ -12,6 +12,7 @@ import (
|
||||||
"github.com/anyproto/any-store/anyenc"
|
"github.com/anyproto/any-store/anyenc"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
|
||||||
|
"github.com/anyproto/any-sync/commonspace/object/tree/objecttree"
|
||||||
"github.com/anyproto/any-sync/commonspace/spacestorage"
|
"github.com/anyproto/any-sync/commonspace/spacestorage"
|
||||||
"github.com/anyproto/any-sync/commonspace/spacestorage/oldstorage"
|
"github.com/anyproto/any-sync/commonspace/spacestorage/oldstorage"
|
||||||
"github.com/anyproto/any-sync/util/crypto"
|
"github.com/anyproto/any-sync/util/crypto"
|
||||||
|
@ -122,10 +123,10 @@ func (s *spaceMigrator) MigrateId(ctx context.Context, id string, progress Progr
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("migration: failed to get stored ids: %w", err)
|
return fmt.Errorf("migration: failed to get stored ids: %w", err)
|
||||||
}
|
}
|
||||||
treeMigrators := make([]*treeMigrator, 0, s.numParallel)
|
treeMigrators := make([]*objecttree.TreeMigrator, 0, s.numParallel)
|
||||||
ch := make(chan *treeMigrator, s.numParallel)
|
ch := make(chan *objecttree.TreeMigrator, s.numParallel)
|
||||||
for i := 0; i < s.numParallel; i++ {
|
for i := 0; i < s.numParallel; i++ {
|
||||||
treeMigrators = append(treeMigrators, newTreeMigrator(crypto.NewKeyStorage(), aclList))
|
treeMigrators = append(treeMigrators, objecttree.NewTreeMigrator(crypto.NewKeyStorage(), aclList))
|
||||||
ch <- treeMigrators[i]
|
ch <- treeMigrators[i]
|
||||||
}
|
}
|
||||||
var allErrors []error
|
var allErrors []error
|
||||||
|
@ -141,7 +142,7 @@ func (s *spaceMigrator) MigrateId(ctx context.Context, id string, progress Progr
|
||||||
log.Warn("migration: failed to get old tree storage", zap.String("id", id), zap.Error(err))
|
log.Warn("migration: failed to get old tree storage", zap.String("id", id), zap.Error(err))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
err = tm.migrateTreeStorage(ctx, treeStorage, newStorage.HeadStorage(), newStorage.AnyStore())
|
err = tm.MigrateTreeStorage(ctx, treeStorage, newStorage.HeadStorage(), newStorage.AnyStore())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
allErrors = append(allErrors, fmt.Errorf("migration: failed to migrate tree storage: %w", err))
|
allErrors = append(allErrors, fmt.Errorf("migration: failed to migrate tree storage: %w", err))
|
||||||
return
|
return
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue