From 95fe3ba04ca3b1385ef8a6a7eefe1a87286411e3 Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Wed, 8 Nov 2023 15:00:26 +0100 Subject: [PATCH 1/3] peerservice: ignore addrs list --- net/peerservice/peerservice.go | 55 ++++++++++++++++++----------- net/peerservice/peerservice_test.go | 26 +++++++++++--- 2 files changed, 56 insertions(+), 25 deletions(-) diff --git a/net/peerservice/peerservice.go b/net/peerservice/peerservice.go index 0f880690..ab9d93af 100644 --- a/net/peerservice/peerservice.go +++ b/net/peerservice/peerservice.go @@ -4,6 +4,12 @@ import ( "context" "errors" "fmt" + "strings" + "sync" + "time" + + "go.uber.org/zap" + "github.com/anyproto/any-sync/app" "github.com/anyproto/any-sync/app/logger" "github.com/anyproto/any-sync/net/peer" @@ -13,9 +19,6 @@ import ( "github.com/anyproto/any-sync/net/transport/quic" "github.com/anyproto/any-sync/net/transport/yamux" "github.com/anyproto/any-sync/nodeconf" - "go.uber.org/zap" - "strings" - "sync" ) const CName = "net.peerservice" @@ -40,14 +43,16 @@ type PeerService interface { } type peerService struct { - yamux transport.Transport - quic transport.Transport - nodeConf nodeconf.NodeConf - peerAddrs map[string][]string - pool pool.Pool - server server.DRPCServer - preferQuic bool - mu sync.RWMutex + yamux transport.Transport + quic transport.Transport + nodeConf nodeconf.NodeConf + peerAddrs map[string][]string + ignoreAddrs *sync.Map + ignoreTimeout time.Duration + pool pool.Pool + server server.DRPCServer + preferQuic bool + mu sync.RWMutex } func (p *peerService) Init(a *app.App) (err error) { @@ -57,6 +62,8 @@ func (p *peerService) Init(a *app.App) (err error) { p.pool = a.MustComponent(pool.CName).(pool.Pool) p.server = a.MustComponent(server.CName).(server.DRPCServer) p.peerAddrs = map[string][]string{} + p.ignoreAddrs = &sync.Map{} + p.ignoreTimeout = time.Minute * 3 p.yamux.SetAccepter(p) p.quic.SetAccepter(p) return nil @@ -79,21 +86,21 @@ func (p *peerService) PreferQuic(prefer bool) { func (p *peerService) Dial(ctx context.Context, peerId string) (pr peer.Peer, err error) { p.mu.RLock() - defer p.mu.RUnlock() - - addrs, err := p.getPeerAddrs(peerId) - if err != nil { - return - } - - var mc transport.MultiConn - log.DebugCtx(ctx, "dial", zap.String("peerId", peerId), zap.Strings("addrs", addrs)) - var schemes = yamuxPreferSchemes if p.preferQuic { schemes = quicPreferSchemes } + addrs, err := p.getPeerAddrs(peerId) + if err != nil { + p.mu.RUnlock() + return + } + p.mu.RUnlock() + + var mc transport.MultiConn + log.DebugCtx(ctx, "dial", zap.String("peerId", peerId), zap.Strings("addrs", addrs)) + err = ErrAddrsNotFound for _, sch := range schemes { if mc, err = p.dialScheme(ctx, sch, addrs); err == nil { @@ -125,14 +132,20 @@ func (p *peerService) dialScheme(ctx context.Context, sch string, addrs []string } err = ErrAddrsNotFound + now := time.Now() for _, addr := range addrs { if scheme(addr) != sch { continue } + if tm, ok := p.ignoreAddrs.Load(addr); ok && tm.(time.Time).After(now) { + continue + } if mc, err = tr.Dial(ctx, stripScheme(addr)); err == nil { + p.ignoreAddrs.Delete(addr) return } else { log.InfoCtx(ctx, "can't connect to host", zap.String("addr", addr), zap.Error(err)) + p.ignoreAddrs.Store(addr, time.Now().Add(p.ignoreTimeout)) } } return diff --git a/net/peerservice/peerservice_test.go b/net/peerservice/peerservice_test.go index 0856f5bc..c20fded0 100644 --- a/net/peerservice/peerservice_test.go +++ b/net/peerservice/peerservice_test.go @@ -3,6 +3,12 @@ package peerservice import ( "context" "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" + "github.com/anyproto/any-sync/app" "github.com/anyproto/any-sync/net/peer" "github.com/anyproto/any-sync/net/pool" @@ -12,10 +18,6 @@ import ( "github.com/anyproto/any-sync/net/transport/yamux" "github.com/anyproto/any-sync/nodeconf" "github.com/anyproto/any-sync/nodeconf/mock_nodeconf" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.uber.org/mock/gomock" - "testing" ) var ctx = context.Background() @@ -111,6 +113,22 @@ func TestPeerService_Dial(t *testing.T) { require.NoError(t, err) assert.NotNil(t, p) }) + t.Run("ignore", func(t *testing.T) { + fx := newFixture(t) + defer fx.finish(t) + fx.PreferQuic(false) + var peerId = "p1" + + fx.nodeConf.EXPECT().PeerAddresses(peerId).Return([]string{"127.0.0.1:1111"}, true).AnyTimes() + + fx.yamux.MockTransport.EXPECT().Dial(ctx, "127.0.0.1:1111").Return(nil, fmt.Errorf("error")) + + _, err := fx.Dial(ctx, peerId) + require.Error(t, err) + + _, err = fx.Dial(ctx, peerId) + require.EqualError(t, err, ErrAddrsNotFound.Error()) + }) } func TestPeerService_Accept(t *testing.T) { From 2c8ff79256280a39f74b6adb2ade6f48aec77ca1 Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Wed, 8 Nov 2023 15:03:18 +0100 Subject: [PATCH 2/3] use lcoal peers for a space pull + retry --- commonspace/spaceservice.go | 54 ++++++++++++++++++++++++++++++------- 1 file changed, 44 insertions(+), 10 deletions(-) diff --git a/commonspace/spaceservice.go b/commonspace/spaceservice.go index c4e3b657..2540e786 100644 --- a/commonspace/spaceservice.go +++ b/commonspace/spaceservice.go @@ -3,9 +3,18 @@ package commonspace import ( "context" + "errors" + "sync/atomic" + "time" + + "go.uber.org/zap" + "github.com/anyproto/any-sync/commonspace/deletionmanager" "github.com/anyproto/any-sync/commonspace/object/treesyncer" - "sync/atomic" + "github.com/anyproto/any-sync/net" + "github.com/anyproto/any-sync/net/peer" + + "storj.io/drpc" "github.com/anyproto/any-sync/accountservice" "github.com/anyproto/any-sync/app" @@ -30,11 +39,9 @@ import ( "github.com/anyproto/any-sync/commonspace/syncstatus" "github.com/anyproto/any-sync/consensus/consensusproto" "github.com/anyproto/any-sync/metric" - "github.com/anyproto/any-sync/net/peer" "github.com/anyproto/any-sync/net/pool" "github.com/anyproto/any-sync/net/rpc/rpcerr" "github.com/anyproto/any-sync/nodeconf" - "storj.io/drpc" ) const CName = "common.commonspace" @@ -217,19 +224,47 @@ func (s *spaceService) addSpaceStorage(ctx context.Context, spaceDescription Spa } func (s *spaceService) getSpaceStorageFromRemote(ctx context.Context, id string) (st spacestorage.SpaceStorage, err error) { - var p peer.Peer - lastConfiguration := s.configurationService // we can't connect to client if it is a node - if lastConfiguration.IsResponsible(id) { + if s.configurationService.IsResponsible(id) { err = spacesyncproto.ErrSpaceMissing return } - p, err = s.pool.GetOneOf(ctx, lastConfiguration.NodeIds(id)) + sm, err := s.peerManagerProvider.NewPeerManager(ctx, id) if err != nil { - return + return nil, err } + var peers []peer.Peer + for { + peers, err = sm.GetResponsiblePeers(ctx) + if err != nil && !errors.Is(err, net.ErrUnableToConnect) { + return nil, err + } + if len(peers) == 0 { + select { + case <-time.After(time.Second): + case <-ctx.Done(): + return nil, ctx.Err() + } + } else { + break + } + } + for i, p := range peers { + if st, err = s.spacePullWithPeer(ctx, p, id); err != nil { + if i+1 == len(peers) { + return + } else { + log.InfoCtx(ctx, "unable to pull space", zap.String("spaceId", id), zap.String("peerId", p.Id())) + } + } else { + return + } + } + return nil, net.ErrUnableToConnect +} +func (s *spaceService) spacePullWithPeer(ctx context.Context, p peer.Peer, id string) (st spacestorage.SpaceStorage, err error) { var res *spacesyncproto.SpacePullResponse err = p.DoDrpc(ctx, func(conn drpc.Conn) error { cl := spacesyncproto.NewDRPCSpaceSyncClient(conn) @@ -241,7 +276,7 @@ func (s *spaceService) getSpaceStorageFromRemote(ctx context.Context, id string) return } - st, err = s.createSpaceStorage(spacestorage.SpaceStorageCreatePayload{ + return s.createSpaceStorage(spacestorage.SpaceStorageCreatePayload{ AclWithId: &consensusproto.RawRecordWithId{ Payload: res.Payload.AclPayload, Id: res.Payload.AclPayloadId, @@ -252,7 +287,6 @@ func (s *spaceService) getSpaceStorageFromRemote(ctx context.Context, id string) }, SpaceHeaderWithId: res.Payload.SpaceHeader, }) - return } func (s *spaceService) createSpaceStorage(payload spacestorage.SpaceStorageCreatePayload) (spacestorage.SpaceStorage, error) { From c69946fcc56eb97d2a94396282dc20a789bc4396 Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Wed, 8 Nov 2023 15:28:28 +0100 Subject: [PATCH 3/3] downgrade ipfs to v0.13.1 --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index c86149a7..ab7c4f18 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,7 @@ require ( github.com/google/uuid v1.4.0 github.com/hashicorp/yamux v0.1.1 github.com/huandu/skiplist v1.2.0 - github.com/ipfs/boxo v0.15.0 + github.com/ipfs/boxo v0.13.1 github.com/ipfs/go-block-format v0.2.0 github.com/ipfs/go-cid v0.4.1 github.com/ipfs/go-ipld-format v0.6.0 diff --git a/go.sum b/go.sum index bf0e51c9..254472f0 100644 --- a/go.sum +++ b/go.sum @@ -79,8 +79,8 @@ github.com/huandu/skiplist v1.2.0/go.mod h1:7v3iFjLcSAzO4fN5B8dvebvo/qsfumiLiDXM github.com/huin/goupnp v1.3.0 h1:UvLUlWDNpoUdYzb2TCn+MuTWtcjXKSza2n6CBdQ0xXc= github.com/ipfs/bbloom v0.0.4 h1:Gi+8EGJ2y5qiD5FbsbpX/TMNcJw8gSqr7eyjHa4Fhvs= github.com/ipfs/bbloom v0.0.4/go.mod h1:cS9YprKXpoZ9lT0n/Mw/a6/aFV6DTjTLYHeA+gyqMG0= -github.com/ipfs/boxo v0.15.0 h1:BriLydj2nlK1nKeJQHxcKSuG5ZXcoutzhBklOtxC5pk= -github.com/ipfs/boxo v0.15.0/go.mod h1:X5ulcbR5Nh7sm3Db8+08AApUo6FsGC5mb23QDKAoB/M= +github.com/ipfs/boxo v0.13.1 h1:nQ5oQzcMZR3oL41REJDcTbrvDvuZh3J9ckc9+ILeRQI= +github.com/ipfs/boxo v0.13.1/go.mod h1:btrtHy0lmO1ODMECbbEY1pxNtrLilvKSYLoGQt1yYCk= github.com/ipfs/go-bitfield v1.1.0 h1:fh7FIo8bSwaJEh6DdTWbCeZ1eqOaOkKFI74SCnsWbGA= github.com/ipfs/go-bitfield v1.1.0/go.mod h1:paqf1wjq/D2BBmzfTVFlJQ9IlFOZpg422HL0HqsGWHU= github.com/ipfs/go-block-format v0.2.0 h1:ZqrkxBA2ICbDRbK8KJs/u0O3dlp6gmAuuXUJNiW1Ycs=