1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-09 09:35:03 +09:00

Acl fixes and migration pool

This commit is contained in:
mcrakhman 2025-01-08 13:32:40 +01:00
parent 8cb2e54eef
commit 488823d55c
No known key found for this signature in database
GPG key ID: DED12CFEF5B8396B
3 changed files with 84 additions and 5 deletions

View file

@ -3,6 +3,7 @@ package migration
import (
"context"
"fmt"
"slices"
anystore "github.com/anyproto/any-store"
@ -22,7 +23,7 @@ func migrateAclList(ctx context.Context, oldStorage oldstorage.ListStorage, head
if err != nil {
return nil, fmt.Errorf("migration: failed to get acl head: %w", err)
}
aclStorage, err := list.CreateStorage(ctx, rootChange, headStorage, store)
aclStorage, err := list.NewStorage(ctx, rootChange.Id, headStorage, store)
if err != nil {
return nil, fmt.Errorf("migration: failed to create acl storage: %w", err)
}
@ -38,6 +39,7 @@ func migrateAclList(ctx context.Context, oldStorage oldstorage.ListStorage, head
allRecords []*consensusproto.RawRecordWithId
rec *consensusproto.RawRecordWithId
cur = head
builder = aclList.RecordBuilder()
)
for rec == nil || rec.Id != rootChange.Id {
rec, err = oldStorage.GetRawRecord(ctx, cur)
@ -45,7 +47,13 @@ func migrateAclList(ctx context.Context, oldStorage oldstorage.ListStorage, head
return nil, fmt.Errorf("migration: failed to get acl record: %w", err)
}
allRecords = append(allRecords, rec)
res, err := builder.UnmarshallWithId(rec)
if err != nil {
return nil, fmt.Errorf("migration: failed to unmarshall acl record: %w", err)
}
cur = res.PrevId
}
slices.Reverse(allRecords)
err = aclList.AddRawRecords(allRecords)
if err != nil {
return nil, fmt.Errorf("migration: failed to add acl records: %w", err)

View file

@ -0,0 +1,67 @@
package migration
import (
"context"
"sync"
"github.com/cheggaaa/mb/v3"
"go.uber.org/zap"
"github.com/anyproto/any-sync/app/logger"
)
var log = logger.NewNamed("common.spacestorage.migration")
func newMigratePool(workers, maxSize int) *migratePool {
ss := &migratePool{
workers: workers,
batch: mb.New[func()](maxSize),
}
return ss
}
type migratePool struct {
workers int
batch *mb.MB[func()]
close chan struct{}
wg sync.WaitGroup
}
func (mp *migratePool) Add(ctx context.Context, f ...func()) (err error) {
err = mp.batch.Add(ctx, f...)
if err == nil {
mp.wg.Add(1)
}
return err
}
func (mp *migratePool) TryAdd(f ...func()) (err error) {
err = mp.batch.TryAdd(f...)
if err == nil {
mp.wg.Add(1)
}
return err
}
func (mp *migratePool) Run() {
for i := 0; i < mp.workers; i++ {
go mp.sendLoop()
}
}
func (mp *migratePool) sendLoop() {
for {
f, err := mp.batch.WaitOne(context.Background())
mp.wg.Done()
if err != nil {
log.Debug("close send loop", zap.Error(err))
return
}
f()
}
}
func (mp *migratePool) Close() (err error) {
mp.wg.Wait()
return mp.batch.Close()
}

View file

@ -7,12 +7,15 @@ import (
"github.com/anyproto/any-sync/commonspace/spacestorage"
"github.com/anyproto/any-sync/commonspace/spacestorage/oldstorage"
"github.com/anyproto/any-sync/net/streampool"
"github.com/anyproto/any-sync/util/crypto"
)
type SpaceMigrator interface {
MigrateId(ctx context.Context, id string) (spacestorage.SpaceStorage, error)
MigrateId(ctx context.Context, id string, progress Progress) (spacestorage.SpaceStorage, error)
}
type Progress interface {
AddDone(done int64)
}
type spaceMigrator struct {
@ -29,7 +32,7 @@ func NewSpaceMigrator(oldProvider oldstorage.SpaceStorageProvider, newProvider s
}
}
func (s *spaceMigrator) MigrateId(ctx context.Context, id string) (spacestorage.SpaceStorage, error) {
func (s *spaceMigrator) MigrateId(ctx context.Context, id string, progress Progress) (spacestorage.SpaceStorage, error) {
oldStorage, err := s.oldProvider.WaitSpaceStorage(ctx, id)
if err != nil {
return nil, fmt.Errorf("migration: failed to get old space storage: %w", err)
@ -64,7 +67,7 @@ func (s *spaceMigrator) MigrateId(ctx context.Context, id string) (spacestorage.
if err != nil {
return nil, fmt.Errorf("migration: failed to migrate acl list: %w", err)
}
executor := streampool.NewExecPool(s.numParallel, 0)
executor := newMigratePool(s.numParallel, 0)
storedIds, err := oldStorage.StoredIds()
if err != nil {
return nil, fmt.Errorf("migration: failed to get stored ids: %w", err)
@ -80,6 +83,7 @@ func (s *spaceMigrator) MigrateId(ctx context.Context, id string) (spacestorage.
err := executor.Add(ctx, func() {
tm := <-ch
defer func() {
progress.AddDone(1)
ch <- tm
}()
treeStorage, err := oldStorage.TreeStorage(id)