diff --git a/commonspace/spacestorage/migration/aclmigrator.go b/commonspace/spacestorage/migration/aclmigrator.go new file mode 100644 index 00000000..1ac1b46b --- /dev/null +++ b/commonspace/spacestorage/migration/aclmigrator.go @@ -0,0 +1,54 @@ +package migration + +import ( + "context" + "fmt" + + anystore "github.com/anyproto/any-store" + + "github.com/anyproto/any-sync/commonspace/headsync/headstorage" + "github.com/anyproto/any-sync/commonspace/object/accountdata" + "github.com/anyproto/any-sync/commonspace/object/acl/list" + "github.com/anyproto/any-sync/commonspace/spacestorage/oldstorage" + "github.com/anyproto/any-sync/consensus/consensusproto" +) + +func migrateAclList(ctx context.Context, oldStorage oldstorage.ListStorage, headStorage headstorage.HeadStorage, store anystore.DB) (list.AclList, error) { + rootChange, err := oldStorage.Root() + if err != nil { + return nil, fmt.Errorf("migration: failed to get acl root change: %w", err) + } + head, err := oldStorage.Head() + if err != nil { + return nil, fmt.Errorf("migration: failed to get acl head: %w", err) + } + aclStorage, err := list.CreateStorage(ctx, rootChange, headStorage, store) + if err != nil { + return nil, fmt.Errorf("migration: failed to create acl storage: %w", err) + } + keys, err := accountdata.NewRandom() + if err != nil { + return nil, fmt.Errorf("migration: failed to generate keys: %w", err) + } + aclList, err := list.BuildAclListWithIdentity(keys, aclStorage, &list.NoOpAcceptorVerifier{}) + if err != nil { + return nil, fmt.Errorf("migration: failed to build acl list: %w", err) + } + var ( + allRecords []*consensusproto.RawRecordWithId + rec *consensusproto.RawRecordWithId + cur = head + ) + for rec == nil || rec.Id != rootChange.Id { + rec, err = oldStorage.GetRawRecord(ctx, cur) + if err != nil { + return nil, fmt.Errorf("migration: failed to get acl record: %w", err) + } + allRecords = append(allRecords, rec) + } + err = aclList.AddRawRecords(allRecords) + if err != nil { + return nil, fmt.Errorf("migration: failed to add acl records: %w", err) + } + return aclList, nil +} diff --git a/commonspace/spacestorage/migration/spacemigrator.go b/commonspace/spacestorage/migration/spacemigrator.go new file mode 100644 index 00000000..86042f9f --- /dev/null +++ b/commonspace/spacestorage/migration/spacemigrator.go @@ -0,0 +1,98 @@ +package migration + +import ( + "context" + "errors" + "fmt" + + anystore "github.com/anyproto/any-store" + + "github.com/anyproto/any-sync/commonspace/spacestorage" + "github.com/anyproto/any-sync/commonspace/spacestorage/oldstorage" + "github.com/anyproto/any-sync/net/streampool" + "github.com/anyproto/any-sync/util/crypto" +) + +type spaceMigrator struct { + oldStorage oldstorage.SpaceStorage + numParallel int +} + +func newSpaceMigrator(spaceStorage oldstorage.SpaceStorage, numParallel int) *spaceMigrator { + return &spaceMigrator{ + oldStorage: spaceStorage, + numParallel: numParallel, + } +} + +func (s *spaceMigrator) migrateSpaceStorage(ctx context.Context, store anystore.DB) (spacestorage.SpaceStorage, error) { + header, err := s.oldStorage.SpaceHeader() + if err != nil { + return nil, fmt.Errorf("migration: failed to get space header: %w", err) + } + settingsId := s.oldStorage.SpaceSettingsId() + settingsRoot, err := s.oldStorage.TreeRoot(settingsId) + if err != nil { + return nil, fmt.Errorf("migration: failed to get space settings root: %w", err) + } + aclStorage, err := s.oldStorage.AclStorage() + if err != nil { + return nil, fmt.Errorf("migration: failed to get acl storage: %w", err) + } + aclRoot, err := aclStorage.Root() + if err != nil { + return nil, fmt.Errorf("migration: failed to get acl root: %w", err) + } + createPayload := spacestorage.SpaceStorageCreatePayload{ + AclWithId: aclRoot, + SpaceHeaderWithId: header, + SpaceSettingsWithId: settingsRoot, + } + newStorage, err := spacestorage.Create(ctx, store, createPayload) + if err != nil { + return nil, fmt.Errorf("migration: failed to create new space storage: %w", err) + } + aclList, err := migrateAclList(ctx, aclStorage, newStorage.HeadStorage(), store) + if err != nil { + return nil, fmt.Errorf("migration: failed to migrate acl list: %w", err) + } + executor := streampool.NewExecPool(s.numParallel, 0) + storedIds, err := s.oldStorage.StoredIds() + if err != nil { + return nil, fmt.Errorf("migration: failed to get stored ids: %w", err) + } + treeMigrators := make([]*treeMigrator, 0, s.numParallel) + ch := make(chan *treeMigrator, s.numParallel) + for i := 0; i < s.numParallel; i++ { + treeMigrators = append(treeMigrators, newTreeMigrator(crypto.NewKeyStorage(), aclList)) + ch <- treeMigrators[i] + } + var allErrors []error + for _, id := range storedIds { + err := executor.Add(ctx, func() { + tm := <-ch + defer func() { + ch <- tm + }() + treeStorage, err := s.oldStorage.TreeStorage(id) + if err != nil { + allErrors = append(allErrors, fmt.Errorf("migration: failed to get tree storage: %w", err)) + return + } + err = tm.migrateTreeStorage(ctx, treeStorage, newStorage.HeadStorage(), store) + if err != nil { + allErrors = append(allErrors, fmt.Errorf("migration: failed to migrate tree storage: %w", err)) + return + } + }) + if err != nil { + return nil, fmt.Errorf("migration: failed to add task: %w", err) + } + } + executor.Run() + executor.Close() + if len(allErrors) > 0 { + return nil, fmt.Errorf("migration failed: %w", errors.Join(allErrors...)) + } + return newStorage, nil +} diff --git a/commonspace/spacestorage/migration/treemigrator.go b/commonspace/spacestorage/migration/treemigrator.go new file mode 100644 index 00000000..01651942 --- /dev/null +++ b/commonspace/spacestorage/migration/treemigrator.go @@ -0,0 +1,116 @@ +package migration + +import ( + "context" + "fmt" + + anystore "github.com/anyproto/any-store" + + "github.com/anyproto/any-sync/commonspace/headsync/headstorage" + "github.com/anyproto/any-sync/commonspace/object/acl/list" + "github.com/anyproto/any-sync/commonspace/object/tree/objecttree" + "github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto" + "github.com/anyproto/any-sync/commonspace/spacestorage/oldstorage" + "github.com/anyproto/any-sync/util/crypto" + "github.com/anyproto/any-sync/util/slice" +) + +type treeMigrator struct { + idStack []string + cache map[string]*objecttree.Change + storage oldstorage.TreeStorage + builder objecttree.ChangeBuilder + allChanges []*treechangeproto.RawTreeChangeWithId + + keyStorage crypto.KeyStorage + aclList list.AclList +} + +func newTreeMigrator(keyStorage crypto.KeyStorage, aclList list.AclList) *treeMigrator { + return &treeMigrator{ + keyStorage: keyStorage, + aclList: aclList, + } +} + +func (tm *treeMigrator) migrateTreeStorage(ctx context.Context, storage oldstorage.TreeStorage, headStorage headstorage.HeadStorage, store anystore.DB) error { + rootChange, err := storage.Root() + if err != nil { + return err + } + tm.allChanges = []*treechangeproto.RawTreeChangeWithId{rootChange} + tm.storage = storage + tm.cache = make(map[string]*objecttree.Change) + tm.builder = objecttree.NewChangeBuilder(tm.keyStorage, rootChange) + heads, err := storage.Heads() + if err != nil { + return fmt.Errorf("migration: failed to get heads: %w", err) + } + tm.dfs(ctx, heads, rootChange.Id) + newStorage, err := objecttree.CreateStorage(ctx, rootChange, headStorage, store) + if err != nil { + return fmt.Errorf("migration: failed to create new storage: %w", err) + } + objTree, err := objecttree.BuildObjectTree(newStorage, tm.aclList) + if err != nil { + return fmt.Errorf("migration: failed to build object tree: %w", err) + } + addPayload := objecttree.RawChangesPayload{ + NewHeads: heads, + RawChanges: tm.allChanges, + SnapshotPath: []string{rootChange.Id}, + } + res, err := objTree.AddRawChanges(ctx, addPayload) + if err != nil { + return fmt.Errorf("migration: failed to add raw changes: %w", err) + } + if !slice.UnsortedEquals(res.Heads, heads) { + return fmt.Errorf("migration: heads mismatch: %v != %v", res.Heads, heads) + } + return nil +} + +func (tm *treeMigrator) dfs(ctx context.Context, heads []string, breakpoint string) { + tm.idStack = tm.idStack[:0] + uniqMap := map[string]struct{}{breakpoint: {}} + tm.idStack = append(tm.idStack, heads...) + + for len(tm.idStack) > 0 { + id := tm.idStack[len(tm.idStack)-1] + tm.idStack = tm.idStack[:len(tm.idStack)-1] + if _, exists := uniqMap[id]; exists { + continue + } + + ch, err := tm.loadChange(ctx, id) + if err != nil { + continue + } + + uniqMap[id] = struct{}{} + for _, prev := range ch.PreviousIds { + if _, exists := uniqMap[prev]; exists { + continue + } + tm.idStack = append(tm.idStack, prev) + } + } +} + +func (tm *treeMigrator) loadChange(ctx context.Context, id string) (ch *objecttree.Change, err error) { + if ch, ok := tm.cache[id]; ok { + return ch, nil + } + + change, err := tm.storage.GetRawChange(ctx, id) + if err != nil { + return nil, err + } + tm.allChanges = append(tm.allChanges, change) + ch, err = tm.builder.UnmarshallReduced(change) + if err != nil { + return nil, err + } + tm.cache[id] = ch + return ch, nil +}