1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-09 17:45:03 +09:00

Update migrator

This commit is contained in:
mcrakhman 2025-01-06 19:20:15 +01:00
parent 2da2fda10b
commit 8cb2e54eef
No known key found for this signature in database
GPG key ID: DED12CFEF5B8396B

View file

@ -5,37 +5,45 @@ import (
"errors"
"fmt"
anystore "github.com/anyproto/any-store"
"github.com/anyproto/any-sync/commonspace/spacestorage"
"github.com/anyproto/any-sync/commonspace/spacestorage/oldstorage"
"github.com/anyproto/any-sync/net/streampool"
"github.com/anyproto/any-sync/util/crypto"
)
type SpaceMigrator interface {
MigrateId(ctx context.Context, id string) (spacestorage.SpaceStorage, error)
}
type spaceMigrator struct {
oldStorage oldstorage.SpaceStorage
oldProvider oldstorage.SpaceStorageProvider
newProvider spacestorage.SpaceStorageProvider
numParallel int
}
func newSpaceMigrator(spaceStorage oldstorage.SpaceStorage, numParallel int) *spaceMigrator {
func NewSpaceMigrator(oldProvider oldstorage.SpaceStorageProvider, newProvider spacestorage.SpaceStorageProvider, numParallel int) SpaceMigrator {
return &spaceMigrator{
oldStorage: spaceStorage,
oldProvider: oldProvider,
newProvider: newProvider,
numParallel: numParallel,
}
}
func (s *spaceMigrator) migrateSpaceStorage(ctx context.Context, store anystore.DB) (spacestorage.SpaceStorage, error) {
header, err := s.oldStorage.SpaceHeader()
func (s *spaceMigrator) MigrateId(ctx context.Context, id string) (spacestorage.SpaceStorage, error) {
oldStorage, err := s.oldProvider.WaitSpaceStorage(ctx, id)
if err != nil {
return nil, fmt.Errorf("migration: failed to get old space storage: %w", err)
}
header, err := oldStorage.SpaceHeader()
if err != nil {
return nil, fmt.Errorf("migration: failed to get space header: %w", err)
}
settingsId := s.oldStorage.SpaceSettingsId()
settingsRoot, err := s.oldStorage.TreeRoot(settingsId)
settingsId := oldStorage.SpaceSettingsId()
settingsRoot, err := oldStorage.TreeRoot(settingsId)
if err != nil {
return nil, fmt.Errorf("migration: failed to get space settings root: %w", err)
}
aclStorage, err := s.oldStorage.AclStorage()
aclStorage, err := oldStorage.AclStorage()
if err != nil {
return nil, fmt.Errorf("migration: failed to get acl storage: %w", err)
}
@ -48,16 +56,16 @@ func (s *spaceMigrator) migrateSpaceStorage(ctx context.Context, store anystore.
SpaceHeaderWithId: header,
SpaceSettingsWithId: settingsRoot,
}
newStorage, err := spacestorage.Create(ctx, store, createPayload)
newStorage, err := s.newProvider.CreateSpaceStorage(ctx, createPayload)
if err != nil {
return nil, fmt.Errorf("migration: failed to create new space storage: %w", err)
}
aclList, err := migrateAclList(ctx, aclStorage, newStorage.HeadStorage(), store)
aclList, err := migrateAclList(ctx, aclStorage, newStorage.HeadStorage(), newStorage.AnyStore())
if err != nil {
return nil, fmt.Errorf("migration: failed to migrate acl list: %w", err)
}
executor := streampool.NewExecPool(s.numParallel, 0)
storedIds, err := s.oldStorage.StoredIds()
storedIds, err := oldStorage.StoredIds()
if err != nil {
return nil, fmt.Errorf("migration: failed to get stored ids: %w", err)
}
@ -74,12 +82,12 @@ func (s *spaceMigrator) migrateSpaceStorage(ctx context.Context, store anystore.
defer func() {
ch <- tm
}()
treeStorage, err := s.oldStorage.TreeStorage(id)
treeStorage, err := oldStorage.TreeStorage(id)
if err != nil {
allErrors = append(allErrors, fmt.Errorf("migration: failed to get tree storage: %w", err))
return
}
err = tm.migrateTreeStorage(ctx, treeStorage, newStorage.HeadStorage(), store)
err = tm.migrateTreeStorage(ctx, treeStorage, newStorage.HeadStorage(), newStorage.AnyStore())
if err != nil {
allErrors = append(allErrors, fmt.Errorf("migration: failed to migrate tree storage: %w", err))
return