From a7a2d1fb24bd1be8a8dd8bd3c66f6ad84c05b040 Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Wed, 19 Feb 2025 18:31:38 +0100 Subject: [PATCH] Some testing --- commonspace/object/tree/objecttree/storage.go | 72 ++++++++++++++++--- 1 file changed, 63 insertions(+), 9 deletions(-) diff --git a/commonspace/object/tree/objecttree/storage.go b/commonspace/object/tree/objecttree/storage.go index cfc1ff06..5ba54bc2 100644 --- a/commonspace/object/tree/objecttree/storage.go +++ b/commonspace/object/tree/objecttree/storage.go @@ -4,11 +4,13 @@ import ( "context" "errors" "fmt" + "sync" "time" anystore "github.com/anyproto/any-store" "github.com/anyproto/any-store/anyenc" "github.com/anyproto/any-store/query" + "go.uber.org/atomic" "github.com/anyproto/any-sync/commonspace/headsync/headstorage" "github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto" @@ -74,6 +76,34 @@ type storage struct { root StorageChange } +type Runner struct { + sem chan struct{} +} + +func NewRunner(max int) *Runner { + return &Runner{ + sem: make(chan struct{}, max), + } +} + +var ( + lastCheckpoint = atomic.Time{} +) + +func (r *Runner) Run(action func(), store anystore.DB) { + r.sem <- struct{}{} + defer func() { + <-r.sem + }() + + action() +} + +var ( + runner = NewRunner(1000) + lock = sync.Mutex{} +) + var storageChangeBuilder = NewChangeBuilder func CreateStorage(ctx context.Context, root *treechangeproto.RawTreeChangeWithId, headStorage headstorage.HeadStorage, store anystore.DB) (Storage, error) { @@ -216,7 +246,7 @@ func (s *storage) GetAfterOrder(ctx context.Context, orderId string, storageIter return nil } -func (s *storage) AddAll(ctx context.Context, changes []StorageChange, heads []string, commonSnapshot string) error { +func (s *storage) addAllInner(ctx context.Context, changes []StorageChange, heads []string, commonSnapshot string) error { arena := s.arena defer arena.Reset() tx, err := s.store.WriteTx(ctx) @@ -232,16 +262,32 @@ func (s *storage) AddAll(ctx context.Context, changes []StorageChange, heads []s tx.Rollback() return err } - if i%20 == 0 && i > 0 { - err = tx.Commit() - if err != nil { - return err - } - tx, err = s.store.WriteTx(ctx) - if err != nil { - return fmt.Errorf("failed to create write tx: %w", err) + if i%10 == 0 || i == len(changes)-1 { + //err = tx.Commit() + //if err != nil { + // return err + //} + //tx, err = s.store.WriteTx(ctx) + //if err != nil { + // return fmt.Errorf("failed to create write tx: %w", err) + //} + now := time.Now() + checkpoint := lastCheckpoint.Load() + + if now.Sub(checkpoint) > time.Second { + lock.Lock() + checkpoint := lastCheckpoint.Load() + now = time.Now() + if now.Sub(checkpoint) > time.Second { + s.store.Checkpoint(context.Background(), false) + lastCheckpoint.Store(time.Now()) + } + lock.Unlock() } } + //if totalOps.Add(1)%100 == 0 { + // s.store.Checkpoint(ctx, false) + //} } update := headstorage.HeadsUpdate{ Id: s.id, @@ -256,6 +302,14 @@ func (s *storage) AddAll(ctx context.Context, changes []StorageChange, heads []s return tx.Commit() } +func (s *storage) AddAll(ctx context.Context, changes []StorageChange, heads []string, commonSnapshot string) error { + var err error + runner.Run(func() { + err = s.addAllInner(ctx, changes, heads, commonSnapshot) + }, s.store) + return err +} + func (s *storage) AddAllNoError(ctx context.Context, changes []StorageChange, heads []string, commonSnapshot string) error { arena := s.arena defer arena.Reset()