From ab6ecaa462636cad2ac56f551c08c9a0820957eb Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Mon, 12 Jun 2023 18:42:30 +0200 Subject: [PATCH] write deadline + check conn for close --- net/peer/peer.go | 6 ++++++ net/transport/yamux/conn.go | 2 ++ net/transport/yamux/yamux.go | 4 ++-- net/transport/yamux/yamux_test.go | 21 +++++++++++++++++++++ 4 files changed, 31 insertions(+), 2 deletions(-) diff --git a/net/peer/peer.go b/net/peer/peer.go index 2a2c50b0..4e584f2c 100644 --- a/net/peer/peer.go +++ b/net/peer/peer.go @@ -95,6 +95,12 @@ func (p *peer) AcquireDrpcConn(ctx context.Context) (drpc.Conn, error) { idx := len(p.inactive) - 1 res := p.inactive[idx] p.inactive = p.inactive[:idx] + select { + case <-res.Closed(): + p.mu.Unlock() + return p.AcquireDrpcConn(ctx) + default: + } p.active[res] = struct{}{} p.mu.Unlock() return res, nil diff --git a/net/transport/yamux/conn.go b/net/transport/yamux/conn.go index c752d22d..a3406bfa 100644 --- a/net/transport/yamux/conn.go +++ b/net/transport/yamux/conn.go @@ -29,6 +29,7 @@ func (y *yamuxConn) Open(ctx context.Context) (conn net.Conn, err error) { if conn, err = y.Session.Open(); err != nil { return } + conn = connutil.NewTimeout(conn, time.Second*10) return } @@ -51,5 +52,6 @@ func (y *yamuxConn) Accept() (conn net.Conn, err error) { } return } + conn = connutil.NewTimeout(conn, time.Second*10) return } diff --git a/net/transport/yamux/yamux.go b/net/transport/yamux/yamux.go index 9a0245a9..2722a76c 100644 --- a/net/transport/yamux/yamux.go +++ b/net/transport/yamux/yamux.go @@ -106,7 +106,7 @@ func (y *yamuxTransport) Dial(ctx context.Context, addr string) (mc transport.Mu _ = conn.Close() return nil, err } - luc := connutil.NewLastUsageConn(conn) + luc := connutil.NewLastUsageConn(connutil.NewTimeout(conn, time.Duration(y.conf.WriteTimeoutSec)*time.Second)) sess, err := yamux.Client(luc, y.yamuxConf) if err != nil { return @@ -152,7 +152,7 @@ func (y *yamuxTransport) accept(conn net.Conn) { log.Warn("incoming connection handshake error", zap.Error(err)) return } - luc := connutil.NewLastUsageConn(conn) + luc := connutil.NewLastUsageConn(connutil.NewTimeout(conn, time.Duration(y.conf.WriteTimeoutSec)*time.Second)) sess, err := yamux.Server(luc, y.yamuxConf) if err != nil { log.Warn("incoming connection yamux session error", zap.Error(err)) diff --git a/net/transport/yamux/yamux_test.go b/net/transport/yamux/yamux_test.go index 02e1c322..8946c969 100644 --- a/net/transport/yamux/yamux_test.go +++ b/net/transport/yamux/yamux_test.go @@ -66,6 +66,27 @@ func TestYamuxTransport_Dial(t *testing.T) { assert.NoError(t, copyErr) } +func TestSubConnWrite(t *testing.T) { + fxS := newFixture(t) + defer fxS.finish(t) + fxC := newFixture(t) + defer fxC.finish(t) + mc, err := fxC.Dial(ctx, fxS.addr) + require.NoError(t, err) + + mcS := <-fxS.accepter.mcs + _ = mcS + + //time.Sleep(time.Second) + conn, err := mc.Open(ctx) + require.NoError(t, err) + //mc.Close() + + n, err := conn.Write([]byte("123")) + require.NoError(t, err) + require.Equal(t, n, 3) +} + // no deadline - 69100 rps // common write deadline - 66700 rps // subconn write deadline - 67100 rps