From 43a35f48787d695938e825f3e17b61d14123a5b0 Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Thu, 9 Mar 2023 22:52:31 +0100 Subject: [PATCH] Add tryclose everywhere --- commonspace/headsync/diffsyncer_test.go | 4 +++ commonspace/objectsync/msgpool.go | 7 ++++-- commonspace/objectsync/objectsync.go | 3 +-- commonspace/space.go | 33 ++++++++++++------------- commonspace/spaceservice.go | 2 ++ net/dialer/dialer.go | 2 +- net/peer/peer.go | 12 ++++++++- net/pool/pool_test.go | 4 +++ net/rpc/rpctest/pool.go | 4 +++ 9 files changed, 48 insertions(+), 23 deletions(-) diff --git a/commonspace/headsync/diffsyncer_test.go b/commonspace/headsync/diffsyncer_test.go index f9622ca6..2b163a19 100644 --- a/commonspace/headsync/diffsyncer_test.go +++ b/commonspace/headsync/diffsyncer_test.go @@ -51,6 +51,10 @@ func (p pushSpaceRequestMatcher) String() string { type mockPeer struct{} +func (m mockPeer) TryClose() (res bool, err error) { + return true, m.Close() +} + func (m mockPeer) Id() string { return "mockId" } diff --git a/commonspace/objectsync/msgpool.go b/commonspace/objectsync/msgpool.go index 3dd14a2e..533efc7e 100644 --- a/commonspace/objectsync/msgpool.go +++ b/commonspace/objectsync/msgpool.go @@ -3,7 +3,6 @@ package objectsync import ( "context" "fmt" - "github.com/anytypeio/any-sync/app/ocache" "github.com/anytypeio/any-sync/commonspace/objectsync/synchandler" "github.com/anytypeio/any-sync/commonspace/peermanager" "github.com/anytypeio/any-sync/commonspace/spacesyncproto" @@ -15,9 +14,13 @@ import ( "time" ) +type LastUsage interface { + LastUsage() time.Time +} + // MessagePool can be made generic to work with different streams type MessagePool interface { - ocache.ObjectLastUsage + LastUsage synchandler.SyncHandler peermanager.PeerManager SendSync(ctx context.Context, peerId string, message *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) diff --git a/commonspace/objectsync/objectsync.go b/commonspace/objectsync/objectsync.go index 85c587d9..74b3f7fa 100644 --- a/commonspace/objectsync/objectsync.go +++ b/commonspace/objectsync/objectsync.go @@ -6,7 +6,6 @@ import ( "time" "github.com/anytypeio/any-sync/app/logger" - "github.com/anytypeio/any-sync/app/ocache" "github.com/anytypeio/any-sync/commonspace/object/syncobjectgetter" "github.com/anytypeio/any-sync/commonspace/objectsync/synchandler" "github.com/anytypeio/any-sync/commonspace/peermanager" @@ -20,7 +19,7 @@ import ( var log = logger.NewNamed("common.commonspace.objectsync") type ObjectSync interface { - ocache.ObjectLastUsage + LastUsage synchandler.SyncHandler MessagePool() MessagePool diff --git a/commonspace/space.go b/commonspace/space.go index d18c9e8a..00b38a8d 100644 --- a/commonspace/space.go +++ b/commonspace/space.go @@ -6,7 +6,6 @@ import ( "fmt" "github.com/anytypeio/any-sync/accountservice" "github.com/anytypeio/any-sync/app/logger" - "github.com/anytypeio/any-sync/app/ocache" "github.com/anytypeio/any-sync/commonspace/headsync" "github.com/anytypeio/any-sync/commonspace/object/acl/list" "github.com/anytypeio/any-sync/commonspace/object/acl/syncacl" @@ -81,9 +80,6 @@ func NewSpaceId(id string, repKey uint64) string { } type Space interface { - ocache.ObjectLocker - ocache.ObjectLastUsage - Id() string Init(ctx context.Context) error @@ -112,9 +108,10 @@ type Space interface { } type space struct { - id string - mu sync.RWMutex - header *spacesyncproto.RawSpaceHeaderWithId + id string + mu sync.RWMutex + header *spacesyncproto.RawSpaceHeaderWithId + spaceTTL time.Duration objectSync objectsync.ObjectSync headSync headsync.HeadSync @@ -134,16 +131,6 @@ type space struct { treesUsed *atomic.Int32 } -func (s *space) LastUsage() time.Time { - return s.objectSync.LastUsage() -} - -func (s *space) Locked() bool { - locked := s.treesUsed.Load() > 1 - log.With(zap.Int32("trees used", s.treesUsed.Load()), zap.Bool("locked", locked), zap.String("spaceId", s.id)).Debug("space lock status check") - return locked -} - func (s *space) Id() string { return s.id } @@ -462,3 +449,15 @@ func (s *space) Close() error { log.With(zap.String("id", s.id)).Debug("space closed") return mError.Err() } + +func (s *space) TryClose() (close bool, err error) { + if time.Now().Sub(s.objectSync.LastUsage()) < s.spaceTTL { + return false, nil + } + locked := s.treesUsed.Load() > 1 + log.With(zap.Int32("trees used", s.treesUsed.Load()), zap.Bool("locked", locked), zap.String("spaceId", s.id)).Debug("space lock status check") + if locked { + return false, nil + } + return true, s.Close() +} diff --git a/commonspace/spaceservice.go b/commonspace/spaceservice.go index 89e9283b..0e086923 100644 --- a/commonspace/spaceservice.go +++ b/commonspace/spaceservice.go @@ -19,6 +19,7 @@ import ( "github.com/anytypeio/any-sync/net/pool" "github.com/anytypeio/any-sync/nodeconf" "sync/atomic" + "time" ) const CName = "common.commonspace" @@ -162,6 +163,7 @@ func (s *spaceService) NewSpace(ctx context.Context, id string) (Space, error) { treesUsed: &atomic.Int32{}, isClosed: spaceIsClosed, isDeleted: spaceIsDeleted, + spaceTTL: time.Duration(s.config.GCTTL) * time.Second, } return sp, nil } diff --git a/net/dialer/dialer.go b/net/dialer/dialer.go index 7252a4f3..64b0c191 100644 --- a/net/dialer/dialer.go +++ b/net/dialer/dialer.go @@ -100,7 +100,7 @@ func (d *dialer) Dial(ctx context.Context, peerId string) (p peer.Peer, err erro if err != nil { return } - return peer.NewPeer(sc, conn), nil + return peer.NewPeer(sc, conn, time.Minute), nil } func (d *dialer) handshake(ctx context.Context, addr string) (conn drpc.Conn, sc sec.SecureConn, err error) { diff --git a/net/peer/peer.go b/net/peer/peer.go index 9c9f547d..bc9353d0 100644 --- a/net/peer/peer.go +++ b/net/peer/peer.go @@ -12,10 +12,11 @@ import ( var log = logger.NewNamed("peer") -func NewPeer(sc sec.SecureConn, conn drpc.Conn) Peer { +func NewPeer(sc sec.SecureConn, conn drpc.Conn, ttl time.Duration) Peer { return &peer{ id: sc.RemotePeer().String(), lastUsage: time.Now().Unix(), + ttl: ttl, sc: sc, Conn: conn, } @@ -25,11 +26,13 @@ type Peer interface { Id() string LastUsage() time.Time UpdateLastUsage() + TryClose() (res bool, err error) drpc.Conn } type peer struct { id string + ttl time.Duration lastUsage int64 sc sec.SecureConn drpc.Conn @@ -76,6 +79,13 @@ func (p *peer) UpdateLastUsage() { atomic.StoreInt64(&p.lastUsage, time.Now().Unix()) } +func (p *peer) TryClose() (res bool, err error) { + if time.Now().Sub(p.LastUsage()) < p.ttl { + return false, nil + } + return true, p.Close() +} + func (p *peer) Close() (err error) { log.Debug("peer close", zap.String("peerId", p.id)) return p.Conn.Close() diff --git a/net/pool/pool_test.go b/net/pool/pool_test.go index f913333c..8ae08e3b 100644 --- a/net/pool/pool_test.go +++ b/net/pool/pool_test.go @@ -194,6 +194,10 @@ func (t *testPeer) LastUsage() time.Time { func (t *testPeer) UpdateLastUsage() {} +func (t *testPeer) TryClose() (res bool, err error) { + return true, t.Close() +} + func (t *testPeer) Close() error { select { case <-t.closed: diff --git a/net/rpc/rpctest/pool.go b/net/rpc/rpctest/pool.go index 7fdbdda4..40b7bb1d 100644 --- a/net/rpc/rpctest/pool.go +++ b/net/rpc/rpctest/pool.go @@ -103,6 +103,10 @@ type testPeer struct { drpc.Conn } +func (t testPeer) TryClose() (res bool, err error) { + return true, t.Close() +} + func (t testPeer) Id() string { return t.id }