1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-08 05:57:03 +09:00

peer.SetTTL method

This commit is contained in:
Sergey Cherepanov 2023-08-09 15:33:49 +02:00
parent 0fd5e0d271
commit 7250d309ea
No known key found for this signature in database
GPG key ID: 87F8EDE8FBDF637C
5 changed files with 57 additions and 4 deletions

View file

@ -58,6 +58,10 @@ func (p pushSpaceRequestMatcher) String() string {
type mockPeer struct {
}
func (m mockPeer) SetTTL(ttl time.Duration) {
return
}
func (m mockPeer) Id() string {
return "peerId"
}

View file

@ -133,6 +133,18 @@ func (mr *MockPeerMockRecorder) ReleaseDrpcConn(arg0 interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReleaseDrpcConn", reflect.TypeOf((*MockPeer)(nil).ReleaseDrpcConn), arg0)
}
// SetTTL mocks base method.
func (m *MockPeer) SetTTL(arg0 time.Duration) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "SetTTL", arg0)
}
// SetTTL indicates an expected call of SetTTL.
func (mr *MockPeerMockRecorder) SetTTL(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetTTL", reflect.TypeOf((*MockPeer)(nil).SetTTL), arg0)
}
// TryClose mocks base method.
func (m *MockPeer) TryClose(arg0 time.Duration) (bool, error) {
m.ctrl.T.Helper()

View file

@ -63,6 +63,9 @@ type Peer interface {
IsClosed() bool
// SetTTL overrides the default pool ttl
SetTTL(ttl time.Duration)
TryClose(objectTTL time.Duration) (res bool, err error)
ocache.Object
@ -90,10 +93,11 @@ type peer struct {
acceptCtxCancel context.CancelFunc
ttl atomic.Uint32
limiter limiter
mu sync.Mutex
mu sync.Mutex
created time.Time
transport.MultiConn
}
@ -183,10 +187,10 @@ func (p *peer) openDrpcConn(ctx context.Context) (dconn *subConn, err error) {
if err != nil {
return nil, err
}
if err = handshake.OutgoingProtoHandshake(ctx, conn, handshakeproto.ProtoType_DRPC); err != nil {
tconn := connutil.NewLastUsageConn(conn)
if err = handshake.OutgoingProtoHandshake(ctx, tconn, handshakeproto.ProtoType_DRPC); err != nil {
return nil, err
}
tconn := connutil.NewLastUsageConn(conn)
bufSize := p.ctrl.DrpcConfig().Stream.MaxMsgSizeMb * (1 << 20)
return &subConn{
Conn: drpcconn.NewWithOptions(tconn, drpcconn.Options{
@ -247,7 +251,14 @@ func (p *peer) serve(conn net.Conn) (err error) {
return p.ctrl.ServeConn(p.Context(), conn)
}
func (p *peer) SetTTL(ttl time.Duration) {
p.ttl.Store(uint32(ttl.Seconds()))
}
func (p *peer) TryClose(objectTTL time.Duration) (res bool, err error) {
if ttl := p.ttl.Load(); ttl > 0 {
objectTTL = time.Duration(ttl) * time.Second
}
aliveCount := p.gc(objectTTL)
if aliveCount == 0 && p.created.Add(time.Minute).Before(time.Now()) {
return true, p.Close()

View file

@ -155,6 +155,28 @@ func TestPeer_TryClose(t *testing.T) {
require.NoError(t, err)
assert.True(t, res)
})
t.Run("custom ttl", func(t *testing.T) {
fx := newFixture(t, "p1")
defer fx.finish()
fx.peer.created = fx.peer.created.Add(-time.Minute * 2)
fx.peer.SetTTL(time.Hour)
in, out := net.Pipe()
hsDone := make(chan struct{})
go func() {
defer close(hsDone)
handshake.IncomingProtoHandshake(ctx, out, defaultProtoChecker)
}()
defer out.Close()
fx.mc.EXPECT().Open(gomock.Any()).Return(in, nil)
fx.mc.EXPECT().Addr().AnyTimes()
_, err := fx.AcquireDrpcConn(ctx)
require.NoError(t, err)
time.Sleep(time.Second)
res, err := fx.TryClose(time.Second / 2)
require.NoError(t, err)
assert.False(t, res)
})
t.Run("gc", func(t *testing.T) {
fx := newFixture(t, "p1")
defer fx.finish()

View file

@ -217,6 +217,10 @@ type testPeer struct {
closed chan struct{}
}
func (t *testPeer) SetTTL(ttl time.Duration) {
return
}
func (t *testPeer) DoDrpc(ctx context.Context, do func(conn drpc.Conn) error) error {
return fmt.Errorf("not implemented")
}