1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-08 05:57:03 +09:00

Revert "peerservice: ignore addrs list"

This reverts commit 95fe3ba04c.
This commit is contained in:
Sergey Cherepanov 2023-12-22 16:44:06 +01:00
parent 68e62f3eb7
commit 66e3e50b6d
No known key found for this signature in database
GPG key ID: 87F8EDE8FBDF637C
2 changed files with 25 additions and 56 deletions

View file

@ -4,12 +4,6 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"strings"
"sync"
"time"
"go.uber.org/zap"
"github.com/anyproto/any-sync/app" "github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/app/logger" "github.com/anyproto/any-sync/app/logger"
"github.com/anyproto/any-sync/net/peer" "github.com/anyproto/any-sync/net/peer"
@ -19,6 +13,9 @@ import (
"github.com/anyproto/any-sync/net/transport/quic" "github.com/anyproto/any-sync/net/transport/quic"
"github.com/anyproto/any-sync/net/transport/yamux" "github.com/anyproto/any-sync/net/transport/yamux"
"github.com/anyproto/any-sync/nodeconf" "github.com/anyproto/any-sync/nodeconf"
"go.uber.org/zap"
"strings"
"sync"
) )
const CName = "net.peerservice" const CName = "net.peerservice"
@ -43,16 +40,14 @@ type PeerService interface {
} }
type peerService struct { type peerService struct {
yamux transport.Transport yamux transport.Transport
quic transport.Transport quic transport.Transport
nodeConf nodeconf.NodeConf nodeConf nodeconf.NodeConf
peerAddrs map[string][]string peerAddrs map[string][]string
ignoreAddrs *sync.Map pool pool.Pool
ignoreTimeout time.Duration server server.DRPCServer
pool pool.Pool preferQuic bool
server server.DRPCServer mu sync.RWMutex
preferQuic bool
mu sync.RWMutex
} }
func (p *peerService) Init(a *app.App) (err error) { func (p *peerService) Init(a *app.App) (err error) {
@ -62,8 +57,6 @@ func (p *peerService) Init(a *app.App) (err error) {
p.pool = a.MustComponent(pool.CName).(pool.Pool) p.pool = a.MustComponent(pool.CName).(pool.Pool)
p.server = a.MustComponent(server.CName).(server.DRPCServer) p.server = a.MustComponent(server.CName).(server.DRPCServer)
p.peerAddrs = map[string][]string{} p.peerAddrs = map[string][]string{}
p.ignoreAddrs = &sync.Map{}
p.ignoreTimeout = time.Minute * 3
p.yamux.SetAccepter(p) p.yamux.SetAccepter(p)
p.quic.SetAccepter(p) p.quic.SetAccepter(p)
return nil return nil
@ -86,21 +79,21 @@ func (p *peerService) PreferQuic(prefer bool) {
func (p *peerService) Dial(ctx context.Context, peerId string) (pr peer.Peer, err error) { func (p *peerService) Dial(ctx context.Context, peerId string) (pr peer.Peer, err error) {
p.mu.RLock() p.mu.RLock()
defer p.mu.RUnlock()
addrs, err := p.getPeerAddrs(peerId)
if err != nil {
return
}
var mc transport.MultiConn
log.DebugCtx(ctx, "dial", zap.String("peerId", peerId), zap.Strings("addrs", addrs))
var schemes = yamuxPreferSchemes var schemes = yamuxPreferSchemes
if p.preferQuic { if p.preferQuic {
schemes = quicPreferSchemes schemes = quicPreferSchemes
} }
addrs, err := p.getPeerAddrs(peerId)
if err != nil {
p.mu.RUnlock()
return
}
p.mu.RUnlock()
var mc transport.MultiConn
log.DebugCtx(ctx, "dial", zap.String("peerId", peerId), zap.Strings("addrs", addrs))
err = ErrAddrsNotFound err = ErrAddrsNotFound
for _, sch := range schemes { for _, sch := range schemes {
if mc, err = p.dialScheme(ctx, sch, addrs); err == nil { if mc, err = p.dialScheme(ctx, sch, addrs); err == nil {
@ -132,20 +125,14 @@ func (p *peerService) dialScheme(ctx context.Context, sch string, addrs []string
} }
err = ErrAddrsNotFound err = ErrAddrsNotFound
now := time.Now()
for _, addr := range addrs { for _, addr := range addrs {
if scheme(addr) != sch { if scheme(addr) != sch {
continue continue
} }
if tm, ok := p.ignoreAddrs.Load(addr); ok && tm.(time.Time).After(now) {
continue
}
if mc, err = tr.Dial(ctx, stripScheme(addr)); err == nil { if mc, err = tr.Dial(ctx, stripScheme(addr)); err == nil {
p.ignoreAddrs.Delete(addr)
return return
} else { } else {
log.InfoCtx(ctx, "can't connect to host", zap.String("addr", addr), zap.Error(err)) log.InfoCtx(ctx, "can't connect to host", zap.String("addr", addr), zap.Error(err))
p.ignoreAddrs.Store(addr, time.Now().Add(p.ignoreTimeout))
} }
} }
return return

View file

@ -3,12 +3,6 @@ package peerservice
import ( import (
"context" "context"
"fmt" "fmt"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"github.com/anyproto/any-sync/app" "github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/net/peer" "github.com/anyproto/any-sync/net/peer"
"github.com/anyproto/any-sync/net/pool" "github.com/anyproto/any-sync/net/pool"
@ -18,6 +12,10 @@ import (
"github.com/anyproto/any-sync/net/transport/yamux" "github.com/anyproto/any-sync/net/transport/yamux"
"github.com/anyproto/any-sync/nodeconf" "github.com/anyproto/any-sync/nodeconf"
"github.com/anyproto/any-sync/nodeconf/mock_nodeconf" "github.com/anyproto/any-sync/nodeconf/mock_nodeconf"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"testing"
) )
var ctx = context.Background() var ctx = context.Background()
@ -113,22 +111,6 @@ func TestPeerService_Dial(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
assert.NotNil(t, p) assert.NotNil(t, p)
}) })
t.Run("ignore", func(t *testing.T) {
fx := newFixture(t)
defer fx.finish(t)
fx.PreferQuic(false)
var peerId = "p1"
fx.nodeConf.EXPECT().PeerAddresses(peerId).Return([]string{"127.0.0.1:1111"}, true).AnyTimes()
fx.yamux.MockTransport.EXPECT().Dial(ctx, "127.0.0.1:1111").Return(nil, fmt.Errorf("error"))
_, err := fx.Dial(ctx, peerId)
require.Error(t, err)
_, err = fx.Dial(ctx, peerId)
require.EqualError(t, err, ErrAddrsNotFound.Error())
})
} }
func TestPeerService_Accept(t *testing.T) { func TestPeerService_Accept(t *testing.T) {