1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-08 14:07:02 +09:00

Simplify tree remote getter

This commit is contained in:
mcrakhman 2023-06-13 19:00:01 +02:00
parent 060c6d1231
commit cc3da7e66b
No known key found for this signature in database
GPG key ID: DED12CFEF5B8396B
5 changed files with 15 additions and 109 deletions

View file

@ -71,7 +71,6 @@ type BuildDeps struct {
SyncStatus syncstatus.StatusUpdater
PeerGetter ResponsiblePeersGetter
BuildObjectTree objecttree.BuildObjectTreeFunc
RetryTimeout time.Duration
}
func BuildSyncTreeOrGetRemote(ctx context.Context, id string, deps BuildDeps) (t SyncTree, err error) {

View file

@ -3,22 +3,17 @@ package synctree
import (
"context"
"errors"
"time"
"github.com/anyproto/any-sync/commonspace/object/tree/objecttree"
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anyproto/any-sync/commonspace/object/tree/treestorage"
"github.com/anyproto/any-sync/commonspace/spacestorage"
"github.com/anyproto/any-sync/net/peer"
"github.com/anyproto/any-sync/net/rpc/rpcerr"
"github.com/gogo/protobuf/proto"
"go.uber.org/zap"
)
var (
newRequestTimeout = 1 * time.Second
ErrRetryTimeout = errors.New("failed to retry request")
ErrNoResponsiblePeers = errors.New("no responsible peers")
)
@ -65,33 +60,13 @@ func (t treeRemoteGetter) treeRequest(ctx context.Context, peerId string) (msg *
return
}
func (t treeRemoteGetter) treeRequestLoop(ctx context.Context, retryTimeout time.Duration) (msg *treechangeproto.TreeSyncMessage, err error) {
peerIdx := 0
retryCtx, cancel := context.WithTimeout(ctx, retryTimeout)
defer cancel()
for {
func (t treeRemoteGetter) treeRequestLoop(ctx context.Context) (msg *treechangeproto.TreeSyncMessage, err error) {
availablePeers, err := t.getPeers(ctx)
if err != nil {
if retryTimeout == 0 {
return nil, err
}
} else {
peerIdx = peerIdx % len(availablePeers)
msg, err = t.treeRequest(ctx, availablePeers[peerIdx])
// if no error or it doesn't make sense to retry
err = rpcerr.Unwrap(err)
if err == nil || err == treechangeproto.ErrGetTree || retryTimeout == 0 {
return msg, err
}
peerIdx++
}
select {
case <-time.After(newRequestTimeout):
break
case <-retryCtx.Done():
return nil, ErrRetryTimeout
}
return
}
// in future we will try to load from different peers
return t.treeRequest(ctx, availablePeers[0])
}
func (t treeRemoteGetter) getTree(ctx context.Context) (treeStorage treestorage.TreeStorage, isRemote bool, err error) {
@ -113,7 +88,7 @@ func (t treeRemoteGetter) getTree(ctx context.Context) (treeStorage treestorage.
}
isRemote = true
resp, err := t.treeRequestLoop(ctx, t.deps.RetryTimeout)
resp, err := t.treeRequestLoop(ctx)
if err != nil {
return
}

View file

@ -4,7 +4,6 @@ import (
"context"
"fmt"
"testing"
"time"
"github.com/anyproto/any-sync/commonspace/object/tree/synctree/mock_synctree"
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
@ -49,8 +48,6 @@ func (fx *treeRemoteGetterFixture) stop() {
}
func TestTreeRemoteGetter(t *testing.T) {
newRequestTimeout = 20 * time.Millisecond
retryTimeout := 2 * newRequestTimeout
ctx := context.Background()
peerId := "peerId"
treeRequest := &treechangeproto.TreeSyncMessage{}
@ -70,18 +67,7 @@ func TestTreeRemoteGetter(t *testing.T) {
fx.peerGetterMock.EXPECT().GetResponsiblePeers(ctx).Return([]peer.Peer{mockPeer}, nil)
fx.syncClientMock.EXPECT().CreateNewTreeRequest().Return(treeRequest)
fx.syncClientMock.EXPECT().SendRequest(ctx, peerId, fx.treeGetter.treeId, treeRequest).Return(objectResponse, nil)
resp, err := fx.treeGetter.treeRequestLoop(ctx, 0)
require.NoError(t, err)
require.Equal(t, "id", resp.RootChange.Id)
})
t.Run("request peerId from context", func(t *testing.T) {
fx := newTreeRemoteGetterFixture(t)
defer fx.stop()
ctx := peer.CtxWithPeerId(ctx, peerId)
fx.syncClientMock.EXPECT().CreateNewTreeRequest().Return(treeRequest)
fx.syncClientMock.EXPECT().SendRequest(ctx, peerId, fx.treeGetter.treeId, treeRequest).Return(objectResponse, nil)
resp, err := fx.treeGetter.treeRequestLoop(ctx, 0)
resp, err := fx.treeGetter.treeRequestLoop(ctx)
require.NoError(t, err)
require.Equal(t, "id", resp.RootChange.Id)
})
@ -89,66 +75,13 @@ func TestTreeRemoteGetter(t *testing.T) {
t.Run("request fails", func(t *testing.T) {
fx := newTreeRemoteGetterFixture(t)
defer fx.stop()
treeRequest := &treechangeproto.TreeSyncMessage{}
mockPeer := mock_peer.NewMockPeer(fx.ctrl)
mockPeer.EXPECT().Id().AnyTimes().Return(peerId)
fx.peerGetterMock.EXPECT().GetResponsiblePeers(ctx).Return([]peer.Peer{mockPeer}, nil)
fx.syncClientMock.EXPECT().CreateNewTreeRequest().Return(treeRequest)
fx.syncClientMock.EXPECT().SendRequest(ctx, peerId, fx.treeGetter.treeId, treeRequest).Return(nil, fmt.Errorf("failed"))
_, err := fx.treeGetter.treeRequestLoop(ctx, 0)
require.Error(t, err)
require.NotEqual(t, ErrRetryTimeout, err)
})
t.Run("retry request success", func(t *testing.T) {
fx := newTreeRemoteGetterFixture(t)
defer fx.stop()
mockPeer := mock_peer.NewMockPeer(fx.ctrl)
mockPeer.EXPECT().Id().AnyTimes().Return(peerId)
fx.peerGetterMock.EXPECT().GetResponsiblePeers(ctx).AnyTimes().Return([]peer.Peer{mockPeer}, nil)
fx.syncClientMock.EXPECT().CreateNewTreeRequest().AnyTimes().Return(treeRequest)
fx.syncClientMock.EXPECT().SendRequest(ctx, peerId, fx.treeGetter.treeId, treeRequest).Times(1).Return(nil, fmt.Errorf("some"))
fx.syncClientMock.EXPECT().SendRequest(ctx, peerId, fx.treeGetter.treeId, treeRequest).Times(1).Return(objectResponse, nil)
resp, err := fx.treeGetter.treeRequestLoop(ctx, retryTimeout)
require.NoError(t, err)
require.Equal(t, "id", resp.RootChange.Id)
})
t.Run("no retry request if error get tree", func(t *testing.T) {
fx := newTreeRemoteGetterFixture(t)
defer fx.stop()
mockPeer := mock_peer.NewMockPeer(fx.ctrl)
mockPeer.EXPECT().Id().AnyTimes().Return(peerId)
fx.peerGetterMock.EXPECT().GetResponsiblePeers(ctx).AnyTimes().Return([]peer.Peer{mockPeer}, nil)
fx.syncClientMock.EXPECT().CreateNewTreeRequest().AnyTimes().Return(treeRequest)
fx.syncClientMock.EXPECT().SendRequest(ctx, peerId, fx.treeGetter.treeId, treeRequest).Times(1).Return(nil, treechangeproto.ErrGetTree)
_, err := fx.treeGetter.treeRequestLoop(ctx, retryTimeout)
require.Equal(t, treechangeproto.ErrGetTree, err)
})
t.Run("retry get peers success", func(t *testing.T) {
fx := newTreeRemoteGetterFixture(t)
defer fx.stop()
mockPeer := mock_peer.NewMockPeer(fx.ctrl)
mockPeer.EXPECT().Id().AnyTimes().Return(peerId)
fx.peerGetterMock.EXPECT().GetResponsiblePeers(ctx).Times(1).Return([]peer.Peer{}, nil)
fx.peerGetterMock.EXPECT().GetResponsiblePeers(ctx).Times(1).Return([]peer.Peer{mockPeer}, nil)
fx.syncClientMock.EXPECT().CreateNewTreeRequest().AnyTimes().Return(treeRequest)
fx.syncClientMock.EXPECT().SendRequest(ctx, peerId, fx.treeGetter.treeId, treeRequest).Times(1).Return(objectResponse, nil)
resp, err := fx.treeGetter.treeRequestLoop(ctx, retryTimeout)
require.NoError(t, err)
require.Equal(t, "id", resp.RootChange.Id)
})
t.Run("retry request fail", func(t *testing.T) {
fx := newTreeRemoteGetterFixture(t)
defer fx.stop()
treeRequest := &treechangeproto.TreeSyncMessage{}
mockPeer := mock_peer.NewMockPeer(fx.ctrl)
mockPeer.EXPECT().Id().AnyTimes().Return(peerId)
fx.peerGetterMock.EXPECT().GetResponsiblePeers(ctx).AnyTimes().Return([]peer.Peer{mockPeer}, nil)
fx.syncClientMock.EXPECT().CreateNewTreeRequest().AnyTimes().Return(treeRequest)
fx.syncClientMock.EXPECT().SendRequest(ctx, peerId, fx.treeGetter.treeId, treeRequest).AnyTimes().Return(nil, fmt.Errorf("some"))
_, err := fx.treeGetter.treeRequestLoop(ctx, retryTimeout)
require.Equal(t, ErrRetryTimeout, err)
_, err := fx.treeGetter.treeRequestLoop(ctx)
require.Error(t, err)
})
}

View file

@ -5,7 +5,6 @@ import (
"context"
"errors"
"sync/atomic"
"time"
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/app/logger"
@ -29,7 +28,6 @@ import (
type BuildTreeOpts struct {
Listener updatelistener.UpdateListener
RetryTimeout time.Duration
TreeBuilder objecttree.BuildObjectTreeFunc
}
@ -121,7 +119,6 @@ func (t *treeBuilder) BuildTree(ctx context.Context, id string, opts BuildTreeOp
SpaceStorage: t.spaceStorage,
OnClose: t.onClose,
SyncStatus: t.syncStatus,
RetryTimeout: opts.RetryTimeout,
PeerGetter: t.peerManager,
BuildObjectTree: treeBuilder,
}

View file

@ -10,6 +10,7 @@ import (
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
"github.com/anyproto/any-sync/net/peer"
"github.com/anyproto/any-sync/net/pool"
"github.com/anyproto/any-sync/net/rpc/rpcerr"
"github.com/anyproto/any-sync/net/streampool"
"go.uber.org/zap"
"storj.io/drpc"
@ -123,5 +124,6 @@ func (r *requestManager) doRequest(ctx context.Context, peerId string, msg *spac
resp, err = cl.ObjectSync(ctx, msg)
return err
})
err = rpcerr.Unwrap(err)
return
}