mirror of
https://github.com/anyproto/any-sync.git
synced 2025-06-08 05:57:03 +09:00
peer service
This commit is contained in:
parent
bc2049ccbc
commit
7958b43da6
9 changed files with 167 additions and 236 deletions
|
@ -1,137 +0,0 @@
|
||||||
package dialer
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"github.com/anyproto/any-sync/app"
|
|
||||||
"github.com/anyproto/any-sync/app/logger"
|
|
||||||
net2 "github.com/anyproto/any-sync/net"
|
|
||||||
"github.com/anyproto/any-sync/net/connutil"
|
|
||||||
"github.com/anyproto/any-sync/net/peer"
|
|
||||||
"github.com/anyproto/any-sync/net/secureservice"
|
|
||||||
"github.com/anyproto/any-sync/net/secureservice/handshake"
|
|
||||||
"github.com/anyproto/any-sync/nodeconf"
|
|
||||||
"github.com/libp2p/go-libp2p/core/sec"
|
|
||||||
"go.uber.org/zap"
|
|
||||||
"net"
|
|
||||||
"storj.io/drpc"
|
|
||||||
"storj.io/drpc/drpcconn"
|
|
||||||
"storj.io/drpc/drpcmanager"
|
|
||||||
"storj.io/drpc/drpcwire"
|
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
const CName = "common.net.dialer"
|
|
||||||
|
|
||||||
var (
|
|
||||||
ErrAddrsNotFound = errors.New("addrs for peer not found")
|
|
||||||
ErrPeerIdIsUnexpected = errors.New("expected to connect with other peer id")
|
|
||||||
)
|
|
||||||
|
|
||||||
var log = logger.NewNamed(CName)
|
|
||||||
|
|
||||||
func New() Dialer {
|
|
||||||
return &dialer{peerAddrs: map[string][]string{}}
|
|
||||||
}
|
|
||||||
|
|
||||||
type Dialer interface {
|
|
||||||
Dial(ctx context.Context, peerId string) (peer peer.Peer, err error)
|
|
||||||
SetPeerAddrs(peerId string, addrs []string)
|
|
||||||
app.Component
|
|
||||||
}
|
|
||||||
|
|
||||||
type dialer struct {
|
|
||||||
transport secureservice.SecureService
|
|
||||||
config net2.Config
|
|
||||||
nodeConf nodeconf.NodeConf
|
|
||||||
peerAddrs map[string][]string
|
|
||||||
|
|
||||||
mu sync.RWMutex
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *dialer) Init(a *app.App) (err error) {
|
|
||||||
d.transport = a.MustComponent(secureservice.CName).(secureservice.SecureService)
|
|
||||||
d.nodeConf = a.MustComponent(nodeconf.CName).(nodeconf.NodeConf)
|
|
||||||
d.config = a.MustComponent("config").(net2.ConfigGetter).GetNet()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *dialer) Name() (name string) {
|
|
||||||
return CName
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *dialer) SetPeerAddrs(peerId string, addrs []string) {
|
|
||||||
d.mu.Lock()
|
|
||||||
defer d.mu.Unlock()
|
|
||||||
d.peerAddrs[peerId] = addrs
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *dialer) getPeerAddrs(peerId string) ([]string, error) {
|
|
||||||
if addrs, ok := d.nodeConf.PeerAddresses(peerId); ok {
|
|
||||||
return addrs, nil
|
|
||||||
}
|
|
||||||
addrs, ok := d.peerAddrs[peerId]
|
|
||||||
if !ok || len(addrs) == 0 {
|
|
||||||
return nil, ErrAddrsNotFound
|
|
||||||
}
|
|
||||||
return addrs, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *dialer) Dial(ctx context.Context, peerId string) (p peer.Peer, err error) {
|
|
||||||
var ctxCancel context.CancelFunc
|
|
||||||
ctx, ctxCancel = context.WithTimeout(ctx, time.Second*10)
|
|
||||||
defer ctxCancel()
|
|
||||||
d.mu.RLock()
|
|
||||||
defer d.mu.RUnlock()
|
|
||||||
|
|
||||||
addrs, err := d.getPeerAddrs(peerId)
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
var (
|
|
||||||
conn drpc.Conn
|
|
||||||
sc sec.SecureConn
|
|
||||||
)
|
|
||||||
log.InfoCtx(ctx, "dial", zap.String("peerId", peerId), zap.Strings("addrs", addrs))
|
|
||||||
for _, addr := range addrs {
|
|
||||||
conn, sc, err = d.handshake(ctx, addr, peerId)
|
|
||||||
if err != nil {
|
|
||||||
log.InfoCtx(ctx, "can't connect to host", zap.String("addr", addr), zap.Error(err))
|
|
||||||
} else {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
return peer.NewPeer(sc, conn), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *dialer) handshake(ctx context.Context, addr, peerId string) (conn drpc.Conn, sc sec.SecureConn, err error) {
|
|
||||||
st := time.Now()
|
|
||||||
// TODO: move dial timeout to config
|
|
||||||
tcpConn, err := net.DialTimeout("tcp", addr, time.Second*3)
|
|
||||||
if err != nil {
|
|
||||||
return nil, nil, fmt.Errorf("dialTimeout error: %v; since start: %v", err, time.Since(st))
|
|
||||||
}
|
|
||||||
|
|
||||||
timeoutConn := connutil.NewConn(tcpConn, time.Millisecond*time.Duration(d.config.Stream.TimeoutMilliseconds))
|
|
||||||
sc, err = d.transport.SecureOutbound(ctx, timeoutConn)
|
|
||||||
if err != nil {
|
|
||||||
if he, ok := err.(handshake.HandshakeError); ok {
|
|
||||||
return nil, nil, he
|
|
||||||
}
|
|
||||||
return nil, nil, fmt.Errorf("tls handshaeke error: %v; since start: %v", err, time.Since(st))
|
|
||||||
}
|
|
||||||
if peerId != sc.RemotePeer().String() {
|
|
||||||
return nil, nil, ErrPeerIdIsUnexpected
|
|
||||||
}
|
|
||||||
log.Info("connected with remote host", zap.String("serverPeer", sc.RemotePeer().String()), zap.String("addr", addr))
|
|
||||||
conn = drpcconn.NewWithOptions(sc, drpcconn.Options{Manager: drpcmanager.Options{
|
|
||||||
Reader: drpcwire.ReaderOptions{MaximumBufferSize: d.config.Stream.MaxMsgSizeMb * (1 << 20)},
|
|
||||||
}})
|
|
||||||
return conn, sc, err
|
|
||||||
}
|
|
|
@ -1,85 +1,39 @@
|
||||||
package peer
|
package peer
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"github.com/anyproto/any-sync/net/transport"
|
||||||
"sync/atomic"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/anyproto/any-sync/app/logger"
|
"github.com/anyproto/any-sync/app/logger"
|
||||||
"github.com/libp2p/go-libp2p/core/sec"
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"storj.io/drpc"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var log = logger.NewNamed("common.net.peer")
|
var log = logger.NewNamed("common.net.peer")
|
||||||
|
|
||||||
func NewPeer(sc sec.SecureConn, conn drpc.Conn) Peer {
|
func NewPeer(mc transport.MultiConn) (p Peer, err error) {
|
||||||
return &peer{
|
ctx := mc.Context()
|
||||||
id: sc.RemotePeer().String(),
|
pr := &peer{}
|
||||||
lastUsage: time.Now().Unix(),
|
if pr.id, err = CtxPeerId(ctx); err != nil {
|
||||||
sc: sc,
|
return
|
||||||
Conn: conn,
|
|
||||||
}
|
}
|
||||||
|
return pr, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type Peer interface {
|
type Peer interface {
|
||||||
Id() string
|
Id() string
|
||||||
LastUsage() time.Time
|
|
||||||
UpdateLastUsage()
|
|
||||||
Addr() string
|
|
||||||
TryClose(objectTTL time.Duration) (res bool, err error)
|
TryClose(objectTTL time.Duration) (res bool, err error)
|
||||||
drpc.Conn
|
transport.MultiConn
|
||||||
}
|
}
|
||||||
|
|
||||||
type peer struct {
|
type peer struct {
|
||||||
id string
|
id string
|
||||||
ttl time.Duration
|
transport.MultiConn
|
||||||
lastUsage int64
|
|
||||||
sc sec.SecureConn
|
|
||||||
drpc.Conn
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *peer) Id() string {
|
func (p *peer) Id() string {
|
||||||
return p.id
|
return p.id
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *peer) LastUsage() time.Time {
|
|
||||||
select {
|
|
||||||
case <-p.Closed():
|
|
||||||
return time.Unix(0, 0)
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
return time.Unix(atomic.LoadInt64(&p.lastUsage), 0)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *peer) Invoke(ctx context.Context, rpc string, enc drpc.Encoding, in, out drpc.Message) error {
|
|
||||||
defer p.UpdateLastUsage()
|
|
||||||
return p.Conn.Invoke(ctx, rpc, enc, in, out)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *peer) NewStream(ctx context.Context, rpc string, enc drpc.Encoding) (drpc.Stream, error) {
|
|
||||||
defer p.UpdateLastUsage()
|
|
||||||
return p.Conn.NewStream(ctx, rpc, enc)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *peer) Read(b []byte) (n int, err error) {
|
|
||||||
if n, err = p.sc.Read(b); err != nil {
|
|
||||||
p.UpdateLastUsage()
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *peer) Write(b []byte) (n int, err error) {
|
|
||||||
if n, err = p.sc.Write(b); err != nil {
|
|
||||||
p.UpdateLastUsage()
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *peer) UpdateLastUsage() {
|
|
||||||
atomic.StoreInt64(&p.lastUsage, time.Now().Unix())
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *peer) TryClose(objectTTL time.Duration) (res bool, err error) {
|
func (p *peer) TryClose(objectTTL time.Duration) (res bool, err error) {
|
||||||
if time.Now().Sub(p.LastUsage()) < objectTTL {
|
if time.Now().Sub(p.LastUsage()) < objectTTL {
|
||||||
return false, nil
|
return false, nil
|
||||||
|
@ -87,14 +41,7 @@ func (p *peer) TryClose(objectTTL time.Duration) (res bool, err error) {
|
||||||
return true, p.Close()
|
return true, p.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *peer) Addr() string {
|
|
||||||
if p.sc != nil {
|
|
||||||
return p.sc.RemoteAddr().String()
|
|
||||||
}
|
|
||||||
return ""
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *peer) Close() (err error) {
|
func (p *peer) Close() (err error) {
|
||||||
log.Debug("peer close", zap.String("peerId", p.id))
|
log.Debug("peer close", zap.String("peerId", p.id))
|
||||||
return p.Conn.Close()
|
return p.MultiConn.Close()
|
||||||
}
|
}
|
||||||
|
|
107
net/peerservice/peerservice.go
Normal file
107
net/peerservice/peerservice.go
Normal file
|
@ -0,0 +1,107 @@
|
||||||
|
package peerservice
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"github.com/anyproto/any-sync/app"
|
||||||
|
"github.com/anyproto/any-sync/app/logger"
|
||||||
|
"github.com/anyproto/any-sync/net/peer"
|
||||||
|
"github.com/anyproto/any-sync/net/pool"
|
||||||
|
"github.com/anyproto/any-sync/net/transport"
|
||||||
|
"github.com/anyproto/any-sync/net/transport/yamux"
|
||||||
|
"github.com/anyproto/any-sync/nodeconf"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
const CName = "net.peerservice"
|
||||||
|
|
||||||
|
var log = logger.NewNamed(CName)
|
||||||
|
|
||||||
|
var (
|
||||||
|
ErrAddrsNotFound = errors.New("addrs for peer not found")
|
||||||
|
)
|
||||||
|
|
||||||
|
func New() PeerService {
|
||||||
|
return new(peerService)
|
||||||
|
}
|
||||||
|
|
||||||
|
type PeerService interface {
|
||||||
|
Dial(ctx context.Context, peerId string) (pr peer.Peer, err error)
|
||||||
|
SetPeerAddrs(peerId string, addrs []string)
|
||||||
|
transport.Accepter
|
||||||
|
app.Component
|
||||||
|
}
|
||||||
|
|
||||||
|
type peerService struct {
|
||||||
|
yamux transport.Transport
|
||||||
|
nodeConf nodeconf.NodeConf
|
||||||
|
peerAddrs map[string][]string
|
||||||
|
pool pool.Pool
|
||||||
|
mu sync.RWMutex
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *peerService) Init(a *app.App) (err error) {
|
||||||
|
p.yamux = a.MustComponent(yamux.CName).(transport.Transport)
|
||||||
|
p.nodeConf = a.MustComponent(nodeconf.CName).(nodeconf.NodeConf)
|
||||||
|
p.pool = a.MustComponent(pool.CName).(pool.Pool)
|
||||||
|
p.peerAddrs = map[string][]string{}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *peerService) Name() (name string) {
|
||||||
|
return CName
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *peerService) Dial(ctx context.Context, peerId string) (pr peer.Peer, err error) {
|
||||||
|
p.mu.RLock()
|
||||||
|
defer p.mu.RUnlock()
|
||||||
|
|
||||||
|
addrs, err := p.getPeerAddrs(peerId)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var mc transport.MultiConn
|
||||||
|
log.InfoCtx(ctx, "dial", zap.String("peerId", peerId), zap.Strings("addrs", addrs))
|
||||||
|
for _, addr := range addrs {
|
||||||
|
mc, err = p.yamux.Dial(ctx, addr)
|
||||||
|
if err != nil {
|
||||||
|
log.InfoCtx(ctx, "can't connect to host", zap.String("addr", addr), zap.Error(err))
|
||||||
|
} else {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return peer.NewPeer(mc)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *peerService) Accept(mc transport.MultiConn) (err error) {
|
||||||
|
pr, err := peer.NewPeer(mc)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err = p.pool.AddPeer(pr); err != nil {
|
||||||
|
_ = pr.Close()
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *peerService) SetPeerAddrs(peerId string, addrs []string) {
|
||||||
|
p.mu.Lock()
|
||||||
|
defer p.mu.Unlock()
|
||||||
|
p.peerAddrs[peerId] = addrs
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *peerService) getPeerAddrs(peerId string) ([]string, error) {
|
||||||
|
if addrs, ok := p.nodeConf.PeerAddresses(peerId); ok {
|
||||||
|
return addrs, nil
|
||||||
|
}
|
||||||
|
addrs, ok := p.peerAddrs[peerId]
|
||||||
|
if !ok || len(addrs) == 0 {
|
||||||
|
return nil, ErrAddrsNotFound
|
||||||
|
}
|
||||||
|
return addrs, nil
|
||||||
|
}
|
|
@ -4,7 +4,6 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"github.com/anyproto/any-sync/app/ocache"
|
"github.com/anyproto/any-sync/app/ocache"
|
||||||
"github.com/anyproto/any-sync/net"
|
"github.com/anyproto/any-sync/net"
|
||||||
"github.com/anyproto/any-sync/net/dialer"
|
|
||||||
"github.com/anyproto/any-sync/net/peer"
|
"github.com/anyproto/any-sync/net/peer"
|
||||||
"github.com/anyproto/any-sync/net/secureservice/handshake"
|
"github.com/anyproto/any-sync/net/secureservice/handshake"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
@ -18,13 +17,12 @@ type Pool interface {
|
||||||
// GetOneOf searches at least one existing connection in outgoing or creates a new one from a randomly selected id from given list
|
// GetOneOf searches at least one existing connection in outgoing or creates a new one from a randomly selected id from given list
|
||||||
GetOneOf(ctx context.Context, peerIds []string) (peer.Peer, error)
|
GetOneOf(ctx context.Context, peerIds []string) (peer.Peer, error)
|
||||||
// AddPeer adds incoming peer to the pool
|
// AddPeer adds incoming peer to the pool
|
||||||
AddPeer(ctx context.Context, p peer.Peer) (err error)
|
AddPeer(p peer.Peer) (err error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type pool struct {
|
type pool struct {
|
||||||
outgoing ocache.OCache
|
outgoing ocache.OCache
|
||||||
incoming ocache.OCache
|
incoming ocache.OCache
|
||||||
dialer dialer.Dialer
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *pool) Name() (name string) {
|
func (p *pool) Name() (name string) {
|
||||||
|
@ -46,36 +44,26 @@ func (p *pool) get(ctx context.Context, source ocache.OCache, id string) (peer.P
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
pr := v.(peer.Peer)
|
pr := v.(peer.Peer)
|
||||||
select {
|
if !pr.IsClosed() {
|
||||||
case <-pr.Closed():
|
|
||||||
default:
|
|
||||||
return pr, nil
|
return pr, nil
|
||||||
}
|
}
|
||||||
_, _ = source.Remove(ctx, id)
|
_, _ = source.Remove(ctx, id)
|
||||||
return p.Get(ctx, id)
|
return p.Get(ctx, id)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *pool) Dial(ctx context.Context, id string) (peer.Peer, error) {
|
|
||||||
return p.dialer.Dial(ctx, id)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *pool) GetOneOf(ctx context.Context, peerIds []string) (peer.Peer, error) {
|
func (p *pool) GetOneOf(ctx context.Context, peerIds []string) (peer.Peer, error) {
|
||||||
// finding existing connection
|
// finding existing connection
|
||||||
for _, peerId := range peerIds {
|
for _, peerId := range peerIds {
|
||||||
if v, err := p.incoming.Pick(ctx, peerId); err == nil {
|
if v, err := p.incoming.Pick(ctx, peerId); err == nil {
|
||||||
pr := v.(peer.Peer)
|
pr := v.(peer.Peer)
|
||||||
select {
|
if !pr.IsClosed() {
|
||||||
case <-pr.Closed():
|
|
||||||
default:
|
|
||||||
return pr, nil
|
return pr, nil
|
||||||
}
|
}
|
||||||
_, _ = p.incoming.Remove(ctx, peerId)
|
_, _ = p.incoming.Remove(ctx, peerId)
|
||||||
}
|
}
|
||||||
if v, err := p.outgoing.Pick(ctx, peerId); err == nil {
|
if v, err := p.outgoing.Pick(ctx, peerId); err == nil {
|
||||||
pr := v.(peer.Peer)
|
pr := v.(peer.Peer)
|
||||||
select {
|
if !pr.IsClosed() {
|
||||||
case <-pr.Closed():
|
|
||||||
default:
|
|
||||||
return pr, nil
|
return pr, nil
|
||||||
}
|
}
|
||||||
_, _ = p.outgoing.Remove(ctx, peerId)
|
_, _ = p.outgoing.Remove(ctx, peerId)
|
||||||
|
@ -101,6 +89,6 @@ func (p *pool) GetOneOf(ctx context.Context, peerIds []string) (peer.Peer, error
|
||||||
return nil, lastErr
|
return nil, lastErr
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *pool) AddPeer(ctx context.Context, pr peer.Peer) (err error) {
|
func (p *pool) AddPeer(pr peer.Peer) (err error) {
|
||||||
return p.incoming.Add(pr.Id(), pr)
|
return p.incoming.Add(pr.Id(), pr)
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,12 +6,11 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/anyproto/any-sync/app"
|
"github.com/anyproto/any-sync/app"
|
||||||
"github.com/anyproto/any-sync/net"
|
"github.com/anyproto/any-sync/net"
|
||||||
"github.com/anyproto/any-sync/net/dialer"
|
|
||||||
"github.com/anyproto/any-sync/net/peer"
|
"github.com/anyproto/any-sync/net/peer"
|
||||||
"github.com/anyproto/any-sync/net/secureservice/handshake"
|
"github.com/anyproto/any-sync/net/secureservice/handshake"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"storj.io/drpc"
|
net2 "net"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
@ -158,7 +157,7 @@ type fixture struct {
|
||||||
t *testing.T
|
t *testing.T
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ dialer.Dialer = (*dialerMock)(nil)
|
var _ dialer = (*dialerMock)(nil)
|
||||||
|
|
||||||
type dialerMock struct {
|
type dialerMock struct {
|
||||||
dial func(ctx context.Context, peerId string) (peer peer.Peer, err error)
|
dial func(ctx context.Context, peerId string) (peer peer.Peer, err error)
|
||||||
|
@ -181,7 +180,7 @@ func (d *dialerMock) Init(a *app.App) (err error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *dialerMock) Name() (name string) {
|
func (d *dialerMock) Name() (name string) {
|
||||||
return dialer.CName
|
return "net.peerservice"
|
||||||
}
|
}
|
||||||
|
|
||||||
func newTestPeer(id string) *testPeer {
|
func newTestPeer(id string) *testPeer {
|
||||||
|
@ -196,6 +195,21 @@ type testPeer struct {
|
||||||
closed chan struct{}
|
closed chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *testPeer) Context() context.Context {
|
||||||
|
//TODO implement me
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *testPeer) Accept() (conn net2.Conn, err error) {
|
||||||
|
//TODO implement me
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *testPeer) Open(ctx context.Context) (conn net2.Conn, err error) {
|
||||||
|
//TODO implement me
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
func (t *testPeer) Addr() string {
|
func (t *testPeer) Addr() string {
|
||||||
return ""
|
return ""
|
||||||
}
|
}
|
||||||
|
@ -224,14 +238,11 @@ func (t *testPeer) Close() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *testPeer) Closed() <-chan struct{} {
|
func (t *testPeer) IsClosed() bool {
|
||||||
return t.closed
|
select {
|
||||||
}
|
case <-t.closed:
|
||||||
|
return true
|
||||||
func (t *testPeer) Invoke(ctx context.Context, rpc string, enc drpc.Encoding, in, out drpc.Message) error {
|
default:
|
||||||
return fmt.Errorf("call Invoke on test peer")
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *testPeer) NewStream(ctx context.Context, rpc string, enc drpc.Encoding) (drpc.Stream, error) {
|
|
||||||
return nil, fmt.Errorf("call NewStream on test peer")
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,7 +6,7 @@ import (
|
||||||
"github.com/anyproto/any-sync/app/logger"
|
"github.com/anyproto/any-sync/app/logger"
|
||||||
"github.com/anyproto/any-sync/app/ocache"
|
"github.com/anyproto/any-sync/app/ocache"
|
||||||
"github.com/anyproto/any-sync/metric"
|
"github.com/anyproto/any-sync/metric"
|
||||||
"github.com/anyproto/any-sync/net/dialer"
|
"github.com/anyproto/any-sync/net/peer"
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"time"
|
"time"
|
||||||
|
@ -27,16 +27,20 @@ type Service interface {
|
||||||
app.ComponentRunnable
|
app.ComponentRunnable
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type dialer interface {
|
||||||
|
Dial(ctx context.Context, peerId string) (pr peer.Peer, err error)
|
||||||
|
}
|
||||||
|
|
||||||
type poolService struct {
|
type poolService struct {
|
||||||
// default pool
|
// default pool
|
||||||
*pool
|
*pool
|
||||||
dialer dialer.Dialer
|
dialer dialer
|
||||||
metricReg *prometheus.Registry
|
metricReg *prometheus.Registry
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *poolService) Init(a *app.App) (err error) {
|
func (p *poolService) Init(a *app.App) (err error) {
|
||||||
p.dialer = a.MustComponent(dialer.CName).(dialer.Dialer)
|
p.dialer = a.MustComponent("net.peerservice").(dialer)
|
||||||
p.pool = &pool{dialer: p.dialer}
|
p.pool = &pool{}
|
||||||
if m := a.Component(metric.CName); m != nil {
|
if m := a.Component(metric.CName); m != nil {
|
||||||
p.metricReg = m.(metric.Metric).Registry()
|
p.metricReg = m.(metric.Metric).Registry()
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,6 +25,10 @@ type MultiConn interface {
|
||||||
Open(ctx context.Context) (conn net.Conn, err error)
|
Open(ctx context.Context) (conn net.Conn, err error)
|
||||||
// LastUsage returns the time of the last connection activity
|
// LastUsage returns the time of the last connection activity
|
||||||
LastUsage() time.Time
|
LastUsage() time.Time
|
||||||
|
// Addr returns remote peer address
|
||||||
|
Addr() string
|
||||||
|
// IsClosed returns true when connection is closed
|
||||||
|
IsClosed() bool
|
||||||
// Close closes the connection and all sub connections
|
// Close closes the connection and all sub connections
|
||||||
Close() error
|
Close() error
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,6 +11,7 @@ import (
|
||||||
type yamuxConn struct {
|
type yamuxConn struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
luConn *connutil.LastUsageConn
|
luConn *connutil.LastUsageConn
|
||||||
|
addr string
|
||||||
*yamux.Session
|
*yamux.Session
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -25,3 +26,7 @@ func (y *yamuxConn) LastUsage() time.Time {
|
||||||
func (y *yamuxConn) Context() context.Context {
|
func (y *yamuxConn) Context() context.Context {
|
||||||
return y.ctx
|
return y.ctx
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (y *yamuxConn) Addr() string {
|
||||||
|
return y.addr
|
||||||
|
}
|
||||||
|
|
|
@ -100,6 +100,7 @@ func (y *yamuxTransport) Dial(ctx context.Context, addr string) (mc transport.Mu
|
||||||
ctx: cctx,
|
ctx: cctx,
|
||||||
luConn: luc,
|
luConn: luc,
|
||||||
Session: sess,
|
Session: sess,
|
||||||
|
addr: addr,
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -151,6 +152,7 @@ func (y *yamuxTransport) accept(conn net.Conn) {
|
||||||
ctx: cctx,
|
ctx: cctx,
|
||||||
luConn: luc,
|
luConn: luc,
|
||||||
Session: sess,
|
Session: sess,
|
||||||
|
addr: conn.RemoteAddr().String(),
|
||||||
}
|
}
|
||||||
if err = y.accepter.Accept(mc); err != nil {
|
if err = y.accepter.Accept(mc); err != nil {
|
||||||
log.Warn("connection accept error", zap.Error(err))
|
log.Warn("connection accept error", zap.Error(err))
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue