1
0
Fork 0
mirror of https://github.com/anyproto/anytype-heart.git synced 2025-06-10 18:10:49 +09:00

Merge pull request #13 from anyproto/GO-1313-fix-account-load

GO-1313: Update to latest any-sync with delayed load
This commit is contained in:
Mikhail Rakhmanov 2023-05-30 14:09:20 +02:00 committed by GitHub
commit a873268f9a
Signed by: github
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 380 additions and 35 deletions

View file

@ -12,6 +12,7 @@ import (
"github.com/anyproto/any-sync/commonspace/object/tree/objecttree"
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anyproto/any-sync/commonspace/object/tree/treestorage"
"github.com/anyproto/any-sync/commonspace/object/treemanager"
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
"github.com/anyproto/any-sync/util/crypto"
"go.uber.org/zap"
@ -30,7 +31,10 @@ type ctxKey int
var errAppIsNotRunning = errors.New("app is not running")
const (
optsKey ctxKey = iota
optsKey ctxKey = iota
derivedObjectLoadTimeout = time.Minute * 30
objectLoadTimeout = time.Minute * 3
concurrentTrees = 10
)
type treeCreateCache struct {
@ -107,6 +111,26 @@ func (s *Service) GetTree(ctx context.Context, spaceId, id string) (tr objecttre
return sb.(source.ObjectTreeProvider).Tree(), nil
}
func (s *Service) NewTreeSyncer(spaceId string, treeManager treemanager.TreeManager) treemanager.TreeSyncer {
s.syncerLock.Lock()
defer s.syncerLock.Unlock()
s.syncer = newTreeSyncer(spaceId, objectLoadTimeout, concurrentTrees, treeManager)
if s.syncStarted {
log.Warn("creating tree syncer after run")
s.syncer.Run()
}
return s.syncer
}
func (s *Service) StartSync() {
s.syncerLock.Lock()
defer s.syncerLock.Unlock()
s.syncStarted = true
if s.syncer != nil {
s.syncer.Run()
}
}
func (s *Service) GetObject(ctx context.Context, spaceId, id string) (sb smartblock.SmartBlock, err error) {
ctx = updateCacheOpts(ctx, func(opts cacheOpts) cacheOpts {
opts.spaceId = spaceId
@ -124,6 +148,11 @@ func (s *Service) GetAccountTree(ctx context.Context, id string) (tr objecttree.
}
func (s *Service) GetAccountObject(ctx context.Context, id string) (sb smartblock.SmartBlock, err error) {
if _, ok := ctx.Deadline(); !ok {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, objectLoadTimeout)
defer cancel()
}
return s.GetObject(ctx, s.clientService.AccountId(), id)
}
@ -337,7 +366,8 @@ func (s *Service) getDerivedObject(
id = payload.RootRawChange.Id
)
// timing out when getting objects from remote
ctx, cancel = context.WithTimeout(ctx, time.Minute)
// here we set very long timeout, because we must load these documents
ctx, cancel = context.WithTimeout(ctx, derivedObjectLoadTimeout)
ctx = context.WithValue(ctx,
optsKey,
cacheOpts{

View file

@ -97,7 +97,7 @@ func (f *ObjectFactory) Name() (name string) {
}
func (f *ObjectFactory) InitObject(id string, initCtx *smartblock.InitContext) (sb smartblock.SmartBlock, err error) {
sc, err := f.sourceService.NewSource(id, initCtx.SpaceID, initCtx.BuildOpts)
sc, err := f.sourceService.NewSource(initCtx.Ctx, id, initCtx.SpaceID, initCtx.BuildOpts)
if err != nil {
return
}

View file

@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"strings"
"sync"
"time"
"go.uber.org/zap"
@ -149,6 +150,10 @@ type Service struct {
fileSync filesync.FileSync
fileService files.Service
// TODO: move all this into separate treecache component or something like this
syncer *treeSyncer
syncStarted bool
syncerLock sync.Mutex
}
func (s *Service) Name() string {

View file

@ -31,7 +31,7 @@ func New() Service {
}
type Service interface {
NewSource(id string, spaceID string, buildOptions BuildOptions) (source Source, err error)
NewSource(ctx context.Context, id string, spaceID string, buildOptions BuildOptions) (source Source, err error)
RegisterStaticSource(id string, s Source)
NewStaticSource(id string, sbType model.SmartBlockType, doc *state.State, pushChange func(p PushChangeParams) (string, error)) SourceWithType
RemoveStaticSource(id string)
@ -81,7 +81,7 @@ func (b *BuildOptions) BuildTreeOpts() commonspace.BuildTreeOpts {
}
}
func (s *service) NewSource(id string, spaceID string, buildOptions BuildOptions) (source Source, err error) {
func (s *service) NewSource(ctx context.Context, id string, spaceID string, buildOptions BuildOptions) (source Source, err error) {
if id == addr.AnytypeProfileId {
return NewAnytypeProfile(id), nil
}
@ -107,7 +107,6 @@ func (s *service) NewSource(id string, spaceID string, buildOptions BuildOptions
return staticSrc, nil
}
ctx := context.Background()
spc, err := s.spaceService.GetSpace(ctx, spaceID)
if err != nil {
return

177
core/block/treesyncer.go Normal file
View file

@ -0,0 +1,177 @@
package block
import (
"context"
"github.com/anyproto/any-sync/commonspace/object/tree/synctree"
"github.com/anyproto/any-sync/commonspace/object/treemanager"
"github.com/anyproto/any-sync/net/peer"
"github.com/anyproto/any-sync/net/streampool"
"go.uber.org/zap"
"sync"
"time"
)
type executor struct {
pool *streampool.ExecPool
objs map[string]struct{}
sync.Mutex
}
func newExecutor(workers, size int) *executor {
return &executor{
pool: streampool.NewExecPool(workers, size),
objs: map[string]struct{}{},
}
}
func (e *executor) tryAdd(id string, action func()) (err error) {
e.Lock()
defer e.Unlock()
if _, exists := e.objs[id]; exists {
return nil
}
e.objs[id] = struct{}{}
return e.pool.TryAdd(func() {
action()
e.Lock()
defer e.Unlock()
delete(e.objs, id)
})
}
func (e *executor) run() {
e.pool.Run()
}
func (e *executor) close() {
e.pool.Close()
}
type treeSyncer struct {
sync.Mutex
mainCtx context.Context
cancel context.CancelFunc
requests int
spaceId string
timeout time.Duration
requestPools map[string]*executor
headPools map[string]*executor
treeManager treemanager.TreeManager
isRunning bool
}
func newTreeSyncer(spaceId string, timeout time.Duration, concurrentReqs int, treeManager treemanager.TreeManager) *treeSyncer {
mainCtx, cancel := context.WithCancel(context.Background())
return &treeSyncer{
mainCtx: mainCtx,
cancel: cancel,
requests: concurrentReqs,
spaceId: spaceId,
timeout: timeout,
requestPools: map[string]*executor{},
headPools: map[string]*executor{},
treeManager: treeManager,
}
}
func (t *treeSyncer) Init() {
}
func (t *treeSyncer) Run() {
t.Lock()
defer t.Unlock()
t.isRunning = true
log.Info("starting request pool")
for _, p := range t.requestPools {
p.run()
}
for _, p := range t.headPools {
p.run()
}
}
func (t *treeSyncer) SyncAll(ctx context.Context, peerId string, existing, missing []string) error {
t.Lock()
defer t.Unlock()
reqExec, exists := t.requestPools[peerId]
if !exists {
reqExec = newExecutor(t.requests, 0)
if t.isRunning {
reqExec.run()
}
t.requestPools[peerId] = reqExec
}
headExec, exists := t.headPools[peerId]
if !exists {
headExec = newExecutor(1, 0)
if t.isRunning {
headExec.run()
}
t.headPools[peerId] = headExec
}
for _, id := range existing {
idCopy := id
err := headExec.tryAdd(idCopy, func() {
t.updateTree(peerId, idCopy)
})
if err != nil {
log.Error("failed to add to head queue", zap.Error(err))
}
}
for _, id := range missing {
idCopy := id
err := reqExec.tryAdd(idCopy, func() {
t.requestTree(peerId, idCopy)
})
if err != nil {
log.Error("failed to add to request queue", zap.Error(err))
}
}
return nil
}
func (t *treeSyncer) requestTree(peerId, id string) {
log := log.With(zap.String("treeId", id))
ctx := peer.CtxWithPeerId(t.mainCtx, peerId)
ctx, cancel := context.WithTimeout(ctx, t.timeout)
defer cancel()
_, err := t.treeManager.GetTree(ctx, t.spaceId, id)
if err != nil {
log.Warn("can't load missing tree", zap.Error(err))
} else {
log.Debug("loaded missing tree")
}
}
func (t *treeSyncer) updateTree(peerId, id string) {
log := log.With(zap.String("treeId", id))
ctx := peer.CtxWithPeerId(t.mainCtx, peerId)
tr, err := t.treeManager.GetTree(ctx, t.spaceId, id)
if err != nil {
log.Warn("can't load existing tree", zap.Error(err))
return
}
syncTree, ok := tr.(synctree.SyncTree)
if !ok {
log.Warn("not a sync tree")
}
if err = syncTree.SyncWithPeer(ctx, peerId); err != nil {
log.Warn("synctree.SyncWithPeer error", zap.Error(err))
} else {
log.Debug("success synctree.SyncWithPeer")
}
}
func (t *treeSyncer) Close() error {
t.Lock()
defer t.Unlock()
t.cancel()
t.isRunning = false
for _, pool := range t.headPools {
pool.close()
}
for _, pool := range t.requestPools {
pool.close()
}
return nil
}

View file

@ -0,0 +1,119 @@
package block
import (
"context"
"github.com/anyproto/any-sync/commonspace/object/tree/objecttree"
"github.com/anyproto/any-sync/commonspace/object/tree/objecttree/mock_objecttree"
"github.com/anyproto/any-sync/commonspace/object/tree/synctree/mock_synctree"
"github.com/anyproto/any-sync/commonspace/object/treemanager/mock_treemanager"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
"testing"
"time"
)
func TestTreeSyncer(t *testing.T) {
ctrl := gomock.NewController(t)
managerMock := mock_treemanager.NewMockTreeManager(ctrl)
spaceId := "spaceId"
peerId := "peerId"
existingId := "existing"
missingId := "missing"
missingMock := mock_objecttree.NewMockObjectTree(ctrl)
existingMock := mock_synctree.NewMockSyncTree(ctrl)
t.Run("delayed sync", func(t *testing.T) {
syncer := newTreeSyncer(spaceId, objectLoadTimeout, 10, managerMock)
syncer.Init()
managerMock.EXPECT().GetTree(gomock.Any(), spaceId, existingId).Return(existingMock, nil)
existingMock.EXPECT().SyncWithPeer(gomock.Any(), peerId).Return(nil)
managerMock.EXPECT().GetTree(gomock.Any(), spaceId, missingId).Return(missingMock, nil)
err := syncer.SyncAll(context.Background(), peerId, []string{existingId}, []string{missingId})
require.NoError(t, err)
require.NotNil(t, syncer.requestPools[peerId])
require.NotNil(t, syncer.headPools[peerId])
syncer.Run()
time.Sleep(100 * time.Millisecond)
syncer.Close()
})
t.Run("sync after run", func(t *testing.T) {
syncer := newTreeSyncer(spaceId, objectLoadTimeout, 10, managerMock)
managerMock.EXPECT().GetTree(gomock.Any(), spaceId, existingId).Return(existingMock, nil)
existingMock.EXPECT().SyncWithPeer(gomock.Any(), peerId).Return(nil)
managerMock.EXPECT().GetTree(gomock.Any(), spaceId, missingId).Return(missingMock, nil)
syncer.Init()
syncer.Run()
err := syncer.SyncAll(context.Background(), peerId, []string{existingId}, []string{missingId})
require.NoError(t, err)
require.NotNil(t, syncer.requestPools[peerId])
require.NotNil(t, syncer.headPools[peerId])
time.Sleep(100 * time.Millisecond)
syncer.Close()
})
t.Run("sync same ids", func(t *testing.T) {
syncer := newTreeSyncer(spaceId, objectLoadTimeout, 10, managerMock)
managerMock.EXPECT().GetTree(gomock.Any(), spaceId, existingId).Return(existingMock, nil)
existingMock.EXPECT().SyncWithPeer(gomock.Any(), peerId).Return(nil)
managerMock.EXPECT().GetTree(gomock.Any(), spaceId, missingId).Return(missingMock, nil)
syncer.Init()
syncer.Run()
err := syncer.SyncAll(context.Background(), peerId, []string{existingId, existingId}, []string{missingId, missingId, missingId})
require.NoError(t, err)
require.NotNil(t, syncer.requestPools[peerId])
require.NotNil(t, syncer.headPools[peerId])
time.Sleep(100 * time.Millisecond)
syncer.Close()
})
t.Run("sync concurrent ids", func(t *testing.T) {
ch := make(chan struct{}, 2)
syncer := newTreeSyncer(spaceId, objectLoadTimeout, 2, managerMock)
managerMock.EXPECT().GetTree(gomock.Any(), spaceId, existingId).Return(existingMock, nil)
existingMock.EXPECT().SyncWithPeer(gomock.Any(), peerId).Return(nil)
managerMock.EXPECT().GetTree(gomock.Any(), spaceId, missingId+"1").DoAndReturn(func(ctx context.Context, spaceId, treeId string) (objecttree.ObjectTree, error) {
<-ch
return missingMock, nil
})
managerMock.EXPECT().GetTree(gomock.Any(), spaceId, missingId+"2").DoAndReturn(func(ctx context.Context, spaceId, treeId string) (objecttree.ObjectTree, error) {
<-ch
return missingMock, nil
})
syncer.Init()
syncer.Run()
err := syncer.SyncAll(context.Background(), peerId, []string{existingId}, []string{missingId + "1", missingId + "2"})
require.NoError(t, err)
require.NotNil(t, syncer.requestPools[peerId])
require.NotNil(t, syncer.headPools[peerId])
time.Sleep(100 * time.Millisecond)
syncer.Close()
for i := 0; i < 2; i++ {
ch <- struct{}{}
}
})
t.Run("sync context cancel", func(t *testing.T) {
var events []string
syncer := newTreeSyncer(spaceId, objectLoadTimeout, 1, managerMock)
managerMock.EXPECT().GetTree(gomock.Any(), spaceId, missingId).DoAndReturn(func(ctx context.Context, spaceId, treeId string) (objecttree.ObjectTree, error) {
<-ctx.Done()
events = append(events, "after done")
return missingMock, nil
})
syncer.Init()
syncer.Run()
err := syncer.SyncAll(context.Background(), peerId, nil, []string{missingId})
require.NoError(t, err)
require.NotNil(t, syncer.requestPools[peerId])
require.NotNil(t, syncer.headPools[peerId])
time.Sleep(100 * time.Millisecond)
events = append(events, "before close")
syncer.Close()
time.Sleep(100 * time.Millisecond)
require.Equal(t, []string{"before close", "after done"}, events)
})
}

View file

@ -91,6 +91,10 @@ type subObjectCreator interface {
CreateSubObjectsInWorkspace(details []*types.Struct) (ids []string, objects []*types.Struct, err error)
}
type syncStarter interface {
StartSync()
}
type indexer struct {
store objectstore.ObjectStore
fileStore filestore.FileStore
@ -99,6 +103,7 @@ type indexer struct {
picker block.Picker
ftsearch ftsearch.FTSearch
subObjectCreator subObjectCreator
syncStarter syncStarter
fileService files.Service
quit chan struct{}
@ -121,6 +126,7 @@ func (i *indexer) Init(a *app.App) (err error) {
i.fileStore = app.MustComponent[filestore.FileStore](a)
i.ftsearch = app.MustComponent[ftsearch.FTSearch](a)
i.subObjectCreator = app.MustComponent[subObjectCreator](a)
i.syncStarter = app.MustComponent[syncStarter](a)
i.quit = make(chan struct{})
i.forceFt = make(chan struct{})
return
@ -359,6 +365,9 @@ func (i *indexer) reindex(ctx context.Context, flags reindexFlags) (err error) {
if err != nil {
return err
}
// starting sync of all other objects later, because we don't want to have problems with loading of derived objects
// due to parallel load which can overload the stream
i.syncStarter.StartSync()
// for all ids except home and archive setting cache timeout for reindexing
// ctx = context.WithValue(ctx, ocache.CacheTimeout, cacheTimeout)

15
go.mod
View file

@ -7,7 +7,7 @@ require (
github.com/PuerkitoBio/goquery v1.8.1
github.com/VividCortex/ewma v1.2.0
github.com/adrium/goheif v0.0.0-20230113233934-ca402e77a786
github.com/anyproto/any-sync v0.1.1
github.com/anyproto/any-sync v0.1.5
github.com/anyproto/go-naturaldate/v2 v2.0.2-0.20230524105841-9829cfd13438
github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de
github.com/blevesearch/bleve/v2 v2.3.6
@ -175,11 +175,12 @@ require (
github.com/ipfs/go-blockservice v0.5.1 // indirect
github.com/ipfs/go-ipfs-chunker v0.0.5 // indirect
github.com/ipfs/go-ipfs-exchange-interface v0.2.1 // indirect
github.com/ipfs/go-ipfs-files v0.2.0 // indirect
github.com/ipfs/go-ipfs-files v0.3.0 // indirect
github.com/ipfs/go-ipfs-posinfo v0.0.1 // indirect
github.com/ipfs/go-ipfs-util v0.0.2 // indirect
github.com/ipfs/go-ipld-cbor v0.0.6 // indirect
github.com/ipfs/go-ipld-legacy v0.1.1 // indirect
github.com/ipfs/go-libipfs v0.4.0 // indirect
github.com/ipfs/go-log/v2 v2.5.1 // indirect
github.com/ipfs/go-metrics-interface v0.0.1 // indirect
github.com/ipfs/go-verifcid v0.0.2 // indirect
@ -208,7 +209,7 @@ require (
github.com/onsi/ginkgo/v2 v2.9.5 // indirect
github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/polydawn/refmt v0.0.0-20201211092308-30ac6d18308e // indirect
github.com/polydawn/refmt v0.89.0 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.44.0 // indirect
@ -227,22 +228,22 @@ require (
github.com/tklauser/numcpus v0.6.0 // indirect
github.com/tyler-smith/go-bip39 v1.1.0 // indirect
github.com/uber/jaeger-lib v2.4.1+incompatible // indirect
github.com/whyrusleeping/cbor-gen v0.0.0-20210219115102-f37d292932f2 // indirect
github.com/whyrusleeping/cbor-gen v0.0.0-20230126041949-52956bd4c9aa // indirect
github.com/whyrusleeping/chunker v0.0.0-20181014151217-fe64bd25879f // indirect
github.com/yusufpapurcu/wmi v1.2.2 // indirect
github.com/zeebo/blake3 v0.2.3 // indirect
github.com/zeebo/errs v1.3.0 // indirect
go.etcd.io/bbolt v1.3.5 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/otel v1.7.0 // indirect
go.opentelemetry.io/otel/trace v1.7.0 // indirect
go.opentelemetry.io/otel v1.11.2 // indirect
go.opentelemetry.io/otel/trace v1.11.2 // indirect
go.uber.org/atomic v1.11.0 // indirect
golang.org/x/crypto v0.9.0 // indirect
golang.org/x/mod v0.10.0 // indirect
golang.org/x/sync v0.2.0 // indirect
golang.org/x/sys v0.8.0 // indirect
golang.org/x/tools v0.9.1 // indirect
golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20211118181313-81c1377c94b1 // indirect
google.golang.org/protobuf v1.30.0 // indirect

43
go.sum
View file

@ -42,8 +42,8 @@ github.com/andybalholm/cascadia v1.2.0/go.mod h1:YCyR8vOZT9aZ1CHEd8ap0gMVm2aFgxB
github.com/andybalholm/cascadia v1.3.1 h1:nhxRkql1kdYCc8Snf7D5/D3spOX+dBgjA6u8x004T2c=
github.com/andybalholm/cascadia v1.3.1/go.mod h1:R4bJ1UQfqADjvDa4P6HZHLh/3OxWWEqc0Sk8XGwHqvA=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/anyproto/any-sync v0.1.1 h1:fZ6SfiSf4tX3Ab+76cZAxAm/3uY60lnndyBgeqenT8w=
github.com/anyproto/any-sync v0.1.1/go.mod h1:N56AZy/MmqabgNPWc4+Ta6Tbkk0Ob7sYLfNpzK4Hv7M=
github.com/anyproto/any-sync v0.1.5 h1:6Rc4cgnAN11IEdyPepGE3en2VnRG4+NRm1zuHnuoBRk=
github.com/anyproto/any-sync v0.1.5/go.mod h1:N56AZy/MmqabgNPWc4+Ta6Tbkk0Ob7sYLfNpzK4Hv7M=
github.com/anyproto/go-chash v0.1.0 h1:I9meTPjXFRfXZHRJzjOHC/XF7Q5vzysKkiT/grsogXY=
github.com/anyproto/go-chash v0.1.0/go.mod h1:0UjNQi3PDazP0fINpFYu6VKhuna+W/V+1vpXHAfNgLY=
github.com/anyproto/go-ds-badger3 v0.3.1-0.20230524095230-434cf6346d9b h1:SMizb43hfILk2bpMgpTd30n6yQQdxW0ZbDti0wqfsBw=
@ -260,7 +260,6 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ=
github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
@ -284,6 +283,7 @@ github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEe
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls=
github.com/go-xmlfmt/xmlfmt v0.0.0-20191208150333-d5b6f63a941b h1:khEcpUM4yFcxg4/FHQWkvVRmgijNXRfzkIDHh23ggEo=
github.com/go-xmlfmt/xmlfmt v0.0.0-20191208150333-d5b6f63a941b/go.mod h1:aUCEOzzezBEjDBbFBoSiya/gduyIiWYRP6CnSFIV8AM=
github.com/go-yaml/yaml v2.1.0+incompatible/go.mod h1:w2MrLa16VYP0jy6N7M5kHaCkaLENm+P+Tv+MfurjSw0=
github.com/gobwas/glob v0.2.3 h1:A4xDbljILXROh+kObIiy5kIaPYD8e96x1tgBhUI5J+Y=
github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJAkT8=
github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee h1:s+21KNqlpePfkah2I+gwHF8xmJWRjooY+5248k6m4A0=
@ -356,7 +356,6 @@ github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-querystring v1.1.0 h1:AnCroh3fv4ZBgVIf1Iwtovgjaw/GiKJo8M8yD/fhyJ8=
@ -487,8 +486,8 @@ github.com/ipfs/go-ipfs-ds-help v1.1.0/go.mod h1:YR5+6EaebOhfcqVCyqemItCLthrpVNo
github.com/ipfs/go-ipfs-exchange-interface v0.2.1 h1:jMzo2VhLKSHbVe+mHNzYgs95n0+t0Q69GQ5WhRDZV/s=
github.com/ipfs/go-ipfs-exchange-interface v0.2.1/go.mod h1:MUsYn6rKbG6CTtsDp+lKJPmVt3ZrCViNyH3rfPGsZ2E=
github.com/ipfs/go-ipfs-exchange-offline v0.3.0 h1:c/Dg8GDPzixGd0MC8Jh6mjOwU57uYokgWRFidfvEkuA=
github.com/ipfs/go-ipfs-files v0.2.0 h1:z6MCYHQSZpDWpUSK59Kf0ajP1fi4gLCf6fIulVsp8A8=
github.com/ipfs/go-ipfs-files v0.2.0/go.mod h1:vT7uaQfIsprKktzbTPLnIsd+NGw9ZbYwSq0g3N74u0M=
github.com/ipfs/go-ipfs-files v0.3.0 h1:fallckyc5PYjuMEitPNrjRfpwl7YFt69heCOUhsbGxQ=
github.com/ipfs/go-ipfs-files v0.3.0/go.mod h1:xAUtYMwB+iu/dtf6+muHNSFQCJG2dSiStR2P6sn9tIM=
github.com/ipfs/go-ipfs-posinfo v0.0.1 h1:Esoxj+1JgSjX0+ylc0hUmJCOv6V2vFoZiETLR6OtpRs=
github.com/ipfs/go-ipfs-posinfo v0.0.1/go.mod h1:SwyeVP+jCwiDu0C313l/8jg6ZxM0qqtlt2a0vILTc1A=
github.com/ipfs/go-ipfs-pq v0.0.2 h1:e1vOOW6MuOwG2lqxcLA+wEn93i/9laCY8sXAw76jFOY=
@ -504,6 +503,8 @@ github.com/ipfs/go-ipld-format v0.4.0 h1:yqJSaJftjmjc9jEOFYlpkwOLVKv68OD27jFLlSg
github.com/ipfs/go-ipld-format v0.4.0/go.mod h1:co/SdBE8h99968X0hViiw1MNlh6fvxxnHpvVLnH7jSM=
github.com/ipfs/go-ipld-legacy v0.1.1 h1:BvD8PEuqwBHLTKqlGFTHSwrwFOMkVESEvwIYwR2cdcc=
github.com/ipfs/go-ipld-legacy v0.1.1/go.mod h1:8AyKFCjgRPsQFf15ZQgDB8Din4DML/fOmKZkkFkrIEg=
github.com/ipfs/go-libipfs v0.4.0 h1:TkUxJGjtPnSzAgkw7VjS0/DBay3MPjmTBa4dGdUQCDE=
github.com/ipfs/go-libipfs v0.4.0/go.mod h1:XsU2cP9jBhDrXoJDe0WxikB8XcVmD3k2MEZvB3dbYu8=
github.com/ipfs/go-log v0.0.1/go.mod h1:kL1d2/hzSpI0thNYjiKfjanbVNU+IIGA/WnNESY9leM=
github.com/ipfs/go-log v1.0.3/go.mod h1:OsLySYkwIbiSUR/yBTdv1qPtcE4FW3WPWk/ewz9Ru+A=
github.com/ipfs/go-log v1.0.5 h1:2dOuUCB1Z7uoczMWgAyDck5JLb72zHzrMnGnCNNbvY8=
@ -760,8 +761,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/polydawn/refmt v0.0.0-20190221155625-df39d6c2d992/go.mod h1:uIp+gprXxxrWSjjklXD+mN4wed/tMfjMMmN/9+JsA9o=
github.com/polydawn/refmt v0.0.0-20190807091052-3d65705ee9f1/go.mod h1:uIp+gprXxxrWSjjklXD+mN4wed/tMfjMMmN/9+JsA9o=
github.com/polydawn/refmt v0.0.0-20201211092308-30ac6d18308e h1:ZOcivgkkFRnjfoTcGsDq3UQYiBmekwLA+qg0OjyB/ls=
github.com/polydawn/refmt v0.0.0-20201211092308-30ac6d18308e/go.mod h1:uIp+gprXxxrWSjjklXD+mN4wed/tMfjMMmN/9+JsA9o=
github.com/polydawn/refmt v0.89.0 h1:ADJTApkvkeBZsN0tBTx8QjpD9JkmxbKp0cxfr9qszm4=
github.com/polydawn/refmt v0.89.0/go.mod h1:/zvteZs/GwLtCgZ4BL6CBsk9IKIlexP43ObX9AxTqTw=
github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI=
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c h1:ncq/mPwQF4JjgDlrVEn3C11VoGHZN7m8qihwgMEtzYw=
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE=
@ -853,11 +854,13 @@ github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrf
github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE=
github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/assertions v1.2.0 h1:42S6lae5dvLc7BrLu/0ugRtcFVjoJNMC/N3yZFZkDFs=
github.com/smartystreets/assertions v1.2.0/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo=
github.com/smartystreets/goconvey v0.0.0-20190222223459-a17d461953aa/go.mod h1:2RVY1rIf+2J2o/IM9+vPq9RzmHDSseB7FoXiSNIUsoU=
github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s=
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/smartystreets/goconvey v1.7.2 h1:9RBaZCeXEQ3UselpuwUQHltGVXvdwm6cv1hgR6gDIPg=
github.com/smartystreets/goconvey v1.7.2/go.mod h1:Vw0tHAZW6lzCRk3xgdin6fKYcG+G3Pg9vgXWeJpQFMM=
github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
github.com/sony/gobreaker v0.4.1/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
@ -922,13 +925,15 @@ github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
github.com/urfave/cli v1.22.10/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
github.com/warpfork/go-testmark v0.11.0 h1:J6LnV8KpceDvo7spaNU4+DauH2n1x+6RaO2rJrmpQ9U=
github.com/warpfork/go-wish v0.0.0-20180510122957-5ad1f5abf436/go.mod h1:x6AKhvSSexNrVSrViXSHUEbICjmGXhtgABaHIySUSGw=
github.com/warpfork/go-wish v0.0.0-20200122115046-b9ea61034e4a h1:G++j5e0OC488te356JvdhaM8YS6nMsjLAYF7JxCv07w=
github.com/warpfork/go-wish v0.0.0-20200122115046-b9ea61034e4a/go.mod h1:x6AKhvSSexNrVSrViXSHUEbICjmGXhtgABaHIySUSGw=
github.com/warpfork/go-wish v0.0.0-20220906213052-39a1cc7a02d0 h1:GDDkbFiaK8jsSDJfjId/PEGEShv6ugrt4kYsC5UIDaQ=
github.com/warpfork/go-wish v0.0.0-20220906213052-39a1cc7a02d0/go.mod h1:x6AKhvSSexNrVSrViXSHUEbICjmGXhtgABaHIySUSGw=
github.com/whyrusleeping/cbor-gen v0.0.0-20200123233031-1cdf64d27158/go.mod h1:Xj/M2wWU+QdTdRbu/L/1dIZY8/Wb2K9pAhtroQuxJJI=
github.com/whyrusleeping/cbor-gen v0.0.0-20210219115102-f37d292932f2 h1:bsUlNhdmbtlfdLVXAVfuvKQ01RnWAM09TVrJkI7NZs4=
github.com/whyrusleeping/cbor-gen v0.0.0-20210219115102-f37d292932f2/go.mod h1:fgkXqYy7bV2cFeIEOkVTZS/WjXARfBqSH6Q2qHL33hQ=
github.com/whyrusleeping/cbor-gen v0.0.0-20230126041949-52956bd4c9aa h1:EyA027ZAkuaCLoxVX4r1TZMPy1d31fM6hbfQ4OU4I5o=
github.com/whyrusleeping/cbor-gen v0.0.0-20230126041949-52956bd4c9aa/go.mod h1:fgkXqYy7bV2cFeIEOkVTZS/WjXARfBqSH6Q2qHL33hQ=
github.com/whyrusleeping/chunker v0.0.0-20181014151217-fe64bd25879f h1:jQa4QT2UP9WYv2nzyawpKMOCl+Z/jW7djv2/J50lj9E=
github.com/whyrusleeping/chunker v0.0.0-20181014151217-fe64bd25879f/go.mod h1:p9UJB6dDgdPgMJZs7UjUOdulKyRr9fqkS+6JKAInPy8=
github.com/whyrusleeping/go-logging v0.0.0-20170515211332-0457bb6b88fc/go.mod h1:bopw91TMyo8J3tvftk8xmU2kPmlrt4nScJQZU2hE5EM=
@ -961,10 +966,10 @@ go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk=
go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0=
go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo=
go.opentelemetry.io/otel v1.7.0 h1:Z2lA3Tdch0iDcrhJXDIlC94XE+bxok1F9B+4Lz/lGsM=
go.opentelemetry.io/otel v1.7.0/go.mod h1:5BdUoMIz5WEs0vt0CUEMtSSaTSHBBVwrhnz7+nrD5xk=
go.opentelemetry.io/otel/trace v1.7.0 h1:O37Iogk1lEkMRXewVtZ1BBTVn5JEp8GrJvP92bJqC6o=
go.opentelemetry.io/otel/trace v1.7.0/go.mod h1:fzLSB9nqR2eXzxPXb2JW9IKE+ScyXA48yyE4TNvoHqU=
go.opentelemetry.io/otel v1.11.2 h1:YBZcQlsVekzFsFbjygXMOXSs6pialIZxcjfO/mBDmR0=
go.opentelemetry.io/otel v1.11.2/go.mod h1:7p4EUV+AqgdlNV9gL97IgUZiVR3yrFXYo53f9BM3tRI=
go.opentelemetry.io/otel/trace v1.11.2 h1:Xf7hWSF2Glv0DE3MH7fBHvtpSBsjcBUe5MYAmZM/+y0=
go.opentelemetry.io/otel/trace v1.11.2/go.mod h1:4N+yC7QEz7TTsG9BSRLNAa63eg5E06ObSbKPmxQ/pKA=
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
@ -1199,8 +1204,8 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f h1:uF6paiQQebLeSXkrTqHqz0MXhXXS1KgF41eUdBNvxK0=
golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk=
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
gonum.org/v1/gonum v0.0.0-20180816165407-929014505bf4/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo=
gonum.org/v1/gonum v0.8.2/go.mod h1:oe/vMfY3deqTw+1EZJhuvEW2iwGF1bW9wwu7XCu0+v0=
gonum.org/v1/netlib v0.0.0-20190313105609-8cb42192e0e0/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw=

View file

@ -3,11 +3,11 @@ package peermanager
import (
"context"
"fmt"
"github.com/anyproto/anytype-heart/space/peerstore"
"github.com/anyproto/any-sync/app/logger"
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
"github.com/anyproto/any-sync/net/peer"
"github.com/anyproto/any-sync/net/streampool"
"github.com/anyproto/anytype-heart/space/peerstore"
"go.uber.org/zap"
"golang.org/x/exp/slices"
"storj.io/drpc"
@ -27,7 +27,7 @@ func (n *clientPeerManager) init() {
}
func (n *clientPeerManager) SendPeer(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error) {
ctx = logger.CtxWithFields(context.Background(), logger.CtxGetFields(ctx)...)
ctx = logger.CtxWithFields(ctx, logger.CtxGetFields(ctx)...)
var drpcMsg drpc.Message
drpcMsg = msg
if msg.ReplyId != "" || msg.RequestId != "" {
@ -40,7 +40,7 @@ func (n *clientPeerManager) SendPeer(ctx context.Context, peerId string, msg *sp
}
func (n *clientPeerManager) Broadcast(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) {
ctx = logger.CtxWithFields(context.Background(), logger.CtxGetFields(ctx)...)
ctx = logger.CtxWithFields(ctx, logger.CtxGetFields(ctx)...)
return n.p.streamPool.Send(ctx, msg, func(ctx context.Context) (peers []peer.Peer, err error) {
return n.getStreamResponsiblePeers(ctx)
})