mirror of
https://github.com/anyproto/any-sync.git
synced 2025-06-09 09:35:03 +09:00
WIP space migration
This commit is contained in:
parent
4f695f8656
commit
2da2fda10b
3 changed files with 268 additions and 0 deletions
54
commonspace/spacestorage/migration/aclmigrator.go
Normal file
54
commonspace/spacestorage/migration/aclmigrator.go
Normal file
|
@ -0,0 +1,54 @@
|
|||
package migration
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
anystore "github.com/anyproto/any-store"
|
||||
|
||||
"github.com/anyproto/any-sync/commonspace/headsync/headstorage"
|
||||
"github.com/anyproto/any-sync/commonspace/object/accountdata"
|
||||
"github.com/anyproto/any-sync/commonspace/object/acl/list"
|
||||
"github.com/anyproto/any-sync/commonspace/spacestorage/oldstorage"
|
||||
"github.com/anyproto/any-sync/consensus/consensusproto"
|
||||
)
|
||||
|
||||
func migrateAclList(ctx context.Context, oldStorage oldstorage.ListStorage, headStorage headstorage.HeadStorage, store anystore.DB) (list.AclList, error) {
|
||||
rootChange, err := oldStorage.Root()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("migration: failed to get acl root change: %w", err)
|
||||
}
|
||||
head, err := oldStorage.Head()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("migration: failed to get acl head: %w", err)
|
||||
}
|
||||
aclStorage, err := list.CreateStorage(ctx, rootChange, headStorage, store)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("migration: failed to create acl storage: %w", err)
|
||||
}
|
||||
keys, err := accountdata.NewRandom()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("migration: failed to generate keys: %w", err)
|
||||
}
|
||||
aclList, err := list.BuildAclListWithIdentity(keys, aclStorage, &list.NoOpAcceptorVerifier{})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("migration: failed to build acl list: %w", err)
|
||||
}
|
||||
var (
|
||||
allRecords []*consensusproto.RawRecordWithId
|
||||
rec *consensusproto.RawRecordWithId
|
||||
cur = head
|
||||
)
|
||||
for rec == nil || rec.Id != rootChange.Id {
|
||||
rec, err = oldStorage.GetRawRecord(ctx, cur)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("migration: failed to get acl record: %w", err)
|
||||
}
|
||||
allRecords = append(allRecords, rec)
|
||||
}
|
||||
err = aclList.AddRawRecords(allRecords)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("migration: failed to add acl records: %w", err)
|
||||
}
|
||||
return aclList, nil
|
||||
}
|
98
commonspace/spacestorage/migration/spacemigrator.go
Normal file
98
commonspace/spacestorage/migration/spacemigrator.go
Normal file
|
@ -0,0 +1,98 @@
|
|||
package migration
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
anystore "github.com/anyproto/any-store"
|
||||
|
||||
"github.com/anyproto/any-sync/commonspace/spacestorage"
|
||||
"github.com/anyproto/any-sync/commonspace/spacestorage/oldstorage"
|
||||
"github.com/anyproto/any-sync/net/streampool"
|
||||
"github.com/anyproto/any-sync/util/crypto"
|
||||
)
|
||||
|
||||
type spaceMigrator struct {
|
||||
oldStorage oldstorage.SpaceStorage
|
||||
numParallel int
|
||||
}
|
||||
|
||||
func newSpaceMigrator(spaceStorage oldstorage.SpaceStorage, numParallel int) *spaceMigrator {
|
||||
return &spaceMigrator{
|
||||
oldStorage: spaceStorage,
|
||||
numParallel: numParallel,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *spaceMigrator) migrateSpaceStorage(ctx context.Context, store anystore.DB) (spacestorage.SpaceStorage, error) {
|
||||
header, err := s.oldStorage.SpaceHeader()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("migration: failed to get space header: %w", err)
|
||||
}
|
||||
settingsId := s.oldStorage.SpaceSettingsId()
|
||||
settingsRoot, err := s.oldStorage.TreeRoot(settingsId)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("migration: failed to get space settings root: %w", err)
|
||||
}
|
||||
aclStorage, err := s.oldStorage.AclStorage()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("migration: failed to get acl storage: %w", err)
|
||||
}
|
||||
aclRoot, err := aclStorage.Root()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("migration: failed to get acl root: %w", err)
|
||||
}
|
||||
createPayload := spacestorage.SpaceStorageCreatePayload{
|
||||
AclWithId: aclRoot,
|
||||
SpaceHeaderWithId: header,
|
||||
SpaceSettingsWithId: settingsRoot,
|
||||
}
|
||||
newStorage, err := spacestorage.Create(ctx, store, createPayload)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("migration: failed to create new space storage: %w", err)
|
||||
}
|
||||
aclList, err := migrateAclList(ctx, aclStorage, newStorage.HeadStorage(), store)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("migration: failed to migrate acl list: %w", err)
|
||||
}
|
||||
executor := streampool.NewExecPool(s.numParallel, 0)
|
||||
storedIds, err := s.oldStorage.StoredIds()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("migration: failed to get stored ids: %w", err)
|
||||
}
|
||||
treeMigrators := make([]*treeMigrator, 0, s.numParallel)
|
||||
ch := make(chan *treeMigrator, s.numParallel)
|
||||
for i := 0; i < s.numParallel; i++ {
|
||||
treeMigrators = append(treeMigrators, newTreeMigrator(crypto.NewKeyStorage(), aclList))
|
||||
ch <- treeMigrators[i]
|
||||
}
|
||||
var allErrors []error
|
||||
for _, id := range storedIds {
|
||||
err := executor.Add(ctx, func() {
|
||||
tm := <-ch
|
||||
defer func() {
|
||||
ch <- tm
|
||||
}()
|
||||
treeStorage, err := s.oldStorage.TreeStorage(id)
|
||||
if err != nil {
|
||||
allErrors = append(allErrors, fmt.Errorf("migration: failed to get tree storage: %w", err))
|
||||
return
|
||||
}
|
||||
err = tm.migrateTreeStorage(ctx, treeStorage, newStorage.HeadStorage(), store)
|
||||
if err != nil {
|
||||
allErrors = append(allErrors, fmt.Errorf("migration: failed to migrate tree storage: %w", err))
|
||||
return
|
||||
}
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("migration: failed to add task: %w", err)
|
||||
}
|
||||
}
|
||||
executor.Run()
|
||||
executor.Close()
|
||||
if len(allErrors) > 0 {
|
||||
return nil, fmt.Errorf("migration failed: %w", errors.Join(allErrors...))
|
||||
}
|
||||
return newStorage, nil
|
||||
}
|
116
commonspace/spacestorage/migration/treemigrator.go
Normal file
116
commonspace/spacestorage/migration/treemigrator.go
Normal file
|
@ -0,0 +1,116 @@
|
|||
package migration
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
anystore "github.com/anyproto/any-store"
|
||||
|
||||
"github.com/anyproto/any-sync/commonspace/headsync/headstorage"
|
||||
"github.com/anyproto/any-sync/commonspace/object/acl/list"
|
||||
"github.com/anyproto/any-sync/commonspace/object/tree/objecttree"
|
||||
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
|
||||
"github.com/anyproto/any-sync/commonspace/spacestorage/oldstorage"
|
||||
"github.com/anyproto/any-sync/util/crypto"
|
||||
"github.com/anyproto/any-sync/util/slice"
|
||||
)
|
||||
|
||||
type treeMigrator struct {
|
||||
idStack []string
|
||||
cache map[string]*objecttree.Change
|
||||
storage oldstorage.TreeStorage
|
||||
builder objecttree.ChangeBuilder
|
||||
allChanges []*treechangeproto.RawTreeChangeWithId
|
||||
|
||||
keyStorage crypto.KeyStorage
|
||||
aclList list.AclList
|
||||
}
|
||||
|
||||
func newTreeMigrator(keyStorage crypto.KeyStorage, aclList list.AclList) *treeMigrator {
|
||||
return &treeMigrator{
|
||||
keyStorage: keyStorage,
|
||||
aclList: aclList,
|
||||
}
|
||||
}
|
||||
|
||||
func (tm *treeMigrator) migrateTreeStorage(ctx context.Context, storage oldstorage.TreeStorage, headStorage headstorage.HeadStorage, store anystore.DB) error {
|
||||
rootChange, err := storage.Root()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
tm.allChanges = []*treechangeproto.RawTreeChangeWithId{rootChange}
|
||||
tm.storage = storage
|
||||
tm.cache = make(map[string]*objecttree.Change)
|
||||
tm.builder = objecttree.NewChangeBuilder(tm.keyStorage, rootChange)
|
||||
heads, err := storage.Heads()
|
||||
if err != nil {
|
||||
return fmt.Errorf("migration: failed to get heads: %w", err)
|
||||
}
|
||||
tm.dfs(ctx, heads, rootChange.Id)
|
||||
newStorage, err := objecttree.CreateStorage(ctx, rootChange, headStorage, store)
|
||||
if err != nil {
|
||||
return fmt.Errorf("migration: failed to create new storage: %w", err)
|
||||
}
|
||||
objTree, err := objecttree.BuildObjectTree(newStorage, tm.aclList)
|
||||
if err != nil {
|
||||
return fmt.Errorf("migration: failed to build object tree: %w", err)
|
||||
}
|
||||
addPayload := objecttree.RawChangesPayload{
|
||||
NewHeads: heads,
|
||||
RawChanges: tm.allChanges,
|
||||
SnapshotPath: []string{rootChange.Id},
|
||||
}
|
||||
res, err := objTree.AddRawChanges(ctx, addPayload)
|
||||
if err != nil {
|
||||
return fmt.Errorf("migration: failed to add raw changes: %w", err)
|
||||
}
|
||||
if !slice.UnsortedEquals(res.Heads, heads) {
|
||||
return fmt.Errorf("migration: heads mismatch: %v != %v", res.Heads, heads)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (tm *treeMigrator) dfs(ctx context.Context, heads []string, breakpoint string) {
|
||||
tm.idStack = tm.idStack[:0]
|
||||
uniqMap := map[string]struct{}{breakpoint: {}}
|
||||
tm.idStack = append(tm.idStack, heads...)
|
||||
|
||||
for len(tm.idStack) > 0 {
|
||||
id := tm.idStack[len(tm.idStack)-1]
|
||||
tm.idStack = tm.idStack[:len(tm.idStack)-1]
|
||||
if _, exists := uniqMap[id]; exists {
|
||||
continue
|
||||
}
|
||||
|
||||
ch, err := tm.loadChange(ctx, id)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
uniqMap[id] = struct{}{}
|
||||
for _, prev := range ch.PreviousIds {
|
||||
if _, exists := uniqMap[prev]; exists {
|
||||
continue
|
||||
}
|
||||
tm.idStack = append(tm.idStack, prev)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (tm *treeMigrator) loadChange(ctx context.Context, id string) (ch *objecttree.Change, err error) {
|
||||
if ch, ok := tm.cache[id]; ok {
|
||||
return ch, nil
|
||||
}
|
||||
|
||||
change, err := tm.storage.GetRawChange(ctx, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
tm.allChanges = append(tm.allChanges, change)
|
||||
ch, err = tm.builder.UnmarshallReduced(change)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
tm.cache[id] = ch
|
||||
return ch, nil
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue