From cc3da7e66b404bec5ea047e3b5ef0984faf817c7 Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Tue, 13 Jun 2023 19:00:01 +0200 Subject: [PATCH] Simplify tree remote getter --- commonspace/object/tree/synctree/synctree.go | 1 - .../object/tree/synctree/treeremotegetter.go | 39 ++-------- .../tree/synctree/treeremotegetter_test.go | 75 +------------------ commonspace/objecttreebuilder/treebuilder.go | 7 +- commonspace/requestmanager/requestmanager.go | 2 + 5 files changed, 15 insertions(+), 109 deletions(-) diff --git a/commonspace/object/tree/synctree/synctree.go b/commonspace/object/tree/synctree/synctree.go index a21121bf..52c33ed5 100644 --- a/commonspace/object/tree/synctree/synctree.go +++ b/commonspace/object/tree/synctree/synctree.go @@ -71,7 +71,6 @@ type BuildDeps struct { SyncStatus syncstatus.StatusUpdater PeerGetter ResponsiblePeersGetter BuildObjectTree objecttree.BuildObjectTreeFunc - RetryTimeout time.Duration } func BuildSyncTreeOrGetRemote(ctx context.Context, id string, deps BuildDeps) (t SyncTree, err error) { diff --git a/commonspace/object/tree/synctree/treeremotegetter.go b/commonspace/object/tree/synctree/treeremotegetter.go index 2c1476ae..64cd1d9c 100644 --- a/commonspace/object/tree/synctree/treeremotegetter.go +++ b/commonspace/object/tree/synctree/treeremotegetter.go @@ -3,22 +3,17 @@ package synctree import ( "context" "errors" - "time" "github.com/anyproto/any-sync/commonspace/object/tree/objecttree" "github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto" "github.com/anyproto/any-sync/commonspace/object/tree/treestorage" "github.com/anyproto/any-sync/commonspace/spacestorage" "github.com/anyproto/any-sync/net/peer" - "github.com/anyproto/any-sync/net/rpc/rpcerr" "github.com/gogo/protobuf/proto" "go.uber.org/zap" ) var ( - newRequestTimeout = 1 * time.Second - - ErrRetryTimeout = errors.New("failed to retry request") ErrNoResponsiblePeers = errors.New("no responsible peers") ) @@ -65,33 +60,13 @@ func (t treeRemoteGetter) treeRequest(ctx context.Context, peerId string) (msg * return } -func (t treeRemoteGetter) treeRequestLoop(ctx context.Context, retryTimeout time.Duration) (msg *treechangeproto.TreeSyncMessage, err error) { - peerIdx := 0 - retryCtx, cancel := context.WithTimeout(ctx, retryTimeout) - defer cancel() - for { - availablePeers, err := t.getPeers(ctx) - if err != nil { - if retryTimeout == 0 { - return nil, err - } - } else { - peerIdx = peerIdx % len(availablePeers) - msg, err = t.treeRequest(ctx, availablePeers[peerIdx]) - // if no error or it doesn't make sense to retry - err = rpcerr.Unwrap(err) - if err == nil || err == treechangeproto.ErrGetTree || retryTimeout == 0 { - return msg, err - } - peerIdx++ - } - select { - case <-time.After(newRequestTimeout): - break - case <-retryCtx.Done(): - return nil, ErrRetryTimeout - } +func (t treeRemoteGetter) treeRequestLoop(ctx context.Context) (msg *treechangeproto.TreeSyncMessage, err error) { + availablePeers, err := t.getPeers(ctx) + if err != nil { + return } + // in future we will try to load from different peers + return t.treeRequest(ctx, availablePeers[0]) } func (t treeRemoteGetter) getTree(ctx context.Context) (treeStorage treestorage.TreeStorage, isRemote bool, err error) { @@ -113,7 +88,7 @@ func (t treeRemoteGetter) getTree(ctx context.Context) (treeStorage treestorage. } isRemote = true - resp, err := t.treeRequestLoop(ctx, t.deps.RetryTimeout) + resp, err := t.treeRequestLoop(ctx) if err != nil { return } diff --git a/commonspace/object/tree/synctree/treeremotegetter_test.go b/commonspace/object/tree/synctree/treeremotegetter_test.go index 6b719e93..60452b1a 100644 --- a/commonspace/object/tree/synctree/treeremotegetter_test.go +++ b/commonspace/object/tree/synctree/treeremotegetter_test.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "testing" - "time" "github.com/anyproto/any-sync/commonspace/object/tree/synctree/mock_synctree" "github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto" @@ -49,8 +48,6 @@ func (fx *treeRemoteGetterFixture) stop() { } func TestTreeRemoteGetter(t *testing.T) { - newRequestTimeout = 20 * time.Millisecond - retryTimeout := 2 * newRequestTimeout ctx := context.Background() peerId := "peerId" treeRequest := &treechangeproto.TreeSyncMessage{} @@ -70,18 +67,7 @@ func TestTreeRemoteGetter(t *testing.T) { fx.peerGetterMock.EXPECT().GetResponsiblePeers(ctx).Return([]peer.Peer{mockPeer}, nil) fx.syncClientMock.EXPECT().CreateNewTreeRequest().Return(treeRequest) fx.syncClientMock.EXPECT().SendRequest(ctx, peerId, fx.treeGetter.treeId, treeRequest).Return(objectResponse, nil) - resp, err := fx.treeGetter.treeRequestLoop(ctx, 0) - require.NoError(t, err) - require.Equal(t, "id", resp.RootChange.Id) - }) - - t.Run("request peerId from context", func(t *testing.T) { - fx := newTreeRemoteGetterFixture(t) - defer fx.stop() - ctx := peer.CtxWithPeerId(ctx, peerId) - fx.syncClientMock.EXPECT().CreateNewTreeRequest().Return(treeRequest) - fx.syncClientMock.EXPECT().SendRequest(ctx, peerId, fx.treeGetter.treeId, treeRequest).Return(objectResponse, nil) - resp, err := fx.treeGetter.treeRequestLoop(ctx, 0) + resp, err := fx.treeGetter.treeRequestLoop(ctx) require.NoError(t, err) require.Equal(t, "id", resp.RootChange.Id) }) @@ -89,66 +75,13 @@ func TestTreeRemoteGetter(t *testing.T) { t.Run("request fails", func(t *testing.T) { fx := newTreeRemoteGetterFixture(t) defer fx.stop() + treeRequest := &treechangeproto.TreeSyncMessage{} mockPeer := mock_peer.NewMockPeer(fx.ctrl) mockPeer.EXPECT().Id().AnyTimes().Return(peerId) fx.peerGetterMock.EXPECT().GetResponsiblePeers(ctx).Return([]peer.Peer{mockPeer}, nil) fx.syncClientMock.EXPECT().CreateNewTreeRequest().Return(treeRequest) - fx.syncClientMock.EXPECT().SendRequest(ctx, peerId, fx.treeGetter.treeId, treeRequest).Return(nil, fmt.Errorf("failed")) - _, err := fx.treeGetter.treeRequestLoop(ctx, 0) - require.Error(t, err) - require.NotEqual(t, ErrRetryTimeout, err) - }) - - t.Run("retry request success", func(t *testing.T) { - fx := newTreeRemoteGetterFixture(t) - defer fx.stop() - mockPeer := mock_peer.NewMockPeer(fx.ctrl) - mockPeer.EXPECT().Id().AnyTimes().Return(peerId) - fx.peerGetterMock.EXPECT().GetResponsiblePeers(ctx).AnyTimes().Return([]peer.Peer{mockPeer}, nil) - fx.syncClientMock.EXPECT().CreateNewTreeRequest().AnyTimes().Return(treeRequest) - fx.syncClientMock.EXPECT().SendRequest(ctx, peerId, fx.treeGetter.treeId, treeRequest).Times(1).Return(nil, fmt.Errorf("some")) - fx.syncClientMock.EXPECT().SendRequest(ctx, peerId, fx.treeGetter.treeId, treeRequest).Times(1).Return(objectResponse, nil) - resp, err := fx.treeGetter.treeRequestLoop(ctx, retryTimeout) - require.NoError(t, err) - require.Equal(t, "id", resp.RootChange.Id) - }) - - t.Run("no retry request if error get tree", func(t *testing.T) { - fx := newTreeRemoteGetterFixture(t) - defer fx.stop() - mockPeer := mock_peer.NewMockPeer(fx.ctrl) - mockPeer.EXPECT().Id().AnyTimes().Return(peerId) - fx.peerGetterMock.EXPECT().GetResponsiblePeers(ctx).AnyTimes().Return([]peer.Peer{mockPeer}, nil) - fx.syncClientMock.EXPECT().CreateNewTreeRequest().AnyTimes().Return(treeRequest) - fx.syncClientMock.EXPECT().SendRequest(ctx, peerId, fx.treeGetter.treeId, treeRequest).Times(1).Return(nil, treechangeproto.ErrGetTree) - _, err := fx.treeGetter.treeRequestLoop(ctx, retryTimeout) - require.Equal(t, treechangeproto.ErrGetTree, err) - }) - - t.Run("retry get peers success", func(t *testing.T) { - fx := newTreeRemoteGetterFixture(t) - defer fx.stop() - mockPeer := mock_peer.NewMockPeer(fx.ctrl) - mockPeer.EXPECT().Id().AnyTimes().Return(peerId) - fx.peerGetterMock.EXPECT().GetResponsiblePeers(ctx).Times(1).Return([]peer.Peer{}, nil) - fx.peerGetterMock.EXPECT().GetResponsiblePeers(ctx).Times(1).Return([]peer.Peer{mockPeer}, nil) - fx.syncClientMock.EXPECT().CreateNewTreeRequest().AnyTimes().Return(treeRequest) - fx.syncClientMock.EXPECT().SendRequest(ctx, peerId, fx.treeGetter.treeId, treeRequest).Times(1).Return(objectResponse, nil) - resp, err := fx.treeGetter.treeRequestLoop(ctx, retryTimeout) - require.NoError(t, err) - require.Equal(t, "id", resp.RootChange.Id) - }) - - t.Run("retry request fail", func(t *testing.T) { - fx := newTreeRemoteGetterFixture(t) - defer fx.stop() - treeRequest := &treechangeproto.TreeSyncMessage{} - mockPeer := mock_peer.NewMockPeer(fx.ctrl) - mockPeer.EXPECT().Id().AnyTimes().Return(peerId) - fx.peerGetterMock.EXPECT().GetResponsiblePeers(ctx).AnyTimes().Return([]peer.Peer{mockPeer}, nil) - fx.syncClientMock.EXPECT().CreateNewTreeRequest().AnyTimes().Return(treeRequest) fx.syncClientMock.EXPECT().SendRequest(ctx, peerId, fx.treeGetter.treeId, treeRequest).AnyTimes().Return(nil, fmt.Errorf("some")) - _, err := fx.treeGetter.treeRequestLoop(ctx, retryTimeout) - require.Equal(t, ErrRetryTimeout, err) + _, err := fx.treeGetter.treeRequestLoop(ctx) + require.Error(t, err) }) } diff --git a/commonspace/objecttreebuilder/treebuilder.go b/commonspace/objecttreebuilder/treebuilder.go index 9b7f9ab0..640416a7 100644 --- a/commonspace/objecttreebuilder/treebuilder.go +++ b/commonspace/objecttreebuilder/treebuilder.go @@ -5,7 +5,6 @@ import ( "context" "errors" "sync/atomic" - "time" "github.com/anyproto/any-sync/app" "github.com/anyproto/any-sync/app/logger" @@ -28,9 +27,8 @@ import ( ) type BuildTreeOpts struct { - Listener updatelistener.UpdateListener - RetryTimeout time.Duration - TreeBuilder objecttree.BuildObjectTreeFunc + Listener updatelistener.UpdateListener + TreeBuilder objecttree.BuildObjectTreeFunc } const CName = "common.commonspace.objecttreebuilder" @@ -121,7 +119,6 @@ func (t *treeBuilder) BuildTree(ctx context.Context, id string, opts BuildTreeOp SpaceStorage: t.spaceStorage, OnClose: t.onClose, SyncStatus: t.syncStatus, - RetryTimeout: opts.RetryTimeout, PeerGetter: t.peerManager, BuildObjectTree: treeBuilder, } diff --git a/commonspace/requestmanager/requestmanager.go b/commonspace/requestmanager/requestmanager.go index ea7a2b05..7a6dd019 100644 --- a/commonspace/requestmanager/requestmanager.go +++ b/commonspace/requestmanager/requestmanager.go @@ -10,6 +10,7 @@ import ( "github.com/anyproto/any-sync/commonspace/spacesyncproto" "github.com/anyproto/any-sync/net/peer" "github.com/anyproto/any-sync/net/pool" + "github.com/anyproto/any-sync/net/rpc/rpcerr" "github.com/anyproto/any-sync/net/streampool" "go.uber.org/zap" "storj.io/drpc" @@ -123,5 +124,6 @@ func (r *requestManager) doRequest(ctx context.Context, peerId string, msg *spac resp, err = cl.ObjectSync(ctx, msg) return err }) + err = rpcerr.Unwrap(err) return }