1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-11 10:18:08 +09:00

Add tryclose everywhere

This commit is contained in:
mcrakhman 2023-03-09 22:52:31 +01:00
parent ca2ea8cef9
commit 43a35f4878
No known key found for this signature in database
GPG key ID: DED12CFEF5B8396B
9 changed files with 48 additions and 23 deletions

View file

@ -51,6 +51,10 @@ func (p pushSpaceRequestMatcher) String() string {
type mockPeer struct{}
func (m mockPeer) TryClose() (res bool, err error) {
return true, m.Close()
}
func (m mockPeer) Id() string {
return "mockId"
}

View file

@ -3,7 +3,6 @@ package objectsync
import (
"context"
"fmt"
"github.com/anytypeio/any-sync/app/ocache"
"github.com/anytypeio/any-sync/commonspace/objectsync/synchandler"
"github.com/anytypeio/any-sync/commonspace/peermanager"
"github.com/anytypeio/any-sync/commonspace/spacesyncproto"
@ -15,9 +14,13 @@ import (
"time"
)
type LastUsage interface {
LastUsage() time.Time
}
// MessagePool can be made generic to work with different streams
type MessagePool interface {
ocache.ObjectLastUsage
LastUsage
synchandler.SyncHandler
peermanager.PeerManager
SendSync(ctx context.Context, peerId string, message *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error)

View file

@ -6,7 +6,6 @@ import (
"time"
"github.com/anytypeio/any-sync/app/logger"
"github.com/anytypeio/any-sync/app/ocache"
"github.com/anytypeio/any-sync/commonspace/object/syncobjectgetter"
"github.com/anytypeio/any-sync/commonspace/objectsync/synchandler"
"github.com/anytypeio/any-sync/commonspace/peermanager"
@ -20,7 +19,7 @@ import (
var log = logger.NewNamed("common.commonspace.objectsync")
type ObjectSync interface {
ocache.ObjectLastUsage
LastUsage
synchandler.SyncHandler
MessagePool() MessagePool

View file

@ -6,7 +6,6 @@ import (
"fmt"
"github.com/anytypeio/any-sync/accountservice"
"github.com/anytypeio/any-sync/app/logger"
"github.com/anytypeio/any-sync/app/ocache"
"github.com/anytypeio/any-sync/commonspace/headsync"
"github.com/anytypeio/any-sync/commonspace/object/acl/list"
"github.com/anytypeio/any-sync/commonspace/object/acl/syncacl"
@ -81,9 +80,6 @@ func NewSpaceId(id string, repKey uint64) string {
}
type Space interface {
ocache.ObjectLocker
ocache.ObjectLastUsage
Id() string
Init(ctx context.Context) error
@ -115,6 +111,7 @@ type space struct {
id string
mu sync.RWMutex
header *spacesyncproto.RawSpaceHeaderWithId
spaceTTL time.Duration
objectSync objectsync.ObjectSync
headSync headsync.HeadSync
@ -134,16 +131,6 @@ type space struct {
treesUsed *atomic.Int32
}
func (s *space) LastUsage() time.Time {
return s.objectSync.LastUsage()
}
func (s *space) Locked() bool {
locked := s.treesUsed.Load() > 1
log.With(zap.Int32("trees used", s.treesUsed.Load()), zap.Bool("locked", locked), zap.String("spaceId", s.id)).Debug("space lock status check")
return locked
}
func (s *space) Id() string {
return s.id
}
@ -462,3 +449,15 @@ func (s *space) Close() error {
log.With(zap.String("id", s.id)).Debug("space closed")
return mError.Err()
}
func (s *space) TryClose() (close bool, err error) {
if time.Now().Sub(s.objectSync.LastUsage()) < s.spaceTTL {
return false, nil
}
locked := s.treesUsed.Load() > 1
log.With(zap.Int32("trees used", s.treesUsed.Load()), zap.Bool("locked", locked), zap.String("spaceId", s.id)).Debug("space lock status check")
if locked {
return false, nil
}
return true, s.Close()
}

View file

@ -19,6 +19,7 @@ import (
"github.com/anytypeio/any-sync/net/pool"
"github.com/anytypeio/any-sync/nodeconf"
"sync/atomic"
"time"
)
const CName = "common.commonspace"
@ -162,6 +163,7 @@ func (s *spaceService) NewSpace(ctx context.Context, id string) (Space, error) {
treesUsed: &atomic.Int32{},
isClosed: spaceIsClosed,
isDeleted: spaceIsDeleted,
spaceTTL: time.Duration(s.config.GCTTL) * time.Second,
}
return sp, nil
}

View file

@ -100,7 +100,7 @@ func (d *dialer) Dial(ctx context.Context, peerId string) (p peer.Peer, err erro
if err != nil {
return
}
return peer.NewPeer(sc, conn), nil
return peer.NewPeer(sc, conn, time.Minute), nil
}
func (d *dialer) handshake(ctx context.Context, addr string) (conn drpc.Conn, sc sec.SecureConn, err error) {

View file

@ -12,10 +12,11 @@ import (
var log = logger.NewNamed("peer")
func NewPeer(sc sec.SecureConn, conn drpc.Conn) Peer {
func NewPeer(sc sec.SecureConn, conn drpc.Conn, ttl time.Duration) Peer {
return &peer{
id: sc.RemotePeer().String(),
lastUsage: time.Now().Unix(),
ttl: ttl,
sc: sc,
Conn: conn,
}
@ -25,11 +26,13 @@ type Peer interface {
Id() string
LastUsage() time.Time
UpdateLastUsage()
TryClose() (res bool, err error)
drpc.Conn
}
type peer struct {
id string
ttl time.Duration
lastUsage int64
sc sec.SecureConn
drpc.Conn
@ -76,6 +79,13 @@ func (p *peer) UpdateLastUsage() {
atomic.StoreInt64(&p.lastUsage, time.Now().Unix())
}
func (p *peer) TryClose() (res bool, err error) {
if time.Now().Sub(p.LastUsage()) < p.ttl {
return false, nil
}
return true, p.Close()
}
func (p *peer) Close() (err error) {
log.Debug("peer close", zap.String("peerId", p.id))
return p.Conn.Close()

View file

@ -194,6 +194,10 @@ func (t *testPeer) LastUsage() time.Time {
func (t *testPeer) UpdateLastUsage() {}
func (t *testPeer) TryClose() (res bool, err error) {
return true, t.Close()
}
func (t *testPeer) Close() error {
select {
case <-t.closed:

View file

@ -103,6 +103,10 @@ type testPeer struct {
drpc.Conn
}
func (t testPeer) TryClose() (res bool, err error) {
return true, t.Close()
}
func (t testPeer) Id() string {
return t.id
}