1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-09 17:45:03 +09:00

wip: yamux transport + pool rewrite

This commit is contained in:
Sergey Cherepanov 2023-05-26 19:18:51 +02:00
parent f8c79c33bc
commit bc2049ccbc
No known key found for this signature in database
GPG key ID: 87F8EDE8FBDF637C
16 changed files with 610 additions and 74 deletions

View file

@ -0,0 +1,34 @@
package transport
import (
"context"
"net"
"time"
)
// Transport is a common interface for a network transport
type Transport interface {
// SetAccepter sets accepter that will be called for new connections
// this method should be called before app start
SetAccepter(accepter Accepter)
// Dial creates a new connection by given address
Dial(ctx context.Context, addr string) (mc MultiConn, err error)
}
// MultiConn is an object of multiplexing connection containing handshake info
type MultiConn interface {
// Context returns the connection context that contains handshake details
Context() context.Context
// Accept accepts new sub connections
Accept() (conn net.Conn, err error)
// Open opens new sub connection
Open(ctx context.Context) (conn net.Conn, err error)
// LastUsage returns the time of the last connection activity
LastUsage() time.Time
// Close closes the connection and all sub connections
Close() error
}
type Accepter interface {
Accept(mc MultiConn) (err error)
}

View file

@ -0,0 +1,12 @@
package yamux
type configGetter interface {
GetYamux() Config
}
type Config struct {
ListenAddrs []string `yaml:"listenAddrs"`
WriteTimeoutSec int `yaml:"writeTimeoutSec"`
DialTimeoutSec int `yaml:"dialTimeoutSec"`
MaxStreams int `yaml:"maxStreams"`
}

View file

@ -0,0 +1,27 @@
package yamux
import (
"context"
"github.com/anyproto/any-sync/net/connutil"
"github.com/hashicorp/yamux"
"net"
"time"
)
type yamuxConn struct {
ctx context.Context
luConn *connutil.LastUsageConn
*yamux.Session
}
func (y *yamuxConn) Open(ctx context.Context) (conn net.Conn, err error) {
return y.Session.Open()
}
func (y *yamuxConn) LastUsage() time.Time {
return y.luConn.LastUsage()
}
func (y *yamuxConn) Context() context.Context {
return y.ctx
}

View file

@ -0,0 +1,18 @@
//go:build !windows
package yamux
import (
"errors"
"net"
)
// isTemporary checks if an error is temporary.
func isTemporary(err error) bool {
var nErr net.Error
if errors.As(err, &nErr) {
return nErr.Temporary()
}
return false
}

View file

@ -0,0 +1,41 @@
//go:build windows
package yamux
import (
"errors"
"net"
"os"
"syscall"
)
const (
_WSAEMFILE syscall.Errno = 10024
_WSAENETRESET syscall.Errno = 10052
_WSAENOBUFS syscall.Errno = 10055
)
// isTemporary checks if an error is temporary.
// see related go issue for more detail: https://go-review.googlesource.com/c/go/+/208537/
func isTemporary(err error) bool {
var nErr net.Error
if !errors.As(err, &nErr) {
return false
}
if nErr.Temporary() {
return true
}
var sErr *os.SyscallError
if errors.As(err, &sErr) {
switch sErr.Err {
case _WSAENETRESET,
_WSAEMFILE,
_WSAENOBUFS:
return true
}
}
return false
}

View file

