1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-07 21:47:02 +09:00
any-sync/commonspace/space.go
2025-06-04 15:35:33 +02:00

252 lines
7.9 KiB
Go

package commonspace
import (
"context"
"strconv"
"strings"
"sync"
"time"
"go.uber.org/zap"
"storj.io/drpc"
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/commonspace/acl/aclclient"
"github.com/anyproto/any-sync/commonspace/headsync"
"github.com/anyproto/any-sync/commonspace/headsync/headstorage"
"github.com/anyproto/any-sync/commonspace/object/acl/list"
"github.com/anyproto/any-sync/commonspace/object/acl/syncacl"
"github.com/anyproto/any-sync/commonspace/object/keyvalue/kvinterfaces"
"github.com/anyproto/any-sync/commonspace/object/treesyncer"
"github.com/anyproto/any-sync/commonspace/objecttreebuilder"
"github.com/anyproto/any-sync/commonspace/peermanager"
"github.com/anyproto/any-sync/commonspace/settings"
"github.com/anyproto/any-sync/commonspace/spacestate"
"github.com/anyproto/any-sync/commonspace/spacestorage"
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
syncservice "github.com/anyproto/any-sync/commonspace/sync"
"github.com/anyproto/any-sync/commonspace/sync/objectsync/objectmessages"
"github.com/anyproto/any-sync/commonspace/syncstatus"
"github.com/anyproto/any-sync/net/peer"
"github.com/anyproto/any-sync/net/streampool"
)
type SpaceDescription struct {
SpaceHeader *spacesyncproto.RawSpaceHeaderWithId
AclId string
AclPayload []byte
SpaceSettingsId string
SpaceSettingsPayload []byte
AclRecords []*spacesyncproto.AclRecord
}
func NewSpaceId(id string, repKey uint64) string {
return strings.Join([]string{id, strconv.FormatUint(repKey, 36)}, ".")
}
type Space interface {
Id() string
Init(ctx context.Context) error
Acl() syncacl.SyncAcl
StoredIds() []string
DebugAllHeads() []headsync.TreeHeads
Description(ctx context.Context) (desc SpaceDescription, err error)
TreeBuilder() objecttreebuilder.TreeBuilder
TreeSyncer() treesyncer.TreeSyncer
AclClient() aclclient.AclSpaceClient
SyncStatus() syncstatus.StatusUpdater
Storage() spacestorage.SpaceStorage
KeyValue() kvinterfaces.KeyValueService
DeleteTree(ctx context.Context, id string) (err error)
GetNodePeers(ctx context.Context) (peer []peer.Peer, err error)
HandleStreamSyncRequest(ctx context.Context, req *spacesyncproto.ObjectSyncMessage, stream drpc.Stream) (err error)
HandleRangeRequest(ctx context.Context, req *spacesyncproto.HeadSyncRequest) (resp *spacesyncproto.HeadSyncResponse, err error)
HandleMessage(ctx context.Context, msg *objectmessages.HeadUpdate) (err error)
TryClose(objectTTL time.Duration) (close bool, err error)
Close() error
}
type space struct {
mu sync.RWMutex
state *spacestate.SpaceState
app *app.App
treeBuilder objecttreebuilder.TreeBuilderComponent
treeSyncer treesyncer.TreeSyncer
peerManager peermanager.PeerManager
headSync headsync.HeadSync
syncService syncservice.SyncService
streamPool streampool.StreamPool
syncStatus syncstatus.StatusUpdater
settings settings.Settings
storage spacestorage.SpaceStorage
aclClient aclclient.AclSpaceClient
keyValue kvinterfaces.KeyValueService
aclList list.AclList
creationTime time.Time
}
func (s *space) Description(ctx context.Context) (desc SpaceDescription, err error) {
state, err := s.storage.StateStorage().GetState(ctx)
if err != nil {
return
}
s.aclList.RLock()
defer s.aclList.RUnlock()
recs, err := s.aclList.RecordsAfter(ctx, "")
if err != nil {
return
}
aclRecs := make([]*spacesyncproto.AclRecord, 0, len(recs))
for _, rec := range recs {
aclRecs = append(aclRecs, &spacesyncproto.AclRecord{
Id: rec.Id,
AclPayload: rec.Payload,
})
}
root := s.aclList.Root()
settingsStorage, err := s.storage.TreeStorage(ctx, state.SettingsId)
if err != nil {
return
}
settingsRoot, err := settingsStorage.Root(ctx)
if err != nil {
return
}
desc = SpaceDescription{
SpaceHeader: &spacesyncproto.RawSpaceHeaderWithId{
RawHeader: state.SpaceHeader,
Id: state.SpaceId,
},
AclId: root.Id,
AclPayload: root.Payload,
SpaceSettingsId: settingsRoot.Id,
SpaceSettingsPayload: settingsRoot.RawChange,
AclRecords: aclRecs,
}
return
}
func (s *space) StoredIds() []string {
return s.headSync.ExternalIds()
}
func (s *space) DebugAllHeads() (heads []headsync.TreeHeads) {
s.storage.HeadStorage().IterateEntries(context.Background(), headstorage.IterOpts{}, func(entry headstorage.HeadsEntry) (bool, error) {
if entry.CommonSnapshot != "" {
heads = append(heads, headsync.TreeHeads{
Id: entry.Id,
Heads: entry.Heads,
})
}
return true, nil
})
return heads
}
func (s *space) DeleteTree(ctx context.Context, id string) (err error) {
return s.settings.DeleteTree(ctx, id)
}
func (s *space) HandleMessage(peerCtx context.Context, msg *objectmessages.HeadUpdate) (err error) {
return s.syncService.HandleMessage(peerCtx, msg)
}
func (s *space) HandleStreamSyncRequest(ctx context.Context, req *spacesyncproto.ObjectSyncMessage, stream drpc.Stream) (err error) {
peerId, err := peer.CtxPeerId(ctx)
if err != nil {
return
}
objSyncReq := objectmessages.NewByteRequest(peerId, req.SpaceId, req.ObjectId, req.Payload)
return s.syncService.HandleStreamRequest(ctx, objSyncReq, stream)
}
func (s *space) HandleRangeRequest(ctx context.Context, req *spacesyncproto.HeadSyncRequest) (resp *spacesyncproto.HeadSyncResponse, err error) {
return s.headSync.HandleRangeRequest(ctx, req)
}
func (s *space) TreeBuilder() objecttreebuilder.TreeBuilder {
return s.treeBuilder
}
func (s *space) TreeSyncer() treesyncer.TreeSyncer {
return s.treeSyncer
}
func (s *space) AclClient() aclclient.AclSpaceClient {
return s.aclClient
}
func (s *space) GetNodePeers(ctx context.Context) (peer []peer.Peer, err error) {
return s.peerManager.GetNodePeers(ctx)
}
func (s *space) Acl() syncacl.SyncAcl {
return s.aclList.(syncacl.SyncAcl)
}
func (s *space) Id() string {
return s.state.SpaceId
}
func (s *space) Init(ctx context.Context) (err error) {
s.creationTime = time.Now()
err = s.app.Start(ctx)
if err != nil {
return
}
s.treeBuilder = s.app.MustComponent(objecttreebuilder.CName).(objecttreebuilder.TreeBuilderComponent)
s.headSync = s.app.MustComponent(headsync.CName).(headsync.HeadSync)
s.syncStatus = s.app.MustComponent(syncstatus.CName).(syncstatus.StatusUpdater)
s.settings = s.app.MustComponent(settings.CName).(settings.Settings)
s.syncService = s.app.MustComponent(syncservice.CName).(syncservice.SyncService)
s.storage = s.app.MustComponent(spacestorage.CName).(spacestorage.SpaceStorage)
s.peerManager = s.app.MustComponent(peermanager.CName).(peermanager.PeerManager)
s.aclList = s.app.MustComponent(syncacl.CName).(list.AclList)
s.streamPool = s.app.MustComponent(streampool.CName).(streampool.StreamPool)
s.treeSyncer = s.app.MustComponent(treesyncer.CName).(treesyncer.TreeSyncer)
s.aclClient = s.app.MustComponent(aclclient.CName).(aclclient.AclSpaceClient)
s.keyValue = s.app.MustComponent(kvinterfaces.CName).(kvinterfaces.KeyValueService)
return
}
func (s *space) SyncStatus() syncstatus.StatusUpdater {
return s.syncStatus
}
func (s *space) KeyValue() kvinterfaces.KeyValueService {
return s.keyValue
}
func (s *space) Storage() spacestorage.SpaceStorage {
return s.storage
}
func (s *space) Close() error {
if s.state.SpaceIsClosed.Swap(true) {
log.Warn("call space.Close on closed space", zap.String("id", s.state.SpaceId))
return nil
}
log := log.With(zap.String("spaceId", s.state.SpaceId))
log.Debug("space is closing")
err := s.app.Close(context.Background())
log.Debug("space closed")
return err
}
func (s *space) TryClose(objectTTL time.Duration) (close bool, err error) {
locked := s.state.TreesUsed.Load() > 1
inCacheSecs := int(time.Now().Sub(s.creationTime).Seconds())
log.With(zap.Int32("trees used", s.state.TreesUsed.Load()), zap.Bool("locked", locked), zap.String("spaceId", s.state.SpaceId), zap.Int("in cache secs", inCacheSecs)).Debug("space lock status check")
if locked {
return false, nil
}
return true, s.Close()
}