1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-08 05:57:03 +09:00
any-sync/net/peer/peer_test.go
2023-08-09 15:56:26 +02:00

299 lines
7.3 KiB
Go

package peer
import (
"context"
"github.com/anyproto/any-sync/net/rpc"
"github.com/anyproto/any-sync/net/secureservice/handshake"
"github.com/anyproto/any-sync/net/secureservice/handshake/handshakeproto"
"github.com/anyproto/any-sync/net/transport/mock_transport"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"io"
"net"
_ "net/http/pprof"
"storj.io/drpc"
"storj.io/drpc/drpcconn"
"testing"
"time"
)
var ctx = context.Background()
func TestPeer_AcquireDrpcConn(t *testing.T) {
t.Run("generic", func(t *testing.T) {
fx := newFixture(t, "p1")
defer fx.finish()
in, out := net.Pipe()
go func() {
handshake.IncomingProtoHandshake(ctx, out, defaultProtoChecker)
}()
defer out.Close()
fx.mc.EXPECT().Open(gomock.Any()).Return(in, nil)
dc, err := fx.AcquireDrpcConn(ctx)
require.NoError(t, err)
assert.NotEmpty(t, dc)
defer dc.Close()
assert.Len(t, fx.active, 1)
assert.Len(t, fx.inactive, 0)
fx.ReleaseDrpcConn(dc)
assert.Len(t, fx.active, 0)
assert.Len(t, fx.inactive, 1)
dc, err = fx.AcquireDrpcConn(ctx)
require.NoError(t, err)
assert.NotEmpty(t, dc)
assert.Len(t, fx.active, 1)
assert.Len(t, fx.inactive, 0)
})
t.Run("closed sub conn", func(t *testing.T) {
fx := newFixture(t, "p1")
defer fx.finish()
closedIn, _ := net.Pipe()
dc := drpcconn.New(closedIn)
fx.ReleaseDrpcConn(&subConn{Conn: dc})
dc.Close()
in, out := net.Pipe()
go func() {
handshake.IncomingProtoHandshake(ctx, out, defaultProtoChecker)
}()
defer out.Close()
fx.mc.EXPECT().Open(gomock.Any()).Return(in, nil)
_, err := fx.AcquireDrpcConn(ctx)
require.NoError(t, err)
})
}
func TestPeer_DrpcConn_OpenThrottling(t *testing.T) {
fx := newFixture(t, "p1")
defer fx.finish()
acquire := func() (func(), drpc.Conn, error) {
in, out := net.Pipe()
go func() {
_, err := handshake.IncomingProtoHandshake(ctx, out, defaultProtoChecker)
require.NoError(t, err)
}()
fx.mc.EXPECT().Open(gomock.Any()).Return(in, nil)
dconn, err := fx.AcquireDrpcConn(ctx)
return func() { out.Close() }, dconn, err
}
var conCount = fx.limiter.startThreshold + 3
var conns []drpc.Conn
for i := 0; i < conCount; i++ {
cc, dc, err := acquire()
require.NoError(t, err)
defer cc()
conns = append(conns, dc)
}
go func() {
time.Sleep(fx.limiter.slowDownStep)
fx.ReleaseDrpcConn(conns[0])
conns = conns[1:]
}()
_, err := fx.AcquireDrpcConn(ctx)
require.NoError(t, err)
}
func TestPeerAccept(t *testing.T) {
fx := newFixture(t, "p1")
defer fx.finish()
in, out := net.Pipe()
defer out.Close()
var outHandshakeCh = make(chan error)
go func() {
outHandshakeCh <- handshake.OutgoingProtoHandshake(ctx, out, handshakeproto.ProtoType_DRPC)
}()
fx.acceptCh <- acceptedConn{conn: in}
cn := <-fx.testCtrl.serveConn
assert.Equal(t, in, cn)
assert.NoError(t, <-outHandshakeCh)
}
func TestPeer_DrpcConn_AcceptThrottling(t *testing.T) {
fx := newFixture(t, "p1")
defer fx.finish()
var conCount = fx.limiter.startThreshold + 3
for i := 0; i < conCount; i++ {
in, out := net.Pipe()
defer out.Close()
var outHandshakeCh = make(chan error)
go func() {
outHandshakeCh <- handshake.OutgoingProtoHandshake(ctx, out, handshakeproto.ProtoType_DRPC)
}()
fx.acceptCh <- acceptedConn{conn: in}
cn := <-fx.testCtrl.serveConn
assert.Equal(t, in, cn)
assert.NoError(t, <-outHandshakeCh)
}
}
func TestPeer_TryClose(t *testing.T) {
t.Run("not close in first minute", func(t *testing.T) {
fx := newFixture(t, "p1")
defer fx.finish()
res, err := fx.TryClose(time.Second)
require.NoError(t, err)
assert.False(t, res)
})
t.Run("close", func(t *testing.T) {
fx := newFixture(t, "p1")
defer fx.finish()
fx.peer.created = fx.peer.created.Add(-time.Minute * 2)
res, err := fx.TryClose(time.Second)
require.NoError(t, err)
assert.True(t, res)
})
t.Run("custom ttl", func(t *testing.T) {
fx := newFixture(t, "p1")
defer fx.finish()
fx.peer.created = fx.peer.created.Add(-time.Minute * 2)
fx.peer.SetTTL(time.Hour)
in, out := net.Pipe()
hsDone := make(chan struct{})
go func() {
defer close(hsDone)
handshake.IncomingProtoHandshake(ctx, out, defaultProtoChecker)
}()
defer out.Close()
fx.mc.EXPECT().Open(gomock.Any()).Return(in, nil)
fx.mc.EXPECT().Addr().AnyTimes()
_, err := fx.AcquireDrpcConn(ctx)
require.NoError(t, err)
time.Sleep(time.Second)
res, err := fx.TryClose(time.Second / 2)
require.NoError(t, err)
assert.False(t, res)
})
t.Run("gc", func(t *testing.T) {
fx := newFixture(t, "p1")
defer fx.finish()
//now := time.Now()
// make one inactive
in, out := net.Pipe()
go func() {
handshake.IncomingProtoHandshake(ctx, out, defaultProtoChecker)
}()
defer out.Close()
fx.mc.EXPECT().Open(gomock.Any()).Return(in, nil)
dc, err := fx.AcquireDrpcConn(ctx)
require.NoError(t, err)
// make one active but closed
in2, out2 := net.Pipe()
go func() {
handshake.IncomingProtoHandshake(ctx, out2, defaultProtoChecker)
}()
defer out2.Close()
fx.mc.EXPECT().Open(gomock.Any()).Return(in2, nil)
fx.mc.EXPECT().Addr().Return("")
dc2, err := fx.AcquireDrpcConn(ctx)
require.NoError(t, err)
_ = dc2.Close()
// make one inactive and closed
in3, out3 := net.Pipe()
go func() {
handshake.IncomingProtoHandshake(ctx, out3, defaultProtoChecker)
}()
defer out3.Close()
fx.mc.EXPECT().Open(gomock.Any()).Return(in3, nil)
dc3, err := fx.AcquireDrpcConn(ctx)
require.NoError(t, err)
// make another active, should be removed by ttl
in4, out4 := net.Pipe()
go func() {
handshake.IncomingProtoHandshake(ctx, out4, defaultProtoChecker)
}()
defer out4.Close()
fx.mc.EXPECT().Open(gomock.Any()).Return(in4, nil)
dc4, err := fx.AcquireDrpcConn(ctx)
require.NoError(t, err)
defer dc4.Close()
fx.ReleaseDrpcConn(dc3)
_ = dc3.Close()
fx.ReleaseDrpcConn(dc)
time.Sleep(time.Millisecond * 100)
res, err := fx.TryClose(time.Millisecond * 50)
require.NoError(t, err)
assert.False(t, res)
})
}
type acceptedConn struct {
conn net.Conn
err error
}
func newFixture(t *testing.T, peerId string) *fixture {
fx := &fixture{
ctrl: gomock.NewController(t),
acceptCh: make(chan acceptedConn),
testCtrl: newTesCtrl(),
}
fx.mc = mock_transport.NewMockMultiConn(fx.ctrl)
ctx := CtxWithPeerId(context.Background(), peerId)
fx.mc.EXPECT().Context().Return(ctx).AnyTimes()
fx.mc.EXPECT().Accept().DoAndReturn(func() (net.Conn, error) {
ac := <-fx.acceptCh
return ac.conn, ac.err
}).AnyTimes()
fx.mc.EXPECT().Close().AnyTimes()
p, err := NewPeer(fx.mc, fx.testCtrl)
require.NoError(t, err)
fx.peer = p.(*peer)
return fx
}
type fixture struct {
*peer
ctrl *gomock.Controller
mc *mock_transport.MockMultiConn
acceptCh chan acceptedConn
testCtrl *testCtrl
}
func (fx *fixture) finish() {
fx.testCtrl.close()
fx.ctrl.Finish()
}
func newTesCtrl() *testCtrl {
return &testCtrl{closeCh: make(chan struct{}), serveConn: make(chan net.Conn, 10)}
}
type testCtrl struct {
serveConn chan net.Conn
closeCh chan struct{}
}
func (t *testCtrl) DrpcConfig() rpc.Config {
return rpc.Config{Stream: rpc.StreamConfig{MaxMsgSizeMb: 10}}
}
func (t *testCtrl) ServeConn(ctx context.Context, conn net.Conn) (err error) {
t.serveConn <- conn
<-t.closeCh
return io.EOF
}
func (t *testCtrl) close() {
close(t.closeCh)
}