mirror of
https://github.com/anyproto/any-sync.git
synced 2025-06-08 05:57:03 +09:00
254 lines
7.9 KiB
Go
254 lines
7.9 KiB
Go
//go:generate mockgen -destination mock_objecttreebuilder/mock_objecttreebuilder.go github.com/anyproto/any-sync/commonspace/objecttreebuilder TreeBuilder
|
|
package objecttreebuilder
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"sync/atomic"
|
|
|
|
"go.uber.org/zap"
|
|
|
|
"github.com/anyproto/any-sync/app"
|
|
"github.com/anyproto/any-sync/app/debugstat"
|
|
"github.com/anyproto/any-sync/app/logger"
|
|
"github.com/anyproto/any-sync/commonspace/object/acl/list"
|
|
"github.com/anyproto/any-sync/commonspace/object/acl/syncacl"
|
|
"github.com/anyproto/any-sync/commonspace/object/tree/objecttree"
|
|
"github.com/anyproto/any-sync/commonspace/object/tree/synctree"
|
|
"github.com/anyproto/any-sync/commonspace/object/tree/synctree/updatelistener"
|
|
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
|
|
"github.com/anyproto/any-sync/commonspace/object/tree/treestorage"
|
|
"github.com/anyproto/any-sync/commonspace/peermanager"
|
|
"github.com/anyproto/any-sync/commonspace/spacestate"
|
|
"github.com/anyproto/any-sync/commonspace/spacestorage"
|
|
"github.com/anyproto/any-sync/commonspace/sync"
|
|
"github.com/anyproto/any-sync/commonspace/syncstatus"
|
|
"github.com/anyproto/any-sync/nodeconf"
|
|
)
|
|
|
|
type BuildTreeOpts struct {
|
|
Listener updatelistener.UpdateListener
|
|
TreeBuilder objecttree.BuildObjectTreeFunc
|
|
TreeValidator objecttree.ValidatorFunc
|
|
}
|
|
|
|
const CName = "common.commonspace.objecttreebuilder"
|
|
|
|
var log = logger.NewNamed(CName)
|
|
var ErrSpaceClosed = errors.New("space is closed")
|
|
|
|
type HistoryTreeOpts struct {
|
|
Heads []string
|
|
Include bool
|
|
}
|
|
|
|
type TreeBuilder interface {
|
|
BuildTree(ctx context.Context, id string, opts BuildTreeOpts) (t objecttree.ObjectTree, err error)
|
|
BuildHistoryTree(ctx context.Context, id string, opts HistoryTreeOpts) (t objecttree.HistoryTree, err error)
|
|
CreateTree(ctx context.Context, payload objecttree.ObjectTreeCreatePayload) (res treestorage.TreeStorageCreatePayload, err error)
|
|
DeriveTree(ctx context.Context, payload objecttree.ObjectTreeDerivePayload) (res treestorage.TreeStorageCreatePayload, err error)
|
|
PutTree(ctx context.Context, payload treestorage.TreeStorageCreatePayload, listener updatelistener.UpdateListener) (t objecttree.ObjectTree, err error)
|
|
}
|
|
|
|
type TreeBuilderComponent interface {
|
|
app.Component
|
|
TreeBuilder
|
|
}
|
|
|
|
func New() TreeBuilderComponent {
|
|
return &treeBuilder{}
|
|
}
|
|
|
|
type treeBuilder struct {
|
|
syncClient synctree.SyncClient
|
|
configuration nodeconf.NodeConf
|
|
headsNotifiable synctree.HeadNotifiable
|
|
peerManager peermanager.PeerManager
|
|
spaceStorage spacestorage.SpaceStorage
|
|
syncStatus syncstatus.StatusUpdater
|
|
syncService sync.SyncService
|
|
|
|
log logger.CtxLogger
|
|
builder objecttree.BuildObjectTreeFunc
|
|
treeStats *synctree.TreeStatsCollector
|
|
debugStat debugstat.StatService
|
|
spaceId string
|
|
aclList list.AclList
|
|
treesUsed *atomic.Int32
|
|
isClosed *atomic.Bool
|
|
}
|
|
|
|
func (t *treeBuilder) ProvideStat() any {
|
|
return debugStat{
|
|
TreeStats: t.treeStats.Collect(),
|
|
SpaceId: t.spaceId,
|
|
}
|
|
}
|
|
|
|
func (t *treeBuilder) StatId() string {
|
|
return t.spaceId
|
|
}
|
|
|
|
func (t *treeBuilder) StatType() string {
|
|
return CName
|
|
}
|
|
|
|
func (t *treeBuilder) Run(ctx context.Context) (err error) {
|
|
t.debugStat.AddProvider(t)
|
|
return
|
|
}
|
|
|
|
func (t *treeBuilder) Close(ctx context.Context) (err error) {
|
|
t.debugStat.RemoveProvider(t)
|
|
return
|
|
}
|
|
|
|
func (t *treeBuilder) Init(a *app.App) (err error) {
|
|
state := a.MustComponent(spacestate.CName).(*spacestate.SpaceState)
|
|
comp, ok := a.Component(debugstat.CName).(debugstat.StatService)
|
|
if !ok {
|
|
comp = debugstat.NewNoOp()
|
|
}
|
|
t.treeStats = synctree.NewTreeStatsCollector(state.SpaceId)
|
|
t.debugStat = comp
|
|
t.spaceId = state.SpaceId
|
|
t.isClosed = state.SpaceIsClosed
|
|
t.treesUsed = state.TreesUsed
|
|
t.builder = state.TreeBuilderFunc
|
|
t.aclList = a.MustComponent(syncacl.CName).(syncacl.SyncAcl)
|
|
t.spaceStorage = a.MustComponent(spacestorage.CName).(spacestorage.SpaceStorage)
|
|
t.configuration = a.MustComponent(nodeconf.CName).(nodeconf.NodeConf)
|
|
t.syncStatus = a.MustComponent(syncstatus.CName).(syncstatus.StatusUpdater)
|
|
t.syncService = a.MustComponent(sync.CName).(sync.SyncService)
|
|
t.peerManager = a.MustComponent(peermanager.CName).(peermanager.PeerManager)
|
|
t.log = log.With(zap.String("spaceId", t.spaceId))
|
|
t.syncClient = synctree.NewSyncClient(t.spaceId, t.syncService)
|
|
return nil
|
|
}
|
|
|
|
func (t *treeBuilder) Name() (name string) {
|
|
return CName
|
|
}
|
|
|
|
func (t *treeBuilder) BuildTree(ctx context.Context, id string, opts BuildTreeOpts) (ot objecttree.ObjectTree, err error) {
|
|
if t.isClosed.Load() {
|
|
// TODO: change to real error
|
|
err = ErrSpaceClosed
|
|
return
|
|
}
|
|
treeBuilder := opts.TreeBuilder
|
|
if treeBuilder == nil {
|
|
treeBuilder = t.builder
|
|
}
|
|
deps := synctree.BuildDeps{
|
|
SpaceId: t.spaceId,
|
|
SyncClient: t.syncClient,
|
|
Configuration: t.configuration,
|
|
HeadNotifiable: t.headsNotifiable,
|
|
Listener: opts.Listener,
|
|
AclList: t.aclList,
|
|
SpaceStorage: t.spaceStorage,
|
|
OnClose: t.onClose,
|
|
SyncStatus: t.syncStatus,
|
|
PeerGetter: t.peerManager,
|
|
BuildObjectTree: treeBuilder,
|
|
ValidateObjectTree: opts.TreeValidator,
|
|
StatsCollector: t.treeStats,
|
|
}
|
|
t.treesUsed.Add(1)
|
|
t.log.Debug("incrementing counter", zap.String("id", id), zap.Int32("trees", t.treesUsed.Load()))
|
|
if ot, err = synctree.BuildSyncTreeOrGetRemote(ctx, id, deps); err != nil {
|
|
t.treesUsed.Add(-1)
|
|
t.log.Debug("decrementing counter, load failed", zap.String("id", id), zap.Int32("trees", t.treesUsed.Load()), zap.Error(err))
|
|
return nil, err
|
|
}
|
|
return
|
|
}
|
|
|
|
func (t *treeBuilder) BuildHistoryTree(ctx context.Context, id string, opts HistoryTreeOpts) (ot objecttree.HistoryTree, err error) {
|
|
if t.isClosed.Load() {
|
|
// TODO: change to real error
|
|
err = ErrSpaceClosed
|
|
return
|
|
}
|
|
|
|
params := objecttree.HistoryTreeParams{
|
|
AclList: t.aclList,
|
|
Heads: opts.Heads,
|
|
IncludeBeforeId: opts.Include,
|
|
}
|
|
params.Storage, err = t.spaceStorage.TreeStorage(ctx, id)
|
|
if err != nil {
|
|
return
|
|
}
|
|
return objecttree.BuildHistoryTree(params)
|
|
}
|
|
|
|
func (t *treeBuilder) CreateTree(ctx context.Context, payload objecttree.ObjectTreeCreatePayload) (res treestorage.TreeStorageCreatePayload, err error) {
|
|
if t.isClosed.Load() {
|
|
err = ErrSpaceClosed
|
|
return
|
|
}
|
|
root, err := objecttree.CreateObjectTreeRoot(payload, t.aclList)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
res = treestorage.TreeStorageCreatePayload{
|
|
RootRawChange: root,
|
|
Changes: []*treechangeproto.RawTreeChangeWithId{root},
|
|
Heads: []string{root.Id},
|
|
}
|
|
return
|
|
}
|
|
|
|
func (t *treeBuilder) DeriveTree(ctx context.Context, payload objecttree.ObjectTreeDerivePayload) (res treestorage.TreeStorageCreatePayload, err error) {
|
|
if t.isClosed.Load() {
|
|
err = ErrSpaceClosed
|
|
return
|
|
}
|
|
root, err := objecttree.DeriveObjectTreeRoot(payload, t.aclList)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
res = treestorage.TreeStorageCreatePayload{
|
|
RootRawChange: root,
|
|
Changes: []*treechangeproto.RawTreeChangeWithId{root},
|
|
Heads: []string{root.Id},
|
|
}
|
|
return
|
|
}
|
|
|
|
func (t *treeBuilder) PutTree(ctx context.Context, payload treestorage.TreeStorageCreatePayload, listener updatelistener.UpdateListener) (ot objecttree.ObjectTree, err error) {
|
|
if t.isClosed.Load() {
|
|
err = ErrSpaceClosed
|
|
return
|
|
}
|
|
deps := synctree.BuildDeps{
|
|
SpaceId: t.spaceId,
|
|
SyncClient: t.syncClient,
|
|
Configuration: t.configuration,
|
|
HeadNotifiable: t.headsNotifiable,
|
|
Listener: listener,
|
|
AclList: t.aclList,
|
|
SpaceStorage: t.spaceStorage,
|
|
OnClose: t.onClose,
|
|
SyncStatus: t.syncStatus,
|
|
PeerGetter: t.peerManager,
|
|
BuildObjectTree: t.builder,
|
|
}
|
|
ot, err = synctree.PutSyncTree(ctx, payload, deps)
|
|
if err != nil {
|
|
return
|
|
}
|
|
t.treesUsed.Add(1)
|
|
t.log.Debug("incrementing counter", zap.String("id", payload.RootRawChange.Id), zap.Int32("trees", t.treesUsed.Load()))
|
|
return
|
|
}
|
|
|
|
func (t *treeBuilder) onClose(id string) {
|
|
t.treesUsed.Add(-1)
|
|
log.Debug("decrementing counter", zap.String("id", id), zap.Int32("trees", t.treesUsed.Load()), zap.String("spaceId", t.spaceId))
|
|
_ = t.syncService.CloseReceiveQueue(id)
|
|
}
|