1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-11 02:13:49 +09:00

Close storages

This commit is contained in:
mcrakhman 2025-01-13 11:26:57 +01:00
parent 6914620382
commit 9e965782cc
No known key found for this signature in database
GPG key ID: DED12CFEF5B8396B

View file

@ -26,7 +26,7 @@ const (
var ErrAlreadyMigrated = errors.New("already migrated") var ErrAlreadyMigrated = errors.New("already migrated")
type SpaceMigrator interface { type SpaceMigrator interface {
MigrateId(ctx context.Context, id string, progress Progress) (spacestorage.SpaceStorage, error) MigrateId(ctx context.Context, id string, progress Progress) error
CheckMigrated(ctx context.Context, id string) (bool, error) CheckMigrated(ctx context.Context, id string) (bool, error)
} }
@ -58,39 +58,45 @@ func (s *spaceMigrator) CheckMigrated(ctx context.Context, id string) (bool, err
return false, nil return false, nil
} }
func (s *spaceMigrator) MigrateId(ctx context.Context, id string, progress Progress) (spacestorage.SpaceStorage, error) { func (s *spaceMigrator) MigrateId(ctx context.Context, id string, progress Progress) error {
migrated, storage := s.checkMigrated(ctx, id) migrated, storage := s.checkMigrated(ctx, id)
if migrated { if migrated {
storage.Close(ctx) storage.Close(ctx)
return nil, ErrAlreadyMigrated return ErrAlreadyMigrated
} }
if storage != nil { if storage != nil {
err := storage.Close(ctx) err := storage.Close(ctx)
if err != nil { if err != nil {
return nil, fmt.Errorf("migration: failed to close old storage: %w", err) return fmt.Errorf("migration: failed to close old storage: %w", err)
} }
os.RemoveAll(filepath.Join(s.rootPath, id)) os.RemoveAll(filepath.Join(s.rootPath, id))
} }
oldStorage, err := s.oldProvider.WaitSpaceStorage(ctx, id) oldStorage, err := s.oldProvider.WaitSpaceStorage(ctx, id)
if err != nil { if err != nil {
return nil, fmt.Errorf("migration: failed to get old space storage: %w", err) return fmt.Errorf("migration: failed to get old space storage: %w", err)
} }
defer func() {
err := oldStorage.Close(ctx)
if err != nil {
log.Warn("migration: failed to close old storage", zap.Error(err))
}
}()
header, err := oldStorage.SpaceHeader() header, err := oldStorage.SpaceHeader()
if err != nil { if err != nil {
return nil, fmt.Errorf("migration: failed to get space header: %w", err) return fmt.Errorf("migration: failed to get space header: %w", err)
} }
settingsId := oldStorage.SpaceSettingsId() settingsId := oldStorage.SpaceSettingsId()
settingsRoot, err := oldStorage.TreeRoot(settingsId) settingsRoot, err := oldStorage.TreeRoot(settingsId)
if err != nil { if err != nil {
return nil, fmt.Errorf("migration: failed to get space settings root: %w", err) return fmt.Errorf("migration: failed to get space settings root: %w", err)
} }
aclStorage, err := oldStorage.AclStorage() aclStorage, err := oldStorage.AclStorage()
if err != nil { if err != nil {
return nil, fmt.Errorf("migration: failed to get acl storage: %w", err) return fmt.Errorf("migration: failed to get acl storage: %w", err)
} }
aclRoot, err := aclStorage.Root() aclRoot, err := aclStorage.Root()
if err != nil { if err != nil {
return nil, fmt.Errorf("migration: failed to get acl root: %w", err) return fmt.Errorf("migration: failed to get acl root: %w", err)
} }
createPayload := spacestorage.SpaceStorageCreatePayload{ createPayload := spacestorage.SpaceStorageCreatePayload{
AclWithId: aclRoot, AclWithId: aclRoot,
@ -99,16 +105,22 @@ func (s *spaceMigrator) MigrateId(ctx context.Context, id string, progress Progr
} }
newStorage, err := s.newProvider.CreateSpaceStorage(ctx, createPayload) newStorage, err := s.newProvider.CreateSpaceStorage(ctx, createPayload)
if err != nil { if err != nil {
return nil, fmt.Errorf("migration: failed to create new space storage: %w", err) return fmt.Errorf("migration: failed to create new space storage: %w", err)
} }
defer func() {
err := newStorage.Close(ctx)
if err != nil {
log.Warn("migration: failed to close old storage", zap.Error(err))
}
}()
aclList, err := migrateAclList(ctx, aclStorage, newStorage.HeadStorage(), newStorage.AnyStore()) aclList, err := migrateAclList(ctx, aclStorage, newStorage.HeadStorage(), newStorage.AnyStore())
if err != nil { if err != nil {
return nil, fmt.Errorf("migration: failed to migrate acl list: %w", err) return fmt.Errorf("migration: failed to migrate acl list: %w", err)
} }
executor := newMigratePool(ctx, s.numParallel, 0) executor := newMigratePool(ctx, s.numParallel, 0)
storedIds, err := oldStorage.StoredIds() storedIds, err := oldStorage.StoredIds()
if err != nil { if err != nil {
return nil, fmt.Errorf("migration: failed to get stored ids: %w", err) return fmt.Errorf("migration: failed to get stored ids: %w", err)
} }
treeMigrators := make([]*treeMigrator, 0, s.numParallel) treeMigrators := make([]*treeMigrator, 0, s.numParallel)
ch := make(chan *treeMigrator, s.numParallel) ch := make(chan *treeMigrator, s.numParallel)
@ -136,18 +148,18 @@ func (s *spaceMigrator) MigrateId(ctx context.Context, id string, progress Progr
} }
}) })
if err != nil { if err != nil {
return nil, fmt.Errorf("migration: failed to add task: %w", err) return fmt.Errorf("migration: failed to add task: %w", err)
} }
} }
executor.Run() executor.Run()
err = executor.Wait() err = executor.Wait()
if err != nil { if err != nil {
return nil, fmt.Errorf("migration: failed to wait for executor: %w", err) return fmt.Errorf("migration: failed to wait for executor: %w", err)
} }
if len(allErrors) > 0 { if len(allErrors) > 0 {
return nil, fmt.Errorf("migration failed: %w", errors.Join(allErrors...)) return fmt.Errorf("migration failed: %w", errors.Join(allErrors...))
} }
return newStorage, s.setMigrated(ctx, newStorage.AnyStore()) return s.setMigrated(ctx, newStorage.AnyStore())
} }
func (s *spaceMigrator) checkMigrated(ctx context.Context, id string) (bool, spacestorage.SpaceStorage) { func (s *spaceMigrator) checkMigrated(ctx context.Context, id string) (bool, spacestorage.SpaceStorage) {