mirror of
https://github.com/anyproto/any-sync.git
synced 2025-06-07 21:47:02 +09:00
252 lines
7.9 KiB
Go
252 lines
7.9 KiB
Go
package commonspace
|
|
|
|
import (
|
|
"context"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"go.uber.org/zap"
|
|
"storj.io/drpc"
|
|
|
|
"github.com/anyproto/any-sync/app"
|
|
"github.com/anyproto/any-sync/commonspace/acl/aclclient"
|
|
"github.com/anyproto/any-sync/commonspace/headsync"
|
|
"github.com/anyproto/any-sync/commonspace/headsync/headstorage"
|
|
"github.com/anyproto/any-sync/commonspace/object/acl/list"
|
|
"github.com/anyproto/any-sync/commonspace/object/acl/syncacl"
|
|
"github.com/anyproto/any-sync/commonspace/object/keyvalue/kvinterfaces"
|
|
"github.com/anyproto/any-sync/commonspace/object/treesyncer"
|
|
"github.com/anyproto/any-sync/commonspace/objecttreebuilder"
|
|
"github.com/anyproto/any-sync/commonspace/peermanager"
|
|
"github.com/anyproto/any-sync/commonspace/settings"
|
|
"github.com/anyproto/any-sync/commonspace/spacestate"
|
|
"github.com/anyproto/any-sync/commonspace/spacestorage"
|
|
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
|
|
syncservice "github.com/anyproto/any-sync/commonspace/sync"
|
|
"github.com/anyproto/any-sync/commonspace/sync/objectsync/objectmessages"
|
|
"github.com/anyproto/any-sync/commonspace/syncstatus"
|
|
"github.com/anyproto/any-sync/net/peer"
|
|
"github.com/anyproto/any-sync/net/streampool"
|
|
)
|
|
|
|
type SpaceDescription struct {
|
|
SpaceHeader *spacesyncproto.RawSpaceHeaderWithId
|
|
AclId string
|
|
AclPayload []byte
|
|
SpaceSettingsId string
|
|
SpaceSettingsPayload []byte
|
|
AclRecords []*spacesyncproto.AclRecord
|
|
}
|
|
|
|
func NewSpaceId(id string, repKey uint64) string {
|
|
return strings.Join([]string{id, strconv.FormatUint(repKey, 36)}, ".")
|
|
}
|
|
|
|
type Space interface {
|
|
Id() string
|
|
Init(ctx context.Context) error
|
|
Acl() syncacl.SyncAcl
|
|
|
|
StoredIds() []string
|
|
DebugAllHeads() []headsync.TreeHeads
|
|
Description(ctx context.Context) (desc SpaceDescription, err error)
|
|
|
|
TreeBuilder() objecttreebuilder.TreeBuilder
|
|
TreeSyncer() treesyncer.TreeSyncer
|
|
AclClient() aclclient.AclSpaceClient
|
|
SyncStatus() syncstatus.StatusUpdater
|
|
Storage() spacestorage.SpaceStorage
|
|
KeyValue() kvinterfaces.KeyValueService
|
|
|
|
DeleteTree(ctx context.Context, id string) (err error)
|
|
GetNodePeers(ctx context.Context) (peer []peer.Peer, err error)
|
|
|
|
HandleStreamSyncRequest(ctx context.Context, req *spacesyncproto.ObjectSyncMessage, stream drpc.Stream) (err error)
|
|
HandleRangeRequest(ctx context.Context, req *spacesyncproto.HeadSyncRequest) (resp *spacesyncproto.HeadSyncResponse, err error)
|
|
HandleMessage(ctx context.Context, msg *objectmessages.HeadUpdate) (err error)
|
|
|
|
TryClose(objectTTL time.Duration) (close bool, err error)
|
|
Close() error
|
|
}
|
|
|
|
type space struct {
|
|
mu sync.RWMutex
|
|
state *spacestate.SpaceState
|
|
app *app.App
|
|
|
|
treeBuilder objecttreebuilder.TreeBuilderComponent
|
|
treeSyncer treesyncer.TreeSyncer
|
|
peerManager peermanager.PeerManager
|
|
headSync headsync.HeadSync
|
|
syncService syncservice.SyncService
|
|
streamPool streampool.StreamPool
|
|
syncStatus syncstatus.StatusUpdater
|
|
settings settings.Settings
|
|
storage spacestorage.SpaceStorage
|
|
aclClient aclclient.AclSpaceClient
|
|
keyValue kvinterfaces.KeyValueService
|
|
aclList list.AclList
|
|
creationTime time.Time
|
|
}
|
|
|
|
func (s *space) Description(ctx context.Context) (desc SpaceDescription, err error) {
|
|
state, err := s.storage.StateStorage().GetState(ctx)
|
|
if err != nil {
|
|
return
|
|
}
|
|
s.aclList.RLock()
|
|
defer s.aclList.RUnlock()
|
|
recs, err := s.aclList.RecordsAfter(ctx, "")
|
|
if err != nil {
|
|
return
|
|
}
|
|
aclRecs := make([]*spacesyncproto.AclRecord, 0, len(recs))
|
|
for _, rec := range recs {
|
|
aclRecs = append(aclRecs, &spacesyncproto.AclRecord{
|
|
Id: rec.Id,
|
|
AclPayload: rec.Payload,
|
|
})
|
|
}
|
|
root := s.aclList.Root()
|
|
settingsStorage, err := s.storage.TreeStorage(ctx, state.SettingsId)
|
|
if err != nil {
|
|
return
|
|
}
|
|
settingsRoot, err := settingsStorage.Root(ctx)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
desc = SpaceDescription{
|
|
SpaceHeader: &spacesyncproto.RawSpaceHeaderWithId{
|
|
RawHeader: state.SpaceHeader,
|
|
Id: state.SpaceId,
|
|
},
|
|
AclId: root.Id,
|
|
AclPayload: root.Payload,
|
|
SpaceSettingsId: settingsRoot.Id,
|
|
SpaceSettingsPayload: settingsRoot.RawChange,
|
|
AclRecords: aclRecs,
|
|
}
|
|
return
|
|
}
|
|
|
|
func (s *space) StoredIds() []string {
|
|
return s.headSync.ExternalIds()
|
|
}
|
|
|
|
func (s *space) DebugAllHeads() (heads []headsync.TreeHeads) {
|
|
s.storage.HeadStorage().IterateEntries(context.Background(), headstorage.IterOpts{}, func(entry headstorage.HeadsEntry) (bool, error) {
|
|
if entry.CommonSnapshot != "" {
|
|
heads = append(heads, headsync.TreeHeads{
|
|
Id: entry.Id,
|
|
Heads: entry.Heads,
|
|
})
|
|
}
|
|
return true, nil
|
|
})
|
|
return heads
|
|
}
|
|
|
|
func (s *space) DeleteTree(ctx context.Context, id string) (err error) {
|
|
return s.settings.DeleteTree(ctx, id)
|
|
}
|
|
|
|
func (s *space) HandleMessage(peerCtx context.Context, msg *objectmessages.HeadUpdate) (err error) {
|
|
return s.syncService.HandleMessage(peerCtx, msg)
|
|
}
|
|
|
|
func (s *space) HandleStreamSyncRequest(ctx context.Context, req *spacesyncproto.ObjectSyncMessage, stream drpc.Stream) (err error) {
|
|
peerId, err := peer.CtxPeerId(ctx)
|
|
if err != nil {
|
|
return
|
|
}
|
|
objSyncReq := objectmessages.NewByteRequest(peerId, req.SpaceId, req.ObjectId, req.Payload)
|
|
return s.syncService.HandleStreamRequest(ctx, objSyncReq, stream)
|
|
}
|
|
|
|
func (s *space) HandleRangeRequest(ctx context.Context, req *spacesyncproto.HeadSyncRequest) (resp *spacesyncproto.HeadSyncResponse, err error) {
|
|
return s.headSync.HandleRangeRequest(ctx, req)
|
|
}
|
|
|
|
func (s *space) TreeBuilder() objecttreebuilder.TreeBuilder {
|
|
return s.treeBuilder
|
|
}
|
|
|
|
func (s *space) TreeSyncer() treesyncer.TreeSyncer {
|
|
return s.treeSyncer
|
|
}
|
|
|
|
func (s *space) AclClient() aclclient.AclSpaceClient {
|
|
return s.aclClient
|
|
}
|
|
|
|
func (s *space) GetNodePeers(ctx context.Context) (peer []peer.Peer, err error) {
|
|
return s.peerManager.GetNodePeers(ctx)
|
|
}
|
|
|
|
func (s *space) Acl() syncacl.SyncAcl {
|
|
return s.aclList.(syncacl.SyncAcl)
|
|
}
|
|
|
|
func (s *space) Id() string {
|
|
return s.state.SpaceId
|
|
}
|
|
|
|
func (s *space) Init(ctx context.Context) (err error) {
|
|
s.creationTime = time.Now()
|
|
err = s.app.Start(ctx)
|
|
if err != nil {
|
|
return
|
|
}
|
|
s.treeBuilder = s.app.MustComponent(objecttreebuilder.CName).(objecttreebuilder.TreeBuilderComponent)
|
|
s.headSync = s.app.MustComponent(headsync.CName).(headsync.HeadSync)
|
|
s.syncStatus = s.app.MustComponent(syncstatus.CName).(syncstatus.StatusUpdater)
|
|
s.settings = s.app.MustComponent(settings.CName).(settings.Settings)
|
|
s.syncService = s.app.MustComponent(syncservice.CName).(syncservice.SyncService)
|
|
s.storage = s.app.MustComponent(spacestorage.CName).(spacestorage.SpaceStorage)
|
|
s.peerManager = s.app.MustComponent(peermanager.CName).(peermanager.PeerManager)
|
|
s.aclList = s.app.MustComponent(syncacl.CName).(list.AclList)
|
|
s.streamPool = s.app.MustComponent(streampool.CName).(streampool.StreamPool)
|
|
s.treeSyncer = s.app.MustComponent(treesyncer.CName).(treesyncer.TreeSyncer)
|
|
s.aclClient = s.app.MustComponent(aclclient.CName).(aclclient.AclSpaceClient)
|
|
s.keyValue = s.app.MustComponent(kvinterfaces.CName).(kvinterfaces.KeyValueService)
|
|
return
|
|
}
|
|
|
|
func (s *space) SyncStatus() syncstatus.StatusUpdater {
|
|
return s.syncStatus
|
|
}
|
|
|
|
func (s *space) KeyValue() kvinterfaces.KeyValueService {
|
|
return s.keyValue
|
|
}
|
|
|
|
func (s *space) Storage() spacestorage.SpaceStorage {
|
|
return s.storage
|
|
}
|
|
|
|
func (s *space) Close() error {
|
|
if s.state.SpaceIsClosed.Swap(true) {
|
|
log.Warn("call space.Close on closed space", zap.String("id", s.state.SpaceId))
|
|
return nil
|
|
}
|
|
log := log.With(zap.String("spaceId", s.state.SpaceId))
|
|
log.Debug("space is closing")
|
|
|
|
err := s.app.Close(context.Background())
|
|
log.Debug("space closed")
|
|
return err
|
|
}
|
|
|
|
func (s *space) TryClose(objectTTL time.Duration) (close bool, err error) {
|
|
locked := s.state.TreesUsed.Load() > 1
|
|
inCacheSecs := int(time.Now().Sub(s.creationTime).Seconds())
|
|
log.With(zap.Int32("trees used", s.state.TreesUsed.Load()), zap.Bool("locked", locked), zap.String("spaceId", s.state.SpaceId), zap.Int("in cache secs", inCacheSecs)).Debug("space lock status check")
|
|
if locked {
|
|
return false, nil
|
|
}
|
|
return true, s.Close()
|
|
}
|