@ -0,0 +1,168 @@
package yamux
import (
"context"
"fmt"
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/app/logger"
"github.com/anyproto/any-sync/net/connutil"
"github.com/anyproto/any-sync/net/secureservice"
"github.com/anyproto/any-sync/net/transport"
"github.com/hashicorp/yamux"
"go.uber.org/zap"
"net"
"time"
)
const CName = "net.transport.yamux"
var log = logger.NewNamed(CName)
func New() Yamux {
return new(yamuxTransport)
}
// Yamux implements transport.Transport with tcp+yamux
type Yamux interface {
transport.Transport
app.ComponentRunnable
}
type yamuxTransport struct {
secure secureservice.SecureService
accepter transport.Accepter
conf Config
listeners []net.Listener
listCtx context.Context
listCtxCancel context.CancelFunc
yamuxConf *yamux.Config
}
func (y *yamuxTransport) Init(a *app.App) (err error) {
y.secure = a.MustComponent(secureservice.CName).(secureservice.SecureService)
y.conf = a.MustComponent("config").(configGetter).GetYamux()
y.yamuxConf = yamux.DefaultConfig()
if y.conf.MaxStreams > 0 {
y.yamuxConf.AcceptBacklog = y.conf.MaxStreams
}
y.yamuxConf.EnableKeepAlive = false
y.yamuxConf.StreamOpenTimeout = time.Duration(y.conf.DialTimeoutSec) * time.Second
y.yamuxConf.ConnectionWriteTimeout = time.Duration(y.conf.WriteTimeoutSec) * time.Second
return
}
func (y *yamuxTransport) Name() string {
return CName
}
func (y *yamuxTransport) Run(ctx context.Context) (err error) {
if y.accepter == nil {
return fmt.Errorf("can't run service without accepter")
}
for _, listAddr := range y.conf.ListenAddrs {
list, err := net.Listen("tcp", listAddr)
if err != nil {
return err
}
y.listeners = append(y.listeners, list)
}
y.listCtx, y.listCtxCancel = context.WithCancel(context.Background())
for _, list := range y.listeners {
go y.acceptLoop(y.listCtx, list)
}
return
}
func (y *yamuxTransport) SetAccepter(accepter transport.Accepter) {
y.accepter = accepter
}
func (y *yamuxTransport) Dial(ctx context.Context, addr string) (mc transport.MultiConn, err error) {
dialTimeout := time.Duration(y.conf.DialTimeoutSec) * time.Second
conn, err := net.DialTimeout("tcp", addr, dialTimeout)
if err != nil {
return nil, err
}
ctx, cancel := context.WithTimeout(ctx, dialTimeout)
defer cancel()
cctx, sc, err := y.secure.SecureOutbound(ctx, conn)
if err != nil {
_ = conn.Close()
return nil, err
}
luc := connutil.NewLastUsageConn(sc)
sess, err := yamux.Client(luc, y.yamuxConf)
if err != nil {
return
}
mc = &yamuxConn{
ctx: cctx,
luConn: luc,
Session: sess,
}
return
}
func (y *yamuxTransport) acceptLoop(ctx context.Context, list net.Listener) {
l := log.With(zap.String("localAddr", list.Addr().String()))
l.Info("yamux listener started")
defer func() {
l.Debug("yamux listener stopped")
}()
for {
conn, err := list.Accept()
if err != nil {
if isTemporary(err) {
l.Debug("listener temporary accept error", zap.Error(err))
select {
case <-time.After(time.Second):
case <-ctx.Done():
return
}
continue
}
if err != net.ErrClosed {
l.Error("listener closed with error", zap.Error(err))
} else {
l.Info("listener closed")
}
return
}
go y.accept(conn)
}
}
func (y *yamuxTransport) accept(conn net.Conn) {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(y.conf.DialTimeoutSec)*time.Second)
defer cancel()
cctx, sc, err := y.secure.SecureInbound(ctx, conn)
if err != nil {
log.Warn("incoming connection handshake error", zap.Error(err))
return
}
luc := connutil.NewLastUsageConn(sc)
sess, err := yamux.Server(luc, y.yamuxConf)
if err != nil {
log.Warn("incoming connection yamux session error", zap.Error(err))
return
}
mc := &yamuxConn{
ctx: cctx,
luConn: luc,
Session: sess,
}
if err = y.accepter.Accept(mc); err != nil {
log.Warn("connection accept error", zap.Error(err))
}
}
func (y *yamuxTransport) Close(ctx context.Context) (err error) {
if y.listCtxCancel != nil {
y.listCtxCancel()
}
for _, l := range y.listeners {
_ = l.Close()
}
return
}

View file

@ -0,0 +1,134 @@
package yamux
import (
"bytes"
"context"
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/net/secureservice"
"github.com/anyproto/any-sync/net/transport"
"github.com/anyproto/any-sync/nodeconf"
"github.com/anyproto/any-sync/nodeconf/mock_nodeconf"
"github.com/anyproto/any-sync/testutil/accounttest"
"github.com/anyproto/any-sync/testutil/testnodeconf"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"io"
"testing"
)
var ctx = context.Background()
func TestYamuxTransport_Dial(t *testing.T) {
fxS := newFixture(t)
defer fxS.finish(t)
fxC := newFixture(t)
defer fxC.finish(t)
mcC, err := fxC.Dial(ctx, fxS.addr)
require.NoError(t, err)
require.Len(t, fxS.accepter.mcs, 1)
mcS := fxS.accepter.mcs[0]
var (
sData string
acceptErr error
copyErr error
done = make(chan struct{})
)
go func() {
defer close(done)
conn, serr := mcS.Accept()
if serr != nil {
acceptErr = serr
return
}
buf := bytes.NewBuffer(nil)
_, copyErr = io.Copy(buf, conn)
sData = buf.String()
return
}()
conn, err := mcC.Open(ctx)
require.NoError(t, err)
data := "some data"
_, err = conn.Write([]byte(data))
require.NoError(t, err)
require.NoError(t, conn.Close())
<-done
assert.NoError(t, acceptErr)
assert.Equal(t, data, sData)
assert.NoError(t, copyErr)
}
type fixture struct {
*yamuxTransport
a *app.App
ctrl *gomock.Controller
mockNodeConf *mock_nodeconf.MockService
acc *accounttest.AccountTestService
accepter *testAccepter
addr string
}
func newFixture(t *testing.T) *fixture {
fx := &fixture{
yamuxTransport: New().(*yamuxTransport),
ctrl: gomock.NewController(t),
acc: &accounttest.AccountTestService{},
accepter: &testAccepter{},
a: new(app.App),
}
fx.mockNodeConf = mock_nodeconf.NewMockService(fx.ctrl)
fx.mockNodeConf.EXPECT().Init(gomock.Any())
fx.mockNodeConf.EXPECT().Name().Return(nodeconf.CName).AnyTimes()
fx.mockNodeConf.EXPECT().Run(ctx)
fx.mockNodeConf.EXPECT().Close(ctx)
fx.mockNodeConf.EXPECT().NodeTypes(gomock.Any()).Return([]nodeconf.NodeType{nodeconf.NodeTypeTree}).AnyTimes()
fx.a.Register(fx.acc).Register(newTestConf()).Register(fx.mockNodeConf).Register(secureservice.New()).Register(fx.yamuxTransport).Register(fx.accepter)
require.NoError(t, fx.a.Start(ctx))
fx.addr = fx.listeners[0].Addr().String()
return fx
}
func (fx *fixture) finish(t *testing.T) {
require.NoError(t, fx.a.Close(ctx))
fx.ctrl.Finish()
}
func newTestConf() *testConf {
return &testConf{testnodeconf.GenNodeConfig(1)}
}
type testConf struct {
*testnodeconf.Config
}
func (c *testConf) GetYamux() Config {
return Config{
ListenAddrs: []string{"127.0.0.1:0"},
WriteTimeoutSec: 10,
DialTimeoutSec: 10,
MaxStreams: 1024,
}
}
type testAccepter struct {
err error
mcs []transport.MultiConn
}
func (t *testAccepter) Accept(mc transport.MultiConn) (err error) {
t.mcs = append(t.mcs, mc)
return t.err
}
func (t *testAccepter) Init(a *app.App) (err error) {
a.MustComponent(CName).(transport.Transport).SetAccepter(t)
return nil
}
func (t *testAccepter) Name() (name string) { return "testAccepter" }