1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-09 09:35:03 +09:00

Merge branch 'main' of github.com:anyproto/any-sync into quic

This commit is contained in:
Sergey Cherepanov 2023-07-18 17:15:32 +02:00
commit f00bcfbc78
No known key found for this signature in database
GPG key ID: 87F8EDE8FBDF637C
168 changed files with 15993 additions and 6679 deletions

View file

@ -8,10 +8,9 @@ import (
context "context"
net "net"
reflect "reflect"
time "time"
transport "github.com/anyproto/any-sync/net/transport"
gomock "github.com/golang/mock/gomock"
gomock "go.uber.org/mock/gomock"
)
// MockTransport is a mock of Transport interface.
@ -158,20 +157,6 @@ func (mr *MockMultiConnMockRecorder) IsClosed() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsClosed", reflect.TypeOf((*MockMultiConn)(nil).IsClosed))
}
// LastUsage mocks base method.
func (m *MockMultiConn) LastUsage() time.Time {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "LastUsage")
ret0, _ := ret[0].(time.Time)
return ret0
}
// LastUsage indicates an expected call of LastUsage.
func (mr *MockMultiConnMockRecorder) LastUsage() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LastUsage", reflect.TypeOf((*MockMultiConn)(nil).LastUsage))
}
// Open mocks base method.
func (m *MockMultiConn) Open(arg0 context.Context) (net.Conn, error) {
m.ctrl.T.Helper()

View file

@ -5,7 +5,6 @@ import (
"context"
"errors"
"net"
"time"
)
var (
@ -29,8 +28,6 @@ type MultiConn interface {
Accept() (conn net.Conn, err error)
// Open opens new sub connection
Open(ctx context.Context) (conn net.Conn, err error)
// LastUsage returns the time of the last connection activity
LastUsage() time.Time
// Addr returns remote peer address
Addr() string
// IsClosed returns true when connection is closed

View file

@ -5,7 +5,8 @@ type configGetter interface {
}
type Config struct {
ListenAddrs []string `yaml:"listenAddrs"`
WriteTimeoutSec int `yaml:"writeTimeoutSec"`
DialTimeoutSec int `yaml:"dialTimeoutSec"`
ListenAddrs []string `yaml:"listenAddrs"`
WriteTimeoutSec int `yaml:"writeTimeoutSec"`
DialTimeoutSec int `yaml:"dialTimeoutSec"`
KeepAlivePeriodSec int `yaml:"keepAlivePeriodSec"`
}

View file

@ -3,13 +3,16 @@ package yamux
import (
"context"
"github.com/anyproto/any-sync/net/connutil"
"github.com/anyproto/any-sync/net/peer"
"github.com/anyproto/any-sync/net/transport"
"github.com/hashicorp/yamux"
"io"
"net"
"time"
)
func NewMultiConn(cctx context.Context, luConn *connutil.LastUsageConn, addr string, sess *yamux.Session) transport.MultiConn {
cctx = peer.CtxWithPeerAddr(cctx, sess.RemoteAddr().String())
return &yamuxConn{
ctx: cctx,
luConn: luConn,
@ -46,7 +49,7 @@ func (y *yamuxConn) Addr() string {
func (y *yamuxConn) Accept() (conn net.Conn, err error) {
if conn, err = y.Session.Accept(); err != nil {
if err == yamux.ErrSessionShutdown {
if err == yamux.ErrSessionShutdown || err == io.EOF {
err = transport.ErrConnClosed
}
return

View file

@ -11,6 +11,7 @@ import (
"github.com/hashicorp/yamux"
"go.uber.org/zap"
"net"
"sync"
"time"
)
@ -25,6 +26,7 @@ func New() Yamux {
// Yamux implements transport.Transport with tcp+yamux
type Yamux interface {
transport.Transport
AddListener(lis net.Listener)
app.ComponentRunnable
}
@ -37,15 +39,31 @@ type yamuxTransport struct {
listCtx context.Context
listCtxCancel context.CancelFunc
yamuxConf *yamux.Config
mu sync.Mutex
}
func (y *yamuxTransport) Init(a *app.App) (err error) {
y.secure = a.MustComponent(secureservice.CName).(secureservice.SecureService)
y.conf = a.MustComponent("config").(configGetter).GetYamux()
if y.conf.DialTimeoutSec <= 0 {
y.conf.DialTimeoutSec = 10
}
if y.conf.WriteTimeoutSec <= 0 {
y.conf.WriteTimeoutSec = 10
}
y.yamuxConf = yamux.DefaultConfig()
y.yamuxConf.EnableKeepAlive = false
if y.conf.KeepAlivePeriodSec < 0 {
y.yamuxConf.EnableKeepAlive = false
} else {
y.yamuxConf.EnableKeepAlive = true
if y.conf.KeepAlivePeriodSec != 0 {
y.yamuxConf.KeepAliveInterval = time.Duration(y.conf.KeepAlivePeriodSec) * time.Second
}
}
y.yamuxConf.StreamOpenTimeout = time.Duration(y.conf.DialTimeoutSec) * time.Second
y.yamuxConf.ConnectionWriteTimeout = time.Duration(y.conf.WriteTimeoutSec) * time.Second
y.listCtx, y.listCtxCancel = context.WithCancel(context.Background())
return
}
@ -57,6 +75,8 @@ func (y *yamuxTransport) Run(ctx context.Context) (err error) {
if y.accepter == nil {
return fmt.Errorf("can't run service without accepter")
}
y.mu.Lock()
defer y.mu.Unlock()
for _, listAddr := range y.conf.ListenAddrs {
list, err := net.Listen("tcp", listAddr)
if err != nil {
@ -64,7 +84,6 @@ func (y *yamuxTransport) Run(ctx context.Context) (err error) {
}
y.listeners = append(y.listeners, list)
}
y.listCtx, y.listCtxCancel = context.WithCancel(context.Background())
for _, list := range y.listeners {
go y.acceptLoop(y.listCtx, list)
}
@ -75,6 +94,13 @@ func (y *yamuxTransport) SetAccepter(accepter transport.Accepter) {
y.accepter = accepter
}
func (y *yamuxTransport) AddListener(lis net.Listener) {
y.mu.Lock()
defer y.mu.Unlock()
y.listeners = append(y.listeners, lis)
go y.acceptLoop(y.listCtx, lis)
}
func (y *yamuxTransport) Dial(ctx context.Context, addr string) (mc transport.MultiConn, err error) {
dialTimeout := time.Duration(y.conf.DialTimeoutSec) * time.Second
conn, err := net.DialTimeout("tcp", addr, dialTimeout)
@ -88,7 +114,7 @@ func (y *yamuxTransport) Dial(ctx context.Context, addr string) (mc transport.Mu
_ = conn.Close()
return nil, err
}
luc := connutil.NewLastUsageConn(conn)
luc := connutil.NewLastUsageConn(connutil.NewTimeout(conn, time.Duration(y.conf.WriteTimeoutSec)*time.Second))
sess, err := yamux.Client(luc, y.yamuxConf)
if err != nil {
return
@ -134,7 +160,7 @@ func (y *yamuxTransport) accept(conn net.Conn) {
log.Warn("incoming connection handshake error", zap.Error(err))
return
}
luc := connutil.NewLastUsageConn(conn)
luc := connutil.NewLastUsageConn(connutil.NewTimeout(conn, time.Duration(y.conf.WriteTimeoutSec)*time.Second))
sess, err := yamux.Server(luc, y.yamuxConf)
if err != nil {
log.Warn("incoming connection yamux session error", zap.Error(err))

View file

@ -10,9 +10,9 @@ import (
"github.com/anyproto/any-sync/nodeconf/mock_nodeconf"
"github.com/anyproto/any-sync/testutil/accounttest"
"github.com/anyproto/any-sync/testutil/testnodeconf"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"io"
"net"
"sync"
@ -30,8 +30,12 @@ func TestYamuxTransport_Dial(t *testing.T) {
mcC, err := fxC.Dial(ctx, fxS.addr)
require.NoError(t, err)
require.Len(t, fxS.accepter.mcs, 1)
mcS := <-fxS.accepter.mcs
var mcS transport.MultiConn
select {
case mcS = <-fxS.accepter.mcs:
case <-time.After(time.Second * 5):
require.True(t, false, "timeout")
}
var (
sData string
@ -69,11 +73,11 @@ func TestYamuxTransport_Dial(t *testing.T) {
// no deadline - 69100 rps
// common write deadline - 66700 rps
// subconn write deadline - 67100 rps
func TestWriteBench(t *testing.T) {
func TestWriteBenchReuse(t *testing.T) {
t.Skip()
var (
numSubConn = 10
numWrites = 100000
numWrites = 10000
)
fxS := newFixture(t)
@ -124,6 +128,63 @@ func TestWriteBench(t *testing.T) {
t.Logf("%.2f req per sec", float64(numWrites*numSubConn)/dur.Seconds())
}
func TestWriteBenchNew(t *testing.T) {
t.Skip()
var (
numSubConn = 10
numWrites = 10000
)
fxS := newFixture(t)
defer fxS.finish(t)
fxC := newFixture(t)
defer fxC.finish(t)
mcC, err := fxC.Dial(ctx, fxS.addr)
require.NoError(t, err)
mcS := <-fxS.accepter.mcs
go func() {
for i := 0; i < numSubConn; i++ {
require.NoError(t, err)
go func() {
var b = make([]byte, 1024)
for {
conn, _ := mcS.Accept()
n, _ := conn.Read(b)
if n > 0 {
conn.Write(b[:n])
} else {
_ = conn.Close()
break
}
conn.Close()
}
}()
}
}()
var wg sync.WaitGroup
wg.Add(numSubConn)
st := time.Now()
for i := 0; i < numSubConn; i++ {
go func() {
defer wg.Done()
for j := 0; j < numWrites; j++ {
sc, err := mcC.Open(ctx)
require.NoError(t, err)
var b = []byte("some data some data some data some data some data some data some data some data some data")
sc.Write(b)
sc.Read(b)
sc.Close()
}
}()
}
wg.Wait()
dur := time.Since(st)
t.Logf("%.2f req per sec", float64(numWrites*numSubConn)/dur.Seconds())
}
type fixture struct {
*yamuxTransport
a *app.App