1
0
Fork 0
mirror of https://github.com/anyproto/anytype-heart.git synced 2025-06-09 17:44:59 +09:00

Start sync later with treesyncer

This commit is contained in:
mcrakhman 2023-05-26 14:11:38 +02:00
parent 8ea7ccaad0
commit 760aecd232
No known key found for this signature in database
GPG key ID: DED12CFEF5B8396B
8 changed files with 308 additions and 17 deletions

View file

@ -5,6 +5,7 @@ import (
"crypto/rand"
"errors"
"fmt"
"github.com/anyproto/any-sync/commonspace/object/treemanager"
"time"
"github.com/anyproto/any-sync/app/ocache"
@ -109,6 +110,28 @@ func (s *Service) GetTree(ctx context.Context, spaceId, id string) (tr objecttre
return sb.(source.ObjectTreeProvider).Tree(), nil
}
// NewTreeSyncer replaces the service's tree syncer with a fresh one bound to
// spaceId and returns it.
//
// A previously created syncer, if any, is closed first. When StartSync has
// already been called the new syncer is run immediately; otherwise it stays
// idle until StartSync is invoked.
func (s *Service) NewTreeSyncer(spaceId string) treemanager.TreeSyncer {
	s.syncerLock.Lock()
	defer s.syncerLock.Unlock()

	if s.syncer != nil {
		// treeSyncer.Close currently never fails, so the error is
		// deliberately ignored.
		_ = s.syncer.Close()
	}
	// The syncer has to be created unconditionally: creating it only when an
	// old one existed left s.syncer nil on the first call, which made Run
	// panic on a nil receiver and returned a typed-nil pointer wrapped in a
	// non-nil interface.
	s.syncer = newTreeSyncer(spaceId, time.Second, 10, s)
	if s.syncStarted {
		s.syncer.Run()
	}
	return s.syncer
}
// StartSync records that syncing has been requested and runs the current
// tree syncer, if one has been created. The flag is consulted by
// NewTreeSyncer so that syncers created afterwards are run right away.
func (s *Service) StartSync() {
	s.syncerLock.Lock()
	defer s.syncerLock.Unlock()

	s.syncStarted = true
	if s.syncer == nil {
		// Nothing to run yet.
		return
	}
	s.syncer.Run()
}
func (s *Service) GetObject(ctx context.Context, spaceId, id string) (sb smartblock.SmartBlock, err error) {
ctx = updateCacheOpts(ctx, func(opts cacheOpts) cacheOpts {
opts.spaceId = spaceId

View file

@ -6,6 +6,7 @@ import (
"fmt"
"go.uber.org/zap"
"strings"
"sync"
"time"
"github.com/anyproto/any-sync/accountservice"
@ -148,6 +149,10 @@ type Service struct {
fileSync filesync.FileSync
fileService files.Service
// TODO: move all this into separate treecache component or something like this
syncer *treeSyncer
syncStarted bool
syncerLock sync.Mutex
}
func (s *Service) Name() string {

172
core/block/treesyncer.go Normal file
View file

@ -0,0 +1,172 @@
package block
import (
"context"
"github.com/anyproto/any-sync/commonspace/object/tree/synctree"
"github.com/anyproto/any-sync/commonspace/object/treemanager"
"github.com/anyproto/any-sync/net/peer"
"github.com/anyproto/any-sync/net/streampool"
"go.uber.org/zap"
"sync"
"time"
)
// executor couples an execution pool with the set of ids that have been
// handed to it and not yet finished, so that the same id is not queued twice.
type executor struct {
	// Mutex guards objs.
	sync.Mutex

	// pool executes the queued actions.
	pool *streampool.ExecPool
	// objs holds the ids that were added via tryAdd and whose action has not
	// completed yet.
	objs map[string]struct{}
}
// newExecutor builds an executor whose pool is created with the given
// workers and size arguments and whose id set starts out empty.
func newExecutor(workers, size int) *executor {
	e := new(executor)
	e.pool = streampool.NewExecPool(workers, size)
	e.objs = make(map[string]struct{})
	return e
}
// tryAdd queues action under id unless an action with the same id is already
// queued or running, in which case it does nothing and returns nil.
//
// The id is released once action has finished, or immediately when the pool
// refuses the task. The returned error is the one reported by the pool's
// TryAdd.
func (e *executor) tryAdd(id string, action func()) (err error) {
	// The membership check and the insert must happen under the same lock,
	// otherwise the map is read concurrently with writers and two callers can
	// both enqueue the same id.
	e.Lock()
	defer e.Unlock()
	if _, exists := e.objs[id]; exists {
		return nil
	}
	e.objs[id] = struct{}{}
	err = e.pool.TryAdd(func() {
		action()
		e.Lock()
		defer e.Unlock()
		delete(e.objs, id)
	})
	if err != nil {
		// The task was never queued, so its completion callback will not
		// run; drop the id here so that a later attempt can retry it.
		delete(e.objs, id)
	}
	return err
}
// run calls Run on the underlying execution pool.
func (e *executor) run() {
	pool := e.pool
	pool.Run()
}
// close calls Close on the underlying execution pool; whatever that call
// returns is discarded.
func (e *executor) close() {
	pool := e.pool
	pool.Close()
}
type treeSyncer struct {
sync.Mutex
requests int
spaceId string
timeout time.Duration
requestPools map[string]*executor
headPools map[string]*executor
treeManager treemanager.TreeManager
isRunning bool
}
// newTreeSyncer creates a tree syncer for spaceId that is not running yet.
//
// timeout bounds each tree operation, concurrentReqs is the worker count of
// every per-peer request executor, and treeManager is used to load trees.
func newTreeSyncer(spaceId string, timeout time.Duration, concurrentReqs int, treeManager treemanager.TreeManager) *treeSyncer {
	syncer := new(treeSyncer)
	syncer.spaceId = spaceId
	syncer.timeout = timeout
	syncer.requests = concurrentReqs
	syncer.treeManager = treeManager
	syncer.requestPools = make(map[string]*executor)
	syncer.headPools = make(map[string]*executor)
	return syncer
}
// Init does nothing; all state is set up by newTreeSyncer.
func (t *treeSyncer) Init() {}
// Run marks the syncer as running and runs every executor created so far:
// first all request executors, then all head executors. Executors created
// later by SyncAll are run at creation time.
func (t *treeSyncer) Run() {
	t.Lock()
	defer t.Unlock()

	t.isRunning = true
	for _, group := range []map[string]*executor{t.requestPools, t.headPools} {
		for _, exec := range group {
			exec.run()
		}
	}
}
// SyncAll schedules work for the trees reported for peerId: every id in
// existing is queued on the peer's head executor to be synced, and every id
// in missing is queued on the peer's request executor to be loaded.
//
// The peer's executors are created on first use and are run right away when
// the syncer is already running. Failures to queue an id are logged and do
// not abort the call; the returned error is always nil. ctx is not used.
func (t *treeSyncer) SyncAll(ctx context.Context, peerId string, existing, missing []string) error {
	t.Lock()
	defer t.Unlock()

	// executorFor returns the peer's executor from pools, creating and
	// registering one with the given worker count when it is absent.
	executorFor := func(pools map[string]*executor, workers int) *executor {
		if found, ok := pools[peerId]; ok {
			return found
		}
		created := newExecutor(workers, 0)
		if t.isRunning {
			created.run()
		}
		pools[peerId] = created
		return created
	}
	requestQueue := executorFor(t.requestPools, t.requests)
	headQueue := executorFor(t.headPools, 1)

	for i := range existing {
		// Declared inside the loop body so each closure captures its own id.
		treeId := existing[i]
		if err := headQueue.tryAdd(treeId, func() { t.updateTree(peerId, treeId) }); err != nil {
			log.Error("failed to add to head queue", zap.Error(err))
		}
	}
	for i := range missing {
		treeId := missing[i]
		if err := requestQueue.tryAdd(treeId, func() { t.requestTree(peerId, treeId) }); err != nil {
			log.Error("failed to add to request queue", zap.Error(err))
		}
	}
	return nil
}
// requestTree asks the tree manager for tree id of the syncer's space, using
// a context that carries peerId and expires after the syncer's timeout. The
// returned tree is discarded; only the outcome is logged.
func (t *treeSyncer) requestTree(peerId, id string) {
	log := log.With(zap.String("treeId", id))
	ctx := peer.CtxWithPeerId(context.Background(), peerId)
	ctx, cancel := context.WithTimeout(ctx, t.timeout)
	defer cancel()

	// The context is intentionally not handed to the logger: every other log
	// call in this file passes the message first, and a context argument
	// would end up as part of the logged arguments.
	if _, err := t.treeManager.GetTree(ctx, t.spaceId, id); err != nil {
		log.Warn("can't load missing tree", zap.Error(err))
		return
	}
	log.Debug("loaded missing tree")
}
// updateTree loads tree id of the syncer's space through the tree manager
// and, when it is a synctree.SyncTree, syncs it with peerId. Both steps share
// a context that carries peerId and expires after the syncer's timeout.
// Failures are logged; nothing is returned.
func (t *treeSyncer) updateTree(peerId, id string) {
	log := log.With(zap.String("treeId", id))
	ctx := peer.CtxWithPeerId(context.Background(), peerId)
	ctx, cancel := context.WithTimeout(ctx, t.timeout)
	defer cancel()

	tr, err := t.treeManager.GetTree(ctx, t.spaceId, id)
	if err != nil {
		log.Warn("can't load existing tree", zap.Error(err))
		return
	}
	syncTree, ok := tr.(synctree.SyncTree)
	if !ok {
		log.Warn("not a sync tree")
		// syncTree is a nil interface here; calling a method on it would
		// panic, so stop.
		return
	}
	if err = syncTree.SyncWithPeer(ctx, peerId); err != nil {
		log.Warn("synctree.SyncWithPeer error", zap.Error(err))
		return
	}
	log.Debug("success synctree.SyncWithPeer")
}
// Close marks the syncer as not running and closes every executor: first all
// head executors, then all request executors. It always returns nil.
func (t *treeSyncer) Close() error {
	t.Lock()
	defer t.Unlock()

	t.isRunning = false
	for _, group := range []map[string]*executor{t.headPools, t.requestPools} {
		for _, exec := range group {
			exec.close()
		}
	}
	return nil
}

View file

@ -0,0 +1,98 @@
package block
import (
"context"
"github.com/anyproto/any-sync/commonspace/object/tree/objecttree"
"github.com/anyproto/any-sync/commonspace/object/tree/objecttree/mock_objecttree"
"github.com/anyproto/any-sync/commonspace/object/tree/synctree/mock_synctree"
"github.com/anyproto/any-sync/commonspace/object/treemanager/mock_treemanager"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
"testing"
"time"
)
// TestTreeSyncer exercises treeSyncer against a mocked tree manager. All
// subtests share one gomock controller and the same manager/tree mocks, and
// each subtest registers the expectations it needs on them.
func TestTreeSyncer(t *testing.T) {
ctrl := gomock.NewController(t)
managerMock := mock_treemanager.NewMockTreeManager(ctrl)
spaceId := "spaceId"
peerId := "peerId"
existingId := "existing"
missingId := "missing"
missingMock := mock_objecttree.NewMockObjectTree(ctrl)
existingMock := mock_synctree.NewMockSyncTree(ctrl)
// SyncAll is called before Run: the per-peer executors must already exist
// after SyncAll, and the queued work is expected to be performed once Run is
// called.
t.Run("delayed sync", func(t *testing.T) {
syncer := newTreeSyncer(spaceId, objectLoadTimeout, 10, managerMock)
syncer.Init()
managerMock.EXPECT().GetTree(gomock.Any(), spaceId, existingId).Return(existingMock, nil)
existingMock.EXPECT().SyncWithPeer(gomock.Any(), peerId).Return(nil)
managerMock.EXPECT().GetTree(gomock.Any(), spaceId, missingId).Return(missingMock, nil)
err := syncer.SyncAll(context.Background(), peerId, []string{existingId}, []string{missingId})
require.NoError(t, err)
require.NotNil(t, syncer.requestPools[peerId])
require.NotNil(t, syncer.headPools[peerId])
syncer.Run()
// Give the executors time to process the queued actions before closing.
time.Sleep(100 * time.Millisecond)
syncer.Close()
})
// Run is called first, so executors created by SyncAll are run at creation.
t.Run("sync after run", func(t *testing.T) {
syncer := newTreeSyncer(spaceId, objectLoadTimeout, 10, managerMock)
managerMock.EXPECT().GetTree(gomock.Any(), spaceId, existingId).Return(existingMock, nil)
existingMock.EXPECT().SyncWithPeer(gomock.Any(), peerId).Return(nil)
managerMock.EXPECT().GetTree(gomock.Any(), spaceId, missingId).Return(missingMock, nil)
syncer.Init()
syncer.Run()
err := syncer.SyncAll(context.Background(), peerId, []string{existingId}, []string{missingId})
require.NoError(t, err)
require.NotNil(t, syncer.requestPools[peerId])
require.NotNil(t, syncer.headPools[peerId])
time.Sleep(100 * time.Millisecond)
syncer.Close()
})
// The same ids are passed several times in one SyncAll call while only a
// single expectation per id is registered on the mocks.
t.Run("sync same ids", func(t *testing.T) {
syncer := newTreeSyncer(spaceId, objectLoadTimeout, 10, managerMock)
managerMock.EXPECT().GetTree(gomock.Any(), spaceId, existingId).Return(existingMock, nil)
existingMock.EXPECT().SyncWithPeer(gomock.Any(), peerId).Return(nil)
managerMock.EXPECT().GetTree(gomock.Any(), spaceId, missingId).Return(missingMock, nil)
syncer.Init()
syncer.Run()
err := syncer.SyncAll(context.Background(), peerId, []string{existingId, existingId}, []string{missingId, missingId, missingId})
require.NoError(t, err)
require.NotNil(t, syncer.requestPools[peerId])
require.NotNil(t, syncer.headPools[peerId])
time.Sleep(100 * time.Millisecond)
syncer.Close()
})
// Two missing trees are requested with a request worker count of 2; each
// mocked GetTree blocks on ch until the test releases it at the very end.
t.Run("sync concurrent ids", func(t *testing.T) {
// Buffered for two values so the releasing sends below cannot block.
ch := make(chan struct{}, 2)
syncer := newTreeSyncer(spaceId, objectLoadTimeout, 2, managerMock)
managerMock.EXPECT().GetTree(gomock.Any(), spaceId, existingId).Return(existingMock, nil)
existingMock.EXPECT().SyncWithPeer(gomock.Any(), peerId).Return(nil)
managerMock.EXPECT().GetTree(gomock.Any(), spaceId, missingId+"1").DoAndReturn(func(ctx context.Context, spaceId, treeId string) (objecttree.ObjectTree, error) {
<-ch
return missingMock, nil
})
managerMock.EXPECT().GetTree(gomock.Any(), spaceId, missingId+"2").DoAndReturn(func(ctx context.Context, spaceId, treeId string) (objecttree.ObjectTree, error) {
<-ch
return missingMock, nil
})
syncer.Init()
syncer.Run()
err := syncer.SyncAll(context.Background(), peerId, []string{existingId}, []string{missingId + "1", missingId + "2"})
require.NoError(t, err)
require.NotNil(t, syncer.requestPools[peerId])
require.NotNil(t, syncer.headPools[peerId])
time.Sleep(100 * time.Millisecond)
syncer.Close()
// Release the two blocked GetTree calls.
for i := 0; i < 2; i++ {
ch <- struct{}{}
}
})
}

View file

@ -91,6 +91,10 @@ type subObjectCreator interface {
CreateSubObjectsInWorkspace(details []*types.Struct) (ids []string, objects []*types.Struct, err error)
}
// syncStarter is the single capability the indexer needs from the component
// that owns syncing: a way to trigger its start.
type syncStarter interface {
	// StartSync triggers the start of syncing; it takes no arguments and
	// reports no error.
	StartSync()
}
type indexer struct {
store objectstore.ObjectStore
fileStore filestore.FileStore
@ -99,6 +103,7 @@ type indexer struct {
picker block.Picker
ftsearch ftsearch.FTSearch
subObjectCreator subObjectCreator
syncStarter syncStarter
fileService files.Service
quit chan struct{}
@ -123,6 +128,7 @@ func (i *indexer) Init(a *app.App) (err error) {
i.fileStore = app.MustComponent[filestore.FileStore](a)
i.ftsearch = app.MustComponent[ftsearch.FTSearch](a)
i.subObjectCreator = app.MustComponent[subObjectCreator](a)
i.syncStarter = app.MustComponent[syncStarter](a)
i.quit = make(chan struct{})
i.archivedMap = make(map[string]struct{}, 100)
i.favoriteMap = make(map[string]struct{}, 100)
@ -359,10 +365,7 @@ func (i *indexer) reindex(ctx context.Context, flags reindexFlags) (err error) {
}
// starting sync of all other objects later, because we don't want to have problems with loading of derived objects
// due to parallel load which can overload the stream
err = i.spaceService.StartSync(ctx)
if err != nil {
return err
}
i.syncStarter.StartSync()
if flags.any() {
d, err := i.getObjectInfo(ctx, i.anytype.PredefinedBlocks().Archive)

2
go.mod
View file

@ -7,7 +7,7 @@ require (
github.com/PuerkitoBio/goquery v1.8.1
github.com/VividCortex/ewma v1.2.0
github.com/adrium/goheif v0.0.0-20230113233934-ca402e77a786
github.com/anyproto/any-sync v0.1.2-0.20230524194759-a5e2bea04c2b
github.com/anyproto/any-sync v0.1.2-0.20230526092921-556f03ed7fa4
github.com/anyproto/go-naturaldate/v2 v2.0.2-0.20230524105841-9829cfd13438
github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de
github.com/blevesearch/bleve/v2 v2.3.6

4
go.sum
View file

@ -44,8 +44,8 @@ github.com/andybalholm/cascadia v1.2.0/go.mod h1:YCyR8vOZT9aZ1CHEd8ap0gMVm2aFgxB
github.com/andybalholm/cascadia v1.3.1 h1:nhxRkql1kdYCc8Snf7D5/D3spOX+dBgjA6u8x004T2c=
github.com/andybalholm/cascadia v1.3.1/go.mod h1:R4bJ1UQfqADjvDa4P6HZHLh/3OxWWEqc0Sk8XGwHqvA=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/anyproto/any-sync v0.1.2-0.20230524194759-a5e2bea04c2b h1:qpD1wy54HS9G46z3R3TX4xjEVhNU0+jgfEKgxjbmwcM=
github.com/anyproto/any-sync v0.1.2-0.20230524194759-a5e2bea04c2b/go.mod h1:N56AZy/MmqabgNPWc4+Ta6Tbkk0Ob7sYLfNpzK4Hv7M=
github.com/anyproto/any-sync v0.1.2-0.20230526092921-556f03ed7fa4 h1:PG1ti/Wg5W9/vJ3zsxVkxeXH5bx8uFITxYa6npniRtw=
github.com/anyproto/any-sync v0.1.2-0.20230526092921-556f03ed7fa4/go.mod h1:N56AZy/MmqabgNPWc4+Ta6Tbkk0Ob7sYLfNpzK4Hv7M=
github.com/anyproto/go-chash v0.1.0 h1:I9meTPjXFRfXZHRJzjOHC/XF7Q5vzysKkiT/grsogXY=
github.com/anyproto/go-chash v0.1.0/go.mod h1:0UjNQi3PDazP0fINpFYu6VKhuna+W/V+1vpXHAfNgLY=
github.com/anyproto/go-ds-badger3 v0.3.1-0.20230524095230-434cf6346d9b h1:SMizb43hfILk2bpMgpTd30n6yQQdxW0ZbDti0wqfsBw=

View file

@ -61,7 +61,6 @@ type Service interface {
DeleteSpace(ctx context.Context, spaceID string, revert bool) (payload StatusPayload, err error)
DeleteAccount(ctx context.Context, revert bool) (payload StatusPayload, err error)
StreamPool() streampool.StreamPool
StartSync(ctx context.Context) (err error)
app.ComponentRunnable
}
@ -148,15 +147,6 @@ func (s *service) Run(ctx context.Context) (err error) {
return
}
// StartSync fetches the account space and runs its head sync. It returns the
// error from AccountSpace, if any, and nil otherwise.
func (s *service) StartSync(ctx context.Context) (err error) {
	accountSpace, err := s.AccountSpace(ctx)
	if err == nil {
		accountSpace.HeadSync().Run()
	}
	return err
}
func (s *service) DeriveSpace(ctx context.Context, payload commonspace.SpaceDerivePayload) (container commonspace.Space, err error) {
id, err := s.commonSpace.DeriveSpace(ctx, payload)
if err != nil {