1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-07 21:47:02 +09:00

Some testing

This commit is contained in:
mcrakhman 2025-02-19 18:31:38 +01:00
parent 40998931f1
commit a7a2d1fb24
No known key found for this signature in database
GPG key ID: DED12CFEF5B8396B

View file

@ -4,11 +4,13 @@ import (
"context"
"errors"
"fmt"
"sync"
"time"
anystore "github.com/anyproto/any-store"
"github.com/anyproto/any-store/anyenc"
"github.com/anyproto/any-store/query"
"go.uber.org/atomic"
"github.com/anyproto/any-sync/commonspace/headsync/headstorage"
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
@ -74,6 +76,34 @@ type storage struct {
root StorageChange
}
type Runner struct {
sem chan struct{}
}
func NewRunner(max int) *Runner {
return &Runner{
sem: make(chan struct{}, max),
}
}
var (
lastCheckpoint = atomic.Time{}
)
func (r *Runner) Run(action func(), store anystore.DB) {
r.sem <- struct{}{}
defer func() {
<-r.sem
}()
action()
}
var (
runner = NewRunner(1000)
lock = sync.Mutex{}
)
var storageChangeBuilder = NewChangeBuilder
func CreateStorage(ctx context.Context, root *treechangeproto.RawTreeChangeWithId, headStorage headstorage.HeadStorage, store anystore.DB) (Storage, error) {
@ -216,7 +246,7 @@ func (s *storage) GetAfterOrder(ctx context.Context, orderId string, storageIter
return nil
}
func (s *storage) AddAll(ctx context.Context, changes []StorageChange, heads []string, commonSnapshot string) error {
func (s *storage) addAllInner(ctx context.Context, changes []StorageChange, heads []string, commonSnapshot string) error {
arena := s.arena
defer arena.Reset()
tx, err := s.store.WriteTx(ctx)
@ -232,16 +262,32 @@ func (s *storage) AddAll(ctx context.Context, changes []StorageChange, heads []s
tx.Rollback()
return err
}
if i%20 == 0 && i > 0 {
err = tx.Commit()
if err != nil {
return err
}
tx, err = s.store.WriteTx(ctx)
if err != nil {
return fmt.Errorf("failed to create write tx: %w", err)
if i%10 == 0 || i == len(changes)-1 {
//err = tx.Commit()
//if err != nil {
// return err
//}
//tx, err = s.store.WriteTx(ctx)
//if err != nil {
// return fmt.Errorf("failed to create write tx: %w", err)
//}
now := time.Now()
checkpoint := lastCheckpoint.Load()
if now.Sub(checkpoint) > time.Second {
lock.Lock()
checkpoint := lastCheckpoint.Load()
now = time.Now()
if now.Sub(checkpoint) > time.Second {
s.store.Checkpoint(context.Background(), false)
lastCheckpoint.Store(time.Now())
}
lock.Unlock()
}
}
//if totalOps.Add(1)%100 == 0 {
// s.store.Checkpoint(ctx, false)
//}
}
update := headstorage.HeadsUpdate{
Id: s.id,
@ -256,6 +302,14 @@ func (s *storage) AddAll(ctx context.Context, changes []StorageChange, heads []s
return tx.Commit()
}
func (s *storage) AddAll(ctx context.Context, changes []StorageChange, heads []string, commonSnapshot string) error {
var err error
runner.Run(func() {
err = s.addAllInner(ctx, changes, heads, commonSnapshot)
}, s.store)
return err
}
func (s *storage) AddAllNoError(ctx context.Context, changes []StorageChange, heads []string, commonSnapshot string) error {
arena := s.arena
defer arena.Reset()