From 2c8ff79256280a39f74b6adb2ade6f48aec77ca1 Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Wed, 8 Nov 2023 15:03:18 +0100 Subject: [PATCH] use lcoal peers for a space pull + retry --- commonspace/spaceservice.go | 54 ++++++++++++++++++++++++++++++------- 1 file changed, 44 insertions(+), 10 deletions(-) diff --git a/commonspace/spaceservice.go b/commonspace/spaceservice.go index c4e3b657..2540e786 100644 --- a/commonspace/spaceservice.go +++ b/commonspace/spaceservice.go @@ -3,9 +3,18 @@ package commonspace import ( "context" + "errors" + "sync/atomic" + "time" + + "go.uber.org/zap" + "github.com/anyproto/any-sync/commonspace/deletionmanager" "github.com/anyproto/any-sync/commonspace/object/treesyncer" - "sync/atomic" + "github.com/anyproto/any-sync/net" + "github.com/anyproto/any-sync/net/peer" + + "storj.io/drpc" "github.com/anyproto/any-sync/accountservice" "github.com/anyproto/any-sync/app" @@ -30,11 +39,9 @@ import ( "github.com/anyproto/any-sync/commonspace/syncstatus" "github.com/anyproto/any-sync/consensus/consensusproto" "github.com/anyproto/any-sync/metric" - "github.com/anyproto/any-sync/net/peer" "github.com/anyproto/any-sync/net/pool" "github.com/anyproto/any-sync/net/rpc/rpcerr" "github.com/anyproto/any-sync/nodeconf" - "storj.io/drpc" ) const CName = "common.commonspace" @@ -217,19 +224,47 @@ func (s *spaceService) addSpaceStorage(ctx context.Context, spaceDescription Spa } func (s *spaceService) getSpaceStorageFromRemote(ctx context.Context, id string) (st spacestorage.SpaceStorage, err error) { - var p peer.Peer - lastConfiguration := s.configurationService // we can't connect to client if it is a node - if lastConfiguration.IsResponsible(id) { + if s.configurationService.IsResponsible(id) { err = spacesyncproto.ErrSpaceMissing return } - p, err = s.pool.GetOneOf(ctx, lastConfiguration.NodeIds(id)) + sm, err := s.peerManagerProvider.NewPeerManager(ctx, id) if err != nil { - return + return nil, err } + var peers []peer.Peer + for { + peers, err = sm.GetResponsiblePeers(ctx) + if err != nil && !errors.Is(err, net.ErrUnableToConnect) { + return nil, err + } + if len(peers) == 0 { + select { + case <-time.After(time.Second): + case <-ctx.Done(): + return nil, ctx.Err() + } + } else { + break + } + } + for i, p := range peers { + if st, err = s.spacePullWithPeer(ctx, p, id); err != nil { + if i+1 == len(peers) { + return + } else { + log.InfoCtx(ctx, "unable to pull space", zap.String("spaceId", id), zap.String("peerId", p.Id())) + } + } else { + return + } + } + return nil, net.ErrUnableToConnect +} +func (s *spaceService) spacePullWithPeer(ctx context.Context, p peer.Peer, id string) (st spacestorage.SpaceStorage, err error) { var res *spacesyncproto.SpacePullResponse err = p.DoDrpc(ctx, func(conn drpc.Conn) error { cl := spacesyncproto.NewDRPCSpaceSyncClient(conn) @@ -241,7 +276,7 @@ func (s *spaceService) getSpaceStorageFromRemote(ctx context.Context, id string) return } - st, err = s.createSpaceStorage(spacestorage.SpaceStorageCreatePayload{ + return s.createSpaceStorage(spacestorage.SpaceStorageCreatePayload{ AclWithId: &consensusproto.RawRecordWithId{ Payload: res.Payload.AclPayload, Id: res.Payload.AclPayloadId, @@ -252,7 +287,6 @@ func (s *spaceService) getSpaceStorageFromRemote(ctx context.Context, id string) }, SpaceHeaderWithId: res.Payload.SpaceHeader, }) - return } func (s *spaceService) createSpaceStorage(payload spacestorage.SpaceStorageCreatePayload) (spacestorage.SpaceStorage, error) {