From 9e965782cca360f69393c8d5e50f87b08d1c2db3 Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Mon, 13 Jan 2025 11:26:57 +0100 Subject: [PATCH] Close storages --- .../spacestorage/migration/spacemigrator.go | 44 ++++++++++++------- 1 file changed, 28 insertions(+), 16 deletions(-) diff --git a/commonspace/spacestorage/migration/spacemigrator.go b/commonspace/spacestorage/migration/spacemigrator.go index 65e28a93..24225274 100644 --- a/commonspace/spacestorage/migration/spacemigrator.go +++ b/commonspace/spacestorage/migration/spacemigrator.go @@ -26,7 +26,7 @@ const ( var ErrAlreadyMigrated = errors.New("already migrated") type SpaceMigrator interface { - MigrateId(ctx context.Context, id string, progress Progress) (spacestorage.SpaceStorage, error) + MigrateId(ctx context.Context, id string, progress Progress) error CheckMigrated(ctx context.Context, id string) (bool, error) } @@ -58,39 +58,45 @@ func (s *spaceMigrator) CheckMigrated(ctx context.Context, id string) (bool, err return false, nil } -func (s *spaceMigrator) MigrateId(ctx context.Context, id string, progress Progress) (spacestorage.SpaceStorage, error) { +func (s *spaceMigrator) MigrateId(ctx context.Context, id string, progress Progress) error { migrated, storage := s.checkMigrated(ctx, id) if migrated { storage.Close(ctx) - return nil, ErrAlreadyMigrated + return ErrAlreadyMigrated } if storage != nil { err := storage.Close(ctx) if err != nil { - return nil, fmt.Errorf("migration: failed to close old storage: %w", err) + return fmt.Errorf("migration: failed to close old storage: %w", err) } os.RemoveAll(filepath.Join(s.rootPath, id)) } oldStorage, err := s.oldProvider.WaitSpaceStorage(ctx, id) if err != nil { - return nil, fmt.Errorf("migration: failed to get old space storage: %w", err) + return fmt.Errorf("migration: failed to get old space storage: %w", err) } + defer func() { + err := oldStorage.Close(ctx) + if err != nil { + log.Warn("migration: failed to close old storage", zap.Error(err)) + } + }() header, err := oldStorage.SpaceHeader() if err != nil { - return nil, fmt.Errorf("migration: failed to get space header: %w", err) + return fmt.Errorf("migration: failed to get space header: %w", err) } settingsId := oldStorage.SpaceSettingsId() settingsRoot, err := oldStorage.TreeRoot(settingsId) if err != nil { - return nil, fmt.Errorf("migration: failed to get space settings root: %w", err) + return fmt.Errorf("migration: failed to get space settings root: %w", err) } aclStorage, err := oldStorage.AclStorage() if err != nil { - return nil, fmt.Errorf("migration: failed to get acl storage: %w", err) + return fmt.Errorf("migration: failed to get acl storage: %w", err) } aclRoot, err := aclStorage.Root() if err != nil { - return nil, fmt.Errorf("migration: failed to get acl root: %w", err) + return fmt.Errorf("migration: failed to get acl root: %w", err) } createPayload := spacestorage.SpaceStorageCreatePayload{ AclWithId: aclRoot, @@ -99,16 +105,22 @@ func (s *spaceMigrator) MigrateId(ctx context.Context, id string, progress Progr } newStorage, err := s.newProvider.CreateSpaceStorage(ctx, createPayload) if err != nil { - return nil, fmt.Errorf("migration: failed to create new space storage: %w", err) + return fmt.Errorf("migration: failed to create new space storage: %w", err) } + defer func() { + err := newStorage.Close(ctx) + if err != nil { + log.Warn("migration: failed to close old storage", zap.Error(err)) + } + }() aclList, err := migrateAclList(ctx, aclStorage, newStorage.HeadStorage(), newStorage.AnyStore()) if err != nil { - return nil, fmt.Errorf("migration: failed to migrate acl list: %w", err) + return fmt.Errorf("migration: failed to migrate acl list: %w", err) } executor := newMigratePool(ctx, s.numParallel, 0) storedIds, err := oldStorage.StoredIds() if err != nil { - return nil, fmt.Errorf("migration: failed to get stored ids: %w", err) + return fmt.Errorf("migration: failed to get stored ids: %w", err) } treeMigrators := make([]*treeMigrator, 0, s.numParallel) ch := make(chan *treeMigrator, s.numParallel) @@ -136,18 +148,18 @@ func (s *spaceMigrator) MigrateId(ctx context.Context, id string, progress Progr } }) if err != nil { - return nil, fmt.Errorf("migration: failed to add task: %w", err) + return fmt.Errorf("migration: failed to add task: %w", err) } } executor.Run() err = executor.Wait() if err != nil { - return nil, fmt.Errorf("migration: failed to wait for executor: %w", err) + return fmt.Errorf("migration: failed to wait for executor: %w", err) } if len(allErrors) > 0 { - return nil, fmt.Errorf("migration failed: %w", errors.Join(allErrors...)) + return fmt.Errorf("migration failed: %w", errors.Join(allErrors...)) } - return newStorage, s.setMigrated(ctx, newStorage.AnyStore()) + return s.setMigrated(ctx, newStorage.AnyStore()) } func (s *spaceMigrator) checkMigrated(ctx context.Context, id string) (bool, spacestorage.SpaceStorage) {