From 488823d55c8879ea0056afd3285113b5edcf6316 Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Wed, 8 Jan 2025 13:32:40 +0100 Subject: [PATCH] Acl fixes and migration pool --- .../spacestorage/migration/aclmigrator.go | 10 ++- .../spacestorage/migration/executor.go | 67 +++++++++++++++++++ .../spacestorage/migration/spacemigrator.go | 12 ++-- 3 files changed, 84 insertions(+), 5 deletions(-) create mode 100644 commonspace/spacestorage/migration/executor.go diff --git a/commonspace/spacestorage/migration/aclmigrator.go b/commonspace/spacestorage/migration/aclmigrator.go index 1ac1b46b..2c04f1e8 100644 --- a/commonspace/spacestorage/migration/aclmigrator.go +++ b/commonspace/spacestorage/migration/aclmigrator.go @@ -3,6 +3,7 @@ package migration import ( "context" "fmt" + "slices" anystore "github.com/anyproto/any-store" @@ -22,7 +23,7 @@ func migrateAclList(ctx context.Context, oldStorage oldstorage.ListStorage, head if err != nil { return nil, fmt.Errorf("migration: failed to get acl head: %w", err) } - aclStorage, err := list.CreateStorage(ctx, rootChange, headStorage, store) + aclStorage, err := list.NewStorage(ctx, rootChange.Id, headStorage, store) if err != nil { return nil, fmt.Errorf("migration: failed to create acl storage: %w", err) } @@ -38,6 +39,7 @@ func migrateAclList(ctx context.Context, oldStorage oldstorage.ListStorage, head allRecords []*consensusproto.RawRecordWithId rec *consensusproto.RawRecordWithId cur = head + builder = aclList.RecordBuilder() ) for rec == nil || rec.Id != rootChange.Id { rec, err = oldStorage.GetRawRecord(ctx, cur) @@ -45,7 +47,13 @@ func migrateAclList(ctx context.Context, oldStorage oldstorage.ListStorage, head return nil, fmt.Errorf("migration: failed to get acl record: %w", err) } allRecords = append(allRecords, rec) + res, err := builder.UnmarshallWithId(rec) + if err != nil { + return nil, fmt.Errorf("migration: failed to unmarshall acl record: %w", err) + } + cur = res.PrevId } + slices.Reverse(allRecords) err = aclList.AddRawRecords(allRecords) if err != nil { return nil, fmt.Errorf("migration: failed to add acl records: %w", err) diff --git a/commonspace/spacestorage/migration/executor.go b/commonspace/spacestorage/migration/executor.go new file mode 100644 index 00000000..498e2b4c --- /dev/null +++ b/commonspace/spacestorage/migration/executor.go @@ -0,0 +1,67 @@ +package migration + +import ( + "context" + "sync" + + "github.com/cheggaaa/mb/v3" + "go.uber.org/zap" + + "github.com/anyproto/any-sync/app/logger" +) + +var log = logger.NewNamed("common.spacestorage.migration") + +func newMigratePool(workers, maxSize int) *migratePool { + ss := &migratePool{ + workers: workers, + batch: mb.New[func()](maxSize), + } + return ss +} + +type migratePool struct { + workers int + batch *mb.MB[func()] + close chan struct{} + wg sync.WaitGroup +} + +func (mp *migratePool) Add(ctx context.Context, f ...func()) (err error) { + err = mp.batch.Add(ctx, f...) + if err == nil { + mp.wg.Add(1) + } + return err +} + +func (mp *migratePool) TryAdd(f ...func()) (err error) { + err = mp.batch.TryAdd(f...) + if err == nil { + mp.wg.Add(1) + } + return err +} + +func (mp *migratePool) Run() { + for i := 0; i < mp.workers; i++ { + go mp.sendLoop() + } +} + +func (mp *migratePool) sendLoop() { + for { + f, err := mp.batch.WaitOne(context.Background()) + mp.wg.Done() + if err != nil { + log.Debug("close send loop", zap.Error(err)) + return + } + f() + } +} + +func (mp *migratePool) Close() (err error) { + mp.wg.Wait() + return mp.batch.Close() +} diff --git a/commonspace/spacestorage/migration/spacemigrator.go b/commonspace/spacestorage/migration/spacemigrator.go index 36a3b60d..f1d46909 100644 --- a/commonspace/spacestorage/migration/spacemigrator.go +++ b/commonspace/spacestorage/migration/spacemigrator.go @@ -7,12 +7,15 @@ import ( "github.com/anyproto/any-sync/commonspace/spacestorage" "github.com/anyproto/any-sync/commonspace/spacestorage/oldstorage" - "github.com/anyproto/any-sync/net/streampool" "github.com/anyproto/any-sync/util/crypto" ) type SpaceMigrator interface { - MigrateId(ctx context.Context, id string) (spacestorage.SpaceStorage, error) + MigrateId(ctx context.Context, id string, progress Progress) (spacestorage.SpaceStorage, error) +} + +type Progress interface { + AddDone(done int64) } type spaceMigrator struct { @@ -29,7 +32,7 @@ func NewSpaceMigrator(oldProvider oldstorage.SpaceStorageProvider, newProvider s } } -func (s *spaceMigrator) MigrateId(ctx context.Context, id string) (spacestorage.SpaceStorage, error) { +func (s *spaceMigrator) MigrateId(ctx context.Context, id string, progress Progress) (spacestorage.SpaceStorage, error) { oldStorage, err := s.oldProvider.WaitSpaceStorage(ctx, id) if err != nil { return nil, fmt.Errorf("migration: failed to get old space storage: %w", err) @@ -64,7 +67,7 @@ func (s *spaceMigrator) MigrateId(ctx context.Context, id string) (spacestorage. if err != nil { return nil, fmt.Errorf("migration: failed to migrate acl list: %w", err) } - executor := streampool.NewExecPool(s.numParallel, 0) + executor := newMigratePool(s.numParallel, 0) storedIds, err := oldStorage.StoredIds() if err != nil { return nil, fmt.Errorf("migration: failed to get stored ids: %w", err) @@ -80,6 +83,7 @@ func (s *spaceMigrator) MigrateId(ctx context.Context, id string) (spacestorage. err := executor.Add(ctx, func() { tm := <-ch defer func() { + progress.AddDone(1) ch <- tm }() treeStorage, err := oldStorage.TreeStorage(id)