From b97ff65643b969b1d4c0c176383fa420ebfb9e8d Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Wed, 8 Jan 2025 18:33:13 +0100 Subject: [PATCH] Add already migrated logic --- commonspace/object/tree/objecttree/storage.go | 12 --- .../spacestorage/migration/contextwg.go | 39 +++++++++ .../spacestorage/migration/executor.go | 17 ++-- .../spacestorage/migration/spacemigrator.go | 79 ++++++++++++++++++- 4 files changed, 124 insertions(+), 23 deletions(-) create mode 100644 commonspace/spacestorage/migration/contextwg.go diff --git a/commonspace/object/tree/objecttree/storage.go b/commonspace/object/tree/objecttree/storage.go index 1c43bb3d..dd5faa6a 100644 --- a/commonspace/object/tree/objecttree/storage.go +++ b/commonspace/object/tree/objecttree/storage.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "sync/atomic" "time" anystore "github.com/anyproto/any-store" @@ -275,19 +274,8 @@ func (s *storage) CommonSnapshot(ctx context.Context) (string, error) { return entry.CommonSnapshot, nil } -var ( - totalCalls atomic.Int32 - totalTime atomic.Int64 -) - func (s *storage) Get(ctx context.Context, id string) (StorageChange, error) { - tm := time.Now() - totalCalls.Add(1) doc, err := s.changesColl.FindId(ctx, id) - totalTime.Add(int64(time.Since(tm))) - if totalCalls.Load()%100 == 0 { - fmt.Println("[x]: totalTime", time.Duration(totalTime.Load()).String(), "totalCalls", totalCalls.Load()) - } if err != nil { return StorageChange{}, err } diff --git a/commonspace/spacestorage/migration/contextwg.go b/commonspace/spacestorage/migration/contextwg.go new file mode 100644 index 00000000..47f5c7d2 --- /dev/null +++ b/commonspace/spacestorage/migration/contextwg.go @@ -0,0 +1,39 @@ +package migration + +import ( + "context" + "sync" +) + +type contextWaitGroup struct { + ctx context.Context + wg sync.WaitGroup +} + +func newContextWaitGroup(ctx context.Context) *contextWaitGroup { + return &contextWaitGroup{ + ctx: ctx, + } +} + +func (cwg *contextWaitGroup) Add(delta int) { + cwg.wg.Add(delta) +} + +func (cwg *contextWaitGroup) Done() { + cwg.wg.Done() +} + +func (cwg *contextWaitGroup) Wait() error { + doneCh := make(chan struct{}) + go func() { + cwg.wg.Wait() + close(doneCh) + }() + select { + case <-doneCh: + return nil + case <-cwg.ctx.Done(): + return cwg.ctx.Err() + } +} diff --git a/commonspace/spacestorage/migration/executor.go b/commonspace/spacestorage/migration/executor.go index e16fa86f..d3ebe983 100644 --- a/commonspace/spacestorage/migration/executor.go +++ b/commonspace/spacestorage/migration/executor.go @@ -2,7 +2,6 @@ package migration import ( "context" - "sync" "github.com/cheggaaa/mb/v3" "go.uber.org/zap" @@ -12,9 +11,11 @@ import ( var log = logger.NewNamed("common.spacestorage.migration") -func newMigratePool(workers, maxSize int) *migratePool { +func newMigratePool(ctx context.Context, workers, maxSize int) *migratePool { ss := &migratePool{ workers: workers, + ctx: ctx, + wg: newContextWaitGroup(ctx), batch: mb.New[func()](maxSize), } return ss @@ -24,7 +25,8 @@ type migratePool struct { workers int batch *mb.MB[func()] close chan struct{} - wg sync.WaitGroup + ctx context.Context + wg *contextWaitGroup } func (mp *migratePool) Add(ctx context.Context, f ...func()) (err error) { @@ -51,7 +53,7 @@ func (mp *migratePool) Run() { func (mp *migratePool) sendLoop() { for { - f, err := mp.batch.WaitOne(context.Background()) + f, err := mp.batch.WaitOne(mp.ctx) if err != nil { log.Debug("close send loop", zap.Error(err)) return @@ -61,7 +63,8 @@ func (mp *migratePool) sendLoop() { } } -func (mp *migratePool) Close() (err error) { - mp.wg.Wait() - return mp.batch.Close() +func (mp *migratePool) Wait() (err error) { + err = mp.wg.Wait() + mp.batch.Close() + return err } diff --git a/commonspace/spacestorage/migration/spacemigrator.go b/commonspace/spacestorage/migration/spacemigrator.go index f10b8a7b..c767a290 100644 --- a/commonspace/spacestorage/migration/spacemigrator.go +++ b/commonspace/spacestorage/migration/spacemigrator.go @@ -4,7 +4,12 @@ import ( "context" "errors" "fmt" + "os" + "path/filepath" + "time" + anystore "github.com/anyproto/any-store" + "github.com/anyproto/any-store/anyenc" "go.uber.org/zap" "github.com/anyproto/any-sync/commonspace/spacestorage" @@ -12,6 +17,14 @@ import ( "github.com/anyproto/any-sync/util/crypto" ) +const ( + migratedColl = "migration" + migratedDoc = "state" + migratedTimeKey = "t" +) + +var ErrAlreadyMigrated = errors.New("already migrated") + type SpaceMigrator interface { MigrateId(ctx context.Context, id string, progress Progress) (spacestorage.SpaceStorage, error) } @@ -24,17 +37,31 @@ type spaceMigrator struct { oldProvider oldstorage.SpaceStorageProvider newProvider spacestorage.SpaceStorageProvider numParallel int + rootPath string } -func NewSpaceMigrator(oldProvider oldstorage.SpaceStorageProvider, newProvider spacestorage.SpaceStorageProvider, numParallel int) SpaceMigrator { +func NewSpaceMigrator(oldProvider oldstorage.SpaceStorageProvider, newProvider spacestorage.SpaceStorageProvider, numParallel int, rootPath string) SpaceMigrator { return &spaceMigrator{ oldProvider: oldProvider, newProvider: newProvider, numParallel: numParallel, + rootPath: rootPath, } } func (s *spaceMigrator) MigrateId(ctx context.Context, id string, progress Progress) (spacestorage.SpaceStorage, error) { + migrated, storage := s.checkMigrated(ctx, id) + if migrated { + storage.Close(ctx) + return nil, ErrAlreadyMigrated + } + if storage != nil { + err := storage.Close(ctx) + if err != nil { + return nil, fmt.Errorf("migration: failed to close old storage: %w", err) + } + os.RemoveAll(filepath.Join(s.rootPath, id)) + } oldStorage, err := s.oldProvider.WaitSpaceStorage(ctx, id) if err != nil { return nil, fmt.Errorf("migration: failed to get old space storage: %w", err) @@ -69,7 +96,7 @@ func (s *spaceMigrator) MigrateId(ctx context.Context, id string, progress Progr if err != nil { return nil, fmt.Errorf("migration: failed to migrate acl list: %w", err) } - executor := newMigratePool(s.numParallel, 0) + executor := newMigratePool(ctx, s.numParallel, 0) storedIds, err := oldStorage.StoredIds() if err != nil { return nil, fmt.Errorf("migration: failed to get stored ids: %w", err) @@ -104,9 +131,53 @@ func (s *spaceMigrator) MigrateId(ctx context.Context, id string, progress Progr } } executor.Run() - executor.Close() + err = executor.Wait() + if err != nil { + return nil, fmt.Errorf("migration: failed to wait for executor: %w", err) + } if len(allErrors) > 0 { return nil, fmt.Errorf("migration failed: %w", errors.Join(allErrors...)) } - return newStorage, nil + return newStorage, s.setMigrated(ctx, newStorage.AnyStore()) +} + +func (s *spaceMigrator) checkMigrated(ctx context.Context, id string) (bool, spacestorage.SpaceStorage) { + storage, err := s.newProvider.WaitSpaceStorage(ctx, id) + if err != nil { + return false, nil + } + coll, err := storage.AnyStore().OpenCollection(ctx, migratedColl) + if err != nil { + return false, storage + } + _, err = coll.FindId(ctx, migratedDoc) + if err != nil { + return false, storage + } + return true, storage +} + +func (s *spaceMigrator) setMigrated(ctx context.Context, anyStore anystore.DB) error { + coll, err := anyStore.Collection(ctx, migratedColl) + if err != nil { + return fmt.Errorf("migration: failed to get collection: %w", err) + } + arena := &anyenc.Arena{} + tx, err := coll.WriteTx(ctx) + if err != nil { + return err + } + newVal := arena.NewObject() + newVal.Set(migratedTimeKey, arena.NewNumberFloat64(float64(time.Now().Unix()))) + newVal.Set("id", arena.NewString(migratedDoc)) + err = coll.Insert(tx.Context(), newVal) + if err != nil { + tx.Rollback() + return err + } + err = tx.Commit() + if err != nil { + return nil + } + return anyStore.Checkpoint(ctx, true) }