1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-09 17:45:03 +09:00

write deadline + check conn for close

This commit is contained in:
Sergey Cherepanov 2023-06-12 18:42:30 +02:00
parent fb1df54941
commit ab6ecaa462
No known key found for this signature in database
GPG key ID: 87F8EDE8FBDF637C
4 changed files with 31 additions and 2 deletions

View file

@ -95,6 +95,12 @@ func (p *peer) AcquireDrpcConn(ctx context.Context) (drpc.Conn, error) {
idx := len(p.inactive) - 1 idx := len(p.inactive) - 1
res := p.inactive[idx] res := p.inactive[idx]
p.inactive = p.inactive[:idx] p.inactive = p.inactive[:idx]
select {
case <-res.Closed():
p.mu.Unlock()
return p.AcquireDrpcConn(ctx)
default:
}
p.active[res] = struct{}{} p.active[res] = struct{}{}
p.mu.Unlock() p.mu.Unlock()
return res, nil return res, nil

View file

@ -29,6 +29,7 @@ func (y *yamuxConn) Open(ctx context.Context) (conn net.Conn, err error) {
if conn, err = y.Session.Open(); err != nil { if conn, err = y.Session.Open(); err != nil {
return return
} }
conn = connutil.NewTimeout(conn, time.Second*10)
return return
} }
@ -51,5 +52,6 @@ func (y *yamuxConn) Accept() (conn net.Conn, err error) {
} }
return return
} }
conn = connutil.NewTimeout(conn, time.Second*10)
return return
} }

View file

@ -106,7 +106,7 @@ func (y *yamuxTransport) Dial(ctx context.Context, addr string) (mc transport.Mu
_ = conn.Close() _ = conn.Close()
return nil, err return nil, err
} }
luc := connutil.NewLastUsageConn(conn) luc := connutil.NewLastUsageConn(connutil.NewTimeout(conn, time.Duration(y.conf.WriteTimeoutSec)*time.Second))
sess, err := yamux.Client(luc, y.yamuxConf) sess, err := yamux.Client(luc, y.yamuxConf)
if err != nil { if err != nil {
return return
@ -152,7 +152,7 @@ func (y *yamuxTransport) accept(conn net.Conn) {
log.Warn("incoming connection handshake error", zap.Error(err)) log.Warn("incoming connection handshake error", zap.Error(err))
return return
} }
luc := connutil.NewLastUsageConn(conn) luc := connutil.NewLastUsageConn(connutil.NewTimeout(conn, time.Duration(y.conf.WriteTimeoutSec)*time.Second))
sess, err := yamux.Server(luc, y.yamuxConf) sess, err := yamux.Server(luc, y.yamuxConf)
if err != nil { if err != nil {
log.Warn("incoming connection yamux session error", zap.Error(err)) log.Warn("incoming connection yamux session error", zap.Error(err))

View file

@ -66,6 +66,27 @@ func TestYamuxTransport_Dial(t *testing.T) {
assert.NoError(t, copyErr) assert.NoError(t, copyErr)
} }
func TestSubConnWrite(t *testing.T) {
fxS := newFixture(t)
defer fxS.finish(t)
fxC := newFixture(t)
defer fxC.finish(t)
mc, err := fxC.Dial(ctx, fxS.addr)
require.NoError(t, err)
mcS := <-fxS.accepter.mcs
_ = mcS
//time.Sleep(time.Second)
conn, err := mc.Open(ctx)
require.NoError(t, err)
//mc.Close()
n, err := conn.Write([]byte("123"))
require.NoError(t, err)
require.Equal(t, n, 3)
}
// no deadline - 69100 rps // no deadline - 69100 rps
// common write deadline - 66700 rps // common write deadline - 66700 rps
// subconn write deadline - 67100 rps // subconn write deadline - 67100 rps