1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-09 09:35:03 +09:00

Add already migrated logic

This commit is contained in:
mcrakhman 2025-01-08 18:33:13 +01:00
parent 83738c231f
commit b97ff65643
No known key found for this signature in database
GPG key ID: DED12CFEF5B8396B
4 changed files with 124 additions and 23 deletions

View file

@ -0,0 +1,39 @@
package migration
import (
"context"
"sync"
)
type contextWaitGroup struct {
ctx context.Context
wg sync.WaitGroup
}
func newContextWaitGroup(ctx context.Context) *contextWaitGroup {
return &contextWaitGroup{
ctx: ctx,
}
}
func (cwg *contextWaitGroup) Add(delta int) {
cwg.wg.Add(delta)
}
func (cwg *contextWaitGroup) Done() {
cwg.wg.Done()
}
func (cwg *contextWaitGroup) Wait() error {
doneCh := make(chan struct{})
go func() {
cwg.wg.Wait()
close(doneCh)
}()
select {
case <-doneCh:
return nil
case <-cwg.ctx.Done():
return cwg.ctx.Err()
}
}

View file

@ -2,7 +2,6 @@ package migration
import (
"context"
"sync"
"github.com/cheggaaa/mb/v3"
"go.uber.org/zap"
@ -12,9 +11,11 @@ import (
var log = logger.NewNamed("common.spacestorage.migration")
func newMigratePool(workers, maxSize int) *migratePool {
func newMigratePool(ctx context.Context, workers, maxSize int) *migratePool {
ss := &migratePool{
workers: workers,
ctx: ctx,
wg: newContextWaitGroup(ctx),
batch: mb.New[func()](maxSize),
}
return ss
@ -24,7 +25,8 @@ type migratePool struct {
workers int
batch *mb.MB[func()]
close chan struct{}
wg sync.WaitGroup
ctx context.Context
wg *contextWaitGroup
}
func (mp *migratePool) Add(ctx context.Context, f ...func()) (err error) {
@ -51,7 +53,7 @@ func (mp *migratePool) Run() {
func (mp *migratePool) sendLoop() {
for {
f, err := mp.batch.WaitOne(context.Background())
f, err := mp.batch.WaitOne(mp.ctx)
if err != nil {
log.Debug("close send loop", zap.Error(err))
return
@ -61,7 +63,8 @@ func (mp *migratePool) sendLoop() {
}
}
func (mp *migratePool) Close() (err error) {
mp.wg.Wait()
return mp.batch.Close()
func (mp *migratePool) Wait() (err error) {
err = mp.wg.Wait()
mp.batch.Close()
return err
}

View file

@ -4,7 +4,12 @@ import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
"time"
anystore "github.com/anyproto/any-store"
"github.com/anyproto/any-store/anyenc"
"go.uber.org/zap"
"github.com/anyproto/any-sync/commonspace/spacestorage"
@ -12,6 +17,14 @@ import (
"github.com/anyproto/any-sync/util/crypto"
)
const (
migratedColl = "migration"
migratedDoc = "state"
migratedTimeKey = "t"
)
var ErrAlreadyMigrated = errors.New("already migrated")
type SpaceMigrator interface {
MigrateId(ctx context.Context, id string, progress Progress) (spacestorage.SpaceStorage, error)
}
@ -24,17 +37,31 @@ type spaceMigrator struct {
oldProvider oldstorage.SpaceStorageProvider
newProvider spacestorage.SpaceStorageProvider
numParallel int
rootPath string
}
func NewSpaceMigrator(oldProvider oldstorage.SpaceStorageProvider, newProvider spacestorage.SpaceStorageProvider, numParallel int) SpaceMigrator {
func NewSpaceMigrator(oldProvider oldstorage.SpaceStorageProvider, newProvider spacestorage.SpaceStorageProvider, numParallel int, rootPath string) SpaceMigrator {
return &spaceMigrator{
oldProvider: oldProvider,
newProvider: newProvider,
numParallel: numParallel,
rootPath: rootPath,
}
}
func (s *spaceMigrator) MigrateId(ctx context.Context, id string, progress Progress) (spacestorage.SpaceStorage, error) {
migrated, storage := s.checkMigrated(ctx, id)
if migrated {
storage.Close(ctx)
return nil, ErrAlreadyMigrated
}
if storage != nil {
err := storage.Close(ctx)
if err != nil {
return nil, fmt.Errorf("migration: failed to close old storage: %w", err)
}
os.RemoveAll(filepath.Join(s.rootPath, id))
}
oldStorage, err := s.oldProvider.WaitSpaceStorage(ctx, id)
if err != nil {
return nil, fmt.Errorf("migration: failed to get old space storage: %w", err)
@ -69,7 +96,7 @@ func (s *spaceMigrator) MigrateId(ctx context.Context, id string, progress Progr
if err != nil {
return nil, fmt.Errorf("migration: failed to migrate acl list: %w", err)
}
executor := newMigratePool(s.numParallel, 0)
executor := newMigratePool(ctx, s.numParallel, 0)
storedIds, err := oldStorage.StoredIds()
if err != nil {
return nil, fmt.Errorf("migration: failed to get stored ids: %w", err)
@ -104,9 +131,53 @@ func (s *spaceMigrator) MigrateId(ctx context.Context, id string, progress Progr
}
}
executor.Run()
executor.Close()
err = executor.Wait()
if err != nil {
return nil, fmt.Errorf("migration: failed to wait for executor: %w", err)
}
if len(allErrors) > 0 {
return nil, fmt.Errorf("migration failed: %w", errors.Join(allErrors...))
}
return newStorage, nil
return newStorage, s.setMigrated(ctx, newStorage.AnyStore())
}
func (s *spaceMigrator) checkMigrated(ctx context.Context, id string) (bool, spacestorage.SpaceStorage) {
storage, err := s.newProvider.WaitSpaceStorage(ctx, id)
if err != nil {
return false, nil
}
coll, err := storage.AnyStore().OpenCollection(ctx, migratedColl)
if err != nil {
return false, storage
}
_, err = coll.FindId(ctx, migratedDoc)
if err != nil {
return false, storage
}
return true, storage
}
func (s *spaceMigrator) setMigrated(ctx context.Context, anyStore anystore.DB) error {
coll, err := anyStore.Collection(ctx, migratedColl)
if err != nil {
return fmt.Errorf("migration: failed to get collection: %w", err)
}
arena := &anyenc.Arena{}
tx, err := coll.WriteTx(ctx)
if err != nil {
return err
}
newVal := arena.NewObject()
newVal.Set(migratedTimeKey, arena.NewNumberFloat64(float64(time.Now().Unix())))
newVal.Set("id", arena.NewString(migratedDoc))
err = coll.Insert(tx.Context(), newVal)
if err != nil {
tx.Rollback()
return err
}
err = tx.Commit()
if err != nil {
return nil
}
return anyStore.Checkpoint(ctx, true)
}