From b0fa43fb14ea9f0d8afa33e69cacb9ed8b16ad07 Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Thu, 1 Jun 2023 10:28:32 +0200 Subject: [PATCH] WIP work on components --- app/app.go | 51 ++- app/app_test.go | 14 + commonspace/commongetter.go | 41 +- commonspace/{ => config}/config.go | 2 +- commonspace/deletionstate/deletionstate.go | 150 ++++++++ commonspace/headsync/diffsyncer.go | 27 +- commonspace/headsync/headsync.go | 166 ++++---- commonspace/headsync/util.go | 27 ++ commonspace/object/tree/synctree/synctree.go | 14 +- .../object/tree/synctree/synctree_test.go | 7 +- .../object/tree/synctree/synctreehandler.go | 14 +- .../tree/synctree/synctreehandler_test.go | 24 +- .../object/tree/synctree/treeremotegetter.go | 2 +- .../object/tree/synctree/utils_test.go | 22 +- commonspace/objectsync/msgpool.go | 142 ------- commonspace/objectsync/objectsync.go | 204 +++++----- commonspace/objectsync/syncclient.go | 78 ---- .../{ => syncclient}/requestfactory.go | 2 +- .../objectsync/syncclient/syncclient.go | 98 +++++ commonspace/objecttreebuilder/treebuilder.go | 191 ++++++++++ commonspace/requestsender/requestsender.go | 45 +++ commonspace/settings/deleter.go | 6 +- commonspace/settings/deletionmanager.go | 5 +- commonspace/settings/settings.go | 354 +++--------------- commonspace/settings/settingsobject.go | 329 ++++++++++++++++ ...ettings_test.go => settingsobject_test.go} | 0 .../settings/settingsstate/settingsstate.go | 1 + commonspace/space.go | 2 +- commonspace/spaceservice.go | 8 +- commonspace/spacestate/shareddata.go | 35 ++ commonspace/spaceutils_test.go | 5 +- commonspace/streamsender/streamsender.go | 14 + commonspace/syncstatus/noop.go | 36 +- commonspace/syncstatus/syncstatus.go | 65 ++-- 34 files changed, 1343 insertions(+), 838 deletions(-) rename commonspace/{ => config}/config.go (91%) create mode 100644 commonspace/deletionstate/deletionstate.go create mode 100644 commonspace/headsync/util.go delete mode 100644 commonspace/objectsync/msgpool.go delete mode 100644 commonspace/objectsync/syncclient.go rename commonspace/objectsync/{ => syncclient}/requestfactory.go (99%) create mode 100644 commonspace/objectsync/syncclient/syncclient.go create mode 100644 commonspace/objecttreebuilder/treebuilder.go create mode 100644 commonspace/requestsender/requestsender.go create mode 100644 commonspace/settings/settingsobject.go rename commonspace/settings/{settings_test.go => settingsobject_test.go} (100%) create mode 100644 commonspace/spacestate/shareddata.go create mode 100644 commonspace/streamsender/streamsender.go diff --git a/app/app.go b/app/app.go index a290a82b..b8be83a0 100644 --- a/app/app.go +++ b/app/app.go @@ -55,6 +55,7 @@ type ComponentStatable interface { // App is the central part of the application // It contains and manages all components type App struct { + parent *App components []Component mu sync.RWMutex startStat Stat @@ -109,15 +110,29 @@ func VersionDescription() string { return fmt.Sprintf("build on %s from %s at #%s(%s)", BuildDate, GitBranch, GitCommit, GitState) } +// ChildApp creates a child container which has access to parent's components +// It doesn't call Start on any of the parent's components +func (app *App) ChildApp() *App { + return &App{ + parent: app, + deviceState: app.deviceState, + anySyncVersion: app.AnySyncVersion(), + } +} + // Register adds service to registry // All components will be started in the order they were registered func (app *App) Register(s Component) *App { app.mu.Lock() defer app.mu.Unlock() - for _, es := range app.components { - if s.Name() == es.Name() { - panic(fmt.Errorf("component '%s' already registered", s.Name())) + current := app + for current != nil { + for _, es := range current.components { + if s.Name() == es.Name() { + panic(fmt.Errorf("component '%s' already registered", s.Name())) + } } + current = current.parent } app.components = append(app.components, s) return app @@ -128,10 +143,14 @@ func (app *App) Register(s Component) *App { func (app *App) Component(name string) Component { app.mu.RLock() defer app.mu.RUnlock() - for _, s := range app.components { - if s.Name() == name { - return s + current := app + for current != nil { + for _, s := range current.components { + if s.Name() == name { + return s + } } + current = current.parent } return nil } @@ -149,10 +168,14 @@ func (app *App) MustComponent(name string) Component { func MustComponent[i any](app *App) i { app.mu.RLock() defer app.mu.RUnlock() - for _, s := range app.components { - if v, ok := s.(i); ok { - return v + current := app + for current != nil { + for _, s := range current.components { + if v, ok := s.(i); ok { + return v + } } + current = current.parent } empty := new(i) panic(fmt.Errorf("component with interface %T is not found", empty)) @@ -162,9 +185,13 @@ func MustComponent[i any](app *App) i { func (app *App) ComponentNames() (names []string) { app.mu.RLock() defer app.mu.RUnlock() - names = make([]string, len(app.components)) - for i, c := range app.components { - names[i] = c.Name() + names = make([]string, 0, len(app.components)) + current := app + for current != nil { + for _, c := range current.components { + names = append(names, c.Name()) + } + current = current.parent } return } diff --git a/app/app_test.go b/app/app_test.go index cdc52445..7b5678d6 100644 --- a/app/app_test.go +++ b/app/app_test.go @@ -34,6 +34,20 @@ func TestAppServiceRegistry(t *testing.T) { names := app.ComponentNames() assert.Equal(t, names, []string{"c1", "r1", "s1"}) }) + t.Run("Child MustComponent", func(t *testing.T) { + app := app.ChildApp() + app.Register(newTestService(testTypeComponent, "x1", nil, nil)) + for _, name := range []string{"c1", "r1", "s1", "x1"} { + assert.NotPanics(t, func() { app.MustComponent(name) }, name) + } + assert.Panics(t, func() { app.MustComponent("not-registered") }) + }) + t.Run("Child ComponentNames", func(t *testing.T) { + app := app.ChildApp() + app.Register(newTestService(testTypeComponent, "x1", nil, nil)) + names := app.ComponentNames() + assert.Equal(t, names, []string{"x1", "c1", "r1", "s1"}) + }) } func TestAppStart(t *testing.T) { diff --git a/commonspace/commongetter.go b/commonspace/commongetter.go index 80476a14..93c9312d 100644 --- a/commonspace/commongetter.go +++ b/commonspace/commongetter.go @@ -2,32 +2,53 @@ package commonspace import ( "context" + "github.com/anyproto/any-sync/app" "github.com/anyproto/any-sync/commonspace/object/syncobjectgetter" "github.com/anyproto/any-sync/commonspace/object/tree/objecttree" "github.com/anyproto/any-sync/commonspace/object/treemanager" + "github.com/anyproto/any-sync/commonspace/spacestate" "sync/atomic" ) -type commonGetter struct { +type ObjectManager interface { + treemanager.TreeManager + AddObject(object syncobjectgetter.SyncObject) + GetObject(ctx context.Context, objectId string) (obj syncobjectgetter.SyncObject, err error) +} + +type objectManager struct { treemanager.TreeManager spaceId string reservedObjects []syncobjectgetter.SyncObject spaceIsClosed *atomic.Bool } -func newCommonGetter(spaceId string, getter treemanager.TreeManager, spaceIsClosed *atomic.Bool) *commonGetter { - return &commonGetter{ - TreeManager: getter, - spaceId: spaceId, - spaceIsClosed: spaceIsClosed, +func NewObjectManager(manager treemanager.TreeManager) ObjectManager { + return &objectManager{ + TreeManager: manager, } } -func (c *commonGetter) AddObject(object syncobjectgetter.SyncObject) { +func (c *objectManager) Init(a *app.App) (err error) { + state := a.MustComponent(spacestate.CName).(*spacestate.SpaceState) + c.spaceId = state.SpaceId + c.spaceIsClosed = state.SpaceIsClosed + return nil +} + +func (c *objectManager) Run(ctx context.Context) (err error) { + return nil +} + +func (c *objectManager) Close(ctx context.Context) (err error) { + return nil +} + +func (c *objectManager) AddObject(object syncobjectgetter.SyncObject) { c.reservedObjects = append(c.reservedObjects, object) } -func (c *commonGetter) GetTree(ctx context.Context, spaceId, treeId string) (objecttree.ObjectTree, error) { +func (c *objectManager) GetTree(ctx context.Context, spaceId, treeId string) (objecttree.ObjectTree, error) { if c.spaceIsClosed.Load() { return nil, ErrSpaceClosed } @@ -37,7 +58,7 @@ func (c *commonGetter) GetTree(ctx context.Context, spaceId, treeId string) (obj return c.TreeManager.GetTree(ctx, spaceId, treeId) } -func (c *commonGetter) getReservedObject(id string) syncobjectgetter.SyncObject { +func (c *objectManager) getReservedObject(id string) syncobjectgetter.SyncObject { for _, obj := range c.reservedObjects { if obj != nil && obj.Id() == id { return obj @@ -46,7 +67,7 @@ func (c *commonGetter) getReservedObject(id string) syncobjectgetter.SyncObject return nil } -func (c *commonGetter) GetObject(ctx context.Context, objectId string) (obj syncobjectgetter.SyncObject, err error) { +func (c *objectManager) GetObject(ctx context.Context, objectId string) (obj syncobjectgetter.SyncObject, err error) { if c.spaceIsClosed.Load() { return nil, ErrSpaceClosed } diff --git a/commonspace/config.go b/commonspace/config/config.go similarity index 91% rename from commonspace/config.go rename to commonspace/config/config.go index e5485068..cce4b548 100644 --- a/commonspace/config.go +++ b/commonspace/config/config.go @@ -1,4 +1,4 @@ -package commonspace +package config type ConfigGetter interface { GetSpace() Config diff --git a/commonspace/deletionstate/deletionstate.go b/commonspace/deletionstate/deletionstate.go new file mode 100644 index 00000000..96f062e3 --- /dev/null +++ b/commonspace/deletionstate/deletionstate.go @@ -0,0 +1,150 @@ +//go:generate mockgen -destination mock_settingsstate/mock_settingsstate.go github.com/anyproto/any-sync/commonspace/settings/settingsstate ObjectDeletionState,StateBuilder,ChangeFactory +package deletionstate + +import ( + "github.com/anyproto/any-sync/app" + "github.com/anyproto/any-sync/app/logger" + "github.com/anyproto/any-sync/commonspace/spacestate" + "github.com/anyproto/any-sync/commonspace/spacestorage" + "go.uber.org/zap" + "sync" +) + +var log = logger.NewNamed(CName) + +const CName = "common.commonspace.deletionstate" + +type StateUpdateObserver func(ids []string) + +type ObjectDeletionState interface { + app.Component + AddObserver(observer StateUpdateObserver) + Add(ids map[string]struct{}) + GetQueued() (ids []string) + Delete(id string) (err error) + Exists(id string) bool + Filter(ids []string) (filtered []string) +} + +type objectDeletionState struct { + sync.RWMutex + log logger.CtxLogger + queued map[string]struct{} + deleted map[string]struct{} + stateUpdateObservers []StateUpdateObserver + storage spacestorage.SpaceStorage +} + +func (st *objectDeletionState) Init(a *app.App) (err error) { + st.storage = a.MustComponent(spacestate.CName).(*spacestate.SpaceState).SpaceStorage + return nil +} + +func (st *objectDeletionState) Name() (name string) { + return CName +} + +func NewObjectDeletionState() ObjectDeletionState { + return &objectDeletionState{ + log: log, + queued: map[string]struct{}{}, + deleted: map[string]struct{}{}, + } +} + +func (st *objectDeletionState) AddObserver(observer StateUpdateObserver) { + st.Lock() + defer st.Unlock() + st.stateUpdateObservers = append(st.stateUpdateObservers, observer) +} + +func (st *objectDeletionState) Add(ids map[string]struct{}) { + var added []string + st.Lock() + defer func() { + st.Unlock() + for _, ob := range st.stateUpdateObservers { + ob(added) + } + }() + + for id := range ids { + if _, exists := st.deleted[id]; exists { + continue + } + if _, exists := st.queued[id]; exists { + continue + } + + var status string + status, err := st.storage.TreeDeletedStatus(id) + if err != nil { + st.log.Warn("failed to get deleted status", zap.String("treeId", id), zap.Error(err)) + continue + } + + switch status { + case spacestorage.TreeDeletedStatusQueued: + st.queued[id] = struct{}{} + case spacestorage.TreeDeletedStatusDeleted: + st.deleted[id] = struct{}{} + default: + err := st.storage.SetTreeDeletedStatus(id, spacestorage.TreeDeletedStatusQueued) + if err != nil { + st.log.Warn("failed to set deleted status", zap.String("treeId", id), zap.Error(err)) + continue + } + st.queued[id] = struct{}{} + } + added = append(added, id) + } +} + +func (st *objectDeletionState) GetQueued() (ids []string) { + st.RLock() + defer st.RUnlock() + ids = make([]string, 0, len(st.queued)) + for id := range st.queued { + ids = append(ids, id) + } + return +} + +func (st *objectDeletionState) Delete(id string) (err error) { + st.Lock() + defer st.Unlock() + delete(st.queued, id) + st.deleted[id] = struct{}{} + err = st.storage.SetTreeDeletedStatus(id, spacestorage.TreeDeletedStatusDeleted) + if err != nil { + return + } + return +} + +func (st *objectDeletionState) Exists(id string) bool { + st.RLock() + defer st.RUnlock() + return st.exists(id) +} + +func (st *objectDeletionState) Filter(ids []string) (filtered []string) { + st.RLock() + defer st.RUnlock() + for _, id := range ids { + if !st.exists(id) { + filtered = append(filtered, id) + } + } + return +} + +func (st *objectDeletionState) exists(id string) bool { + if _, exists := st.deleted[id]; exists { + return true + } + if _, exists := st.queued[id]; exists { + return true + } + return false +} diff --git a/commonspace/headsync/diffsyncer.go b/commonspace/headsync/diffsyncer.go index 8b59c743..70238d01 100644 --- a/commonspace/headsync/diffsyncer.go +++ b/commonspace/headsync/diffsyncer.go @@ -26,26 +26,17 @@ type DiffSyncer interface { Close() error } -func newDiffSyncer( - spaceId string, - diff ldiff.Diff, - peerManager peermanager.PeerManager, - cache treemanager.TreeManager, - storage spacestorage.SpaceStorage, - clientFactory spacesyncproto.ClientFactory, - syncStatus syncstatus.StatusUpdater, - credentialProvider credentialprovider.CredentialProvider, - log logger.CtxLogger) DiffSyncer { +func newDiffSyncer(hs *headSync) DiffSyncer { return &diffSyncer{ - diff: diff, - spaceId: spaceId, - treeManager: cache, - storage: storage, - peerManager: peerManager, - clientFactory: clientFactory, - credentialProvider: credentialProvider, + diff: hs.diff, + spaceId: hs.spaceId, + treeManager: hs.treeManager, + storage: hs.storage, + peerManager: hs.peerManager, + clientFactory: spacesyncproto.ClientFactoryFunc(spacesyncproto.NewDRPCSpaceSyncClient), + credentialProvider: hs.credentialProvider, log: log, - syncStatus: syncStatus, + syncStatus: hs.syncStatus, } } diff --git a/commonspace/headsync/headsync.go b/commonspace/headsync/headsync.go index ddca3e50..048569bb 100644 --- a/commonspace/headsync/headsync.go +++ b/commonspace/headsync/headsync.go @@ -1,14 +1,17 @@ -//go:generate mockgen -destination mock_headsync/mock_headsync.go github.com/anyproto/any-sync/commonspace/headsync DiffSyncer package headsync import ( "context" + "github.com/anyproto/any-sync/app" "github.com/anyproto/any-sync/app/ldiff" "github.com/anyproto/any-sync/app/logger" + config2 "github.com/anyproto/any-sync/commonspace/config" "github.com/anyproto/any-sync/commonspace/credentialprovider" + "github.com/anyproto/any-sync/commonspace/headsync" "github.com/anyproto/any-sync/commonspace/object/treemanager" "github.com/anyproto/any-sync/commonspace/peermanager" "github.com/anyproto/any-sync/commonspace/settings/settingsstate" + "github.com/anyproto/any-sync/commonspace/spacestate" "github.com/anyproto/any-sync/commonspace/spacestorage" "github.com/anyproto/any-sync/commonspace/spacesyncproto" "github.com/anyproto/any-sync/commonspace/syncstatus" @@ -17,109 +20,114 @@ import ( "github.com/anyproto/any-sync/util/periodicsync" "go.uber.org/zap" "golang.org/x/exp/slices" - "strings" "sync/atomic" "time" ) +var log = logger.NewNamed(CName) + +const CName = "common.commonspace.headsync" + type TreeHeads struct { Id string Heads []string } type HeadSync interface { - Init(objectIds []string, deletionState settingsstate.ObjectDeletionState) - UpdateHeads(id string, heads []string) HandleRangeRequest(ctx context.Context, req *spacesyncproto.HeadSyncRequest) (resp *spacesyncproto.HeadSyncResponse, err error) RemoveObjects(ids []string) AllIds() []string DebugAllHeads() (res []TreeHeads) - - Close() (err error) } type headSync struct { spaceId string - periodicSync periodicsync.PeriodicSync - storage spacestorage.SpaceStorage - diff ldiff.Diff - log logger.CtxLogger - syncer DiffSyncer - configuration nodeconf.NodeConf spaceIsDeleted *atomic.Bool + syncPeriod int - syncPeriod int + periodicSync periodicsync.PeriodicSync + storage spacestorage.SpaceStorage + diff ldiff.Diff + log logger.CtxLogger + syncer headsync.DiffSyncer + configuration nodeconf.NodeConf + peerManager peermanager.PeerManager + treeManager treemanager.TreeManager + credentialProvider credentialprovider.CredentialProvider + syncStatus syncstatus.StatusProvider + deletionState settingsstate.ObjectDeletionState } -func NewHeadSync( - spaceId string, - spaceIsDeleted *atomic.Bool, - syncPeriod int, - configuration nodeconf.NodeConf, - storage spacestorage.SpaceStorage, - peerManager peermanager.PeerManager, - cache treemanager.TreeManager, - syncStatus syncstatus.StatusUpdater, - credentialProvider credentialprovider.CredentialProvider, - log logger.CtxLogger) HeadSync { +func New() *headSync { + return &headSync{} +} - diff := ldiff.New(16, 16) - l := log.With(zap.String("spaceId", spaceId)) - factory := spacesyncproto.ClientFactoryFunc(spacesyncproto.NewDRPCSpaceSyncClient) - syncer := newDiffSyncer(spaceId, diff, peerManager, cache, storage, factory, syncStatus, credentialProvider, l) +func (h *headSync) Init(a *app.App) (err error) { + shared := a.MustComponent(spacestate.CName).(*spacestate.SpaceState) + cfg := a.MustComponent("cfg").(config2.ConfigGetter) + h.spaceId = shared.SpaceId + h.spaceIsDeleted = shared.SpaceIsDeleted + h.syncPeriod = cfg.GetSpace().SyncPeriod + h.configuration = a.MustComponent(nodeconf.CName).(nodeconf.NodeConf) + h.log = log.With(zap.String("spaceId", h.spaceId)) + h.storage = a.MustComponent("spacestorage").(spacestorage.SpaceStorage) + h.diff = ldiff.New(16, 16) + h.peerManager = a.MustComponent(peermanager.CName).(peermanager.PeerManager) + h.credentialProvider = a.MustComponent(credentialprovider.CName).(credentialprovider.CredentialProvider) + h.syncStatus = a.MustComponent(syncstatus.CName).(syncstatus.StatusProvider) + h.treeManager = a.MustComponent(treemanager.CName).(treemanager.TreeManager) + h.deletionState = a.MustComponent("deletionstate").(settingsstate.ObjectDeletionState) + h.syncer = newDiffSyncer(h) sync := func(ctx context.Context) (err error) { // for clients cancelling the sync process - if spaceIsDeleted.Load() && !configuration.IsResponsible(spaceId) { + if h.spaceIsDeleted.Load() && !h.configuration.IsResponsible(h.spaceId) { return spacesyncproto.ErrSpaceIsDeleted } - return syncer.Sync(ctx) - } - periodicSync := periodicsync.NewPeriodicSync(syncPeriod, time.Minute, sync, l) - - return &headSync{ - spaceId: spaceId, - storage: storage, - syncer: syncer, - periodicSync: periodicSync, - diff: diff, - log: log, - syncPeriod: syncPeriod, - configuration: configuration, - spaceIsDeleted: spaceIsDeleted, + return h.syncer.Sync(ctx) } + h.periodicSync = periodicsync.NewPeriodicSync(h.syncPeriod, time.Minute, sync, h.log) + return nil } -func (d *headSync) Init(objectIds []string, deletionState settingsstate.ObjectDeletionState) { - d.fillDiff(objectIds) - d.syncer.Init(deletionState) - d.periodicSync.Run() +func (h *headSync) Name() (name string) { + return CName } -func (d *headSync) HandleRangeRequest(ctx context.Context, req *spacesyncproto.HeadSyncRequest) (resp *spacesyncproto.HeadSyncResponse, err error) { - if d.spaceIsDeleted.Load() { +func (h *headSync) Run(ctx context.Context) (err error) { + initialIds, err := h.storage.StoredIds() + if err != nil { + return + } + h.fillDiff(initialIds) + h.periodicSync.Run() + return +} + +func (h *headSync) HandleRangeRequest(ctx context.Context, req *spacesyncproto.HeadSyncRequest) (resp *spacesyncproto.HeadSyncResponse, err error) { + if h.spaceIsDeleted.Load() { peerId, err := peer.CtxPeerId(ctx) if err != nil { return nil, err } // stop receiving all request for sync from clients - if !slices.Contains(d.configuration.NodeIds(d.spaceId), peerId) { + if !slices.Contains(h.configuration.NodeIds(h.spaceId), peerId) { return nil, spacesyncproto.ErrSpaceIsDeleted } } - return HandleRangeRequest(ctx, d.diff, req) + return HandleRangeRequest(ctx, h.diff, req) } -func (d *headSync) UpdateHeads(id string, heads []string) { - d.syncer.UpdateHeads(id, heads) +func (h *headSync) UpdateHeads(id string, heads []string) { + h.syncer.UpdateHeads(id, heads) } -func (d *headSync) AllIds() []string { - return d.diff.Ids() +func (h *headSync) AllIds() []string { + return h.diff.Ids() } -func (d *headSync) DebugAllHeads() (res []TreeHeads) { - els := d.diff.Elements() +func (h *headSync) DebugAllHeads() (res []TreeHeads) { + els := h.diff.Elements() for _, el := range els { idHead := TreeHeads{ Id: el.Id, @@ -130,19 +138,19 @@ func (d *headSync) DebugAllHeads() (res []TreeHeads) { return } -func (d *headSync) RemoveObjects(ids []string) { - d.syncer.RemoveObjects(ids) +func (h *headSync) RemoveObjects(ids []string) { + h.syncer.RemoveObjects(ids) } -func (d *headSync) Close() (err error) { - d.periodicSync.Close() - return d.syncer.Close() +func (h *headSync) Close(ctx context.Context) (err error) { + h.periodicSync.Close() + return h.syncer.Close() } -func (d *headSync) fillDiff(objectIds []string) { +func (h *headSync) fillDiff(objectIds []string) { var els = make([]ldiff.Element, 0, len(objectIds)) for _, id := range objectIds { - st, err := d.storage.TreeStorage(id) + st, err := h.storage.TreeStorage(id) if err != nil { continue } @@ -155,32 +163,8 @@ func (d *headSync) fillDiff(objectIds []string) { Head: concatStrings(heads), }) } - d.diff.Set(els...) - if err := d.storage.WriteSpaceHash(d.diff.Hash()); err != nil { - d.log.Error("can't write space hash", zap.Error(err)) + h.diff.Set(els...) + if err := h.storage.WriteSpaceHash(h.diff.Hash()); err != nil { + h.log.Error("can't write space hash", zap.Error(err)) } } - -func concatStrings(strs []string) string { - var ( - b strings.Builder - totalLen int - ) - for _, s := range strs { - totalLen += len(s) - } - - b.Grow(totalLen) - for _, s := range strs { - b.WriteString(s) - } - return b.String() -} - -func splitString(str string) (res []string) { - const cidLen = 59 - for i := 0; i < len(str); i += cidLen { - res = append(res, str[i:i+cidLen]) - } - return -} diff --git a/commonspace/headsync/util.go b/commonspace/headsync/util.go new file mode 100644 index 00000000..549a77f2 --- /dev/null +++ b/commonspace/headsync/util.go @@ -0,0 +1,27 @@ +package headsync + +import "strings" + +func concatStrings(strs []string) string { + var ( + b strings.Builder + totalLen int + ) + for _, s := range strs { + totalLen += len(s) + } + + b.Grow(totalLen) + for _, s := range strs { + b.WriteString(s) + } + return b.String() +} + +func splitString(str string) (res []string) { + const cidLen = 59 + for i := 0; i < len(str); i += cidLen { + res = append(res, str[i:i+cidLen]) + } + return +} diff --git a/commonspace/object/tree/synctree/synctree.go b/commonspace/object/tree/synctree/synctree.go index 2a87030a..1a6d2497 100644 --- a/commonspace/object/tree/synctree/synctree.go +++ b/commonspace/object/tree/synctree/synctree.go @@ -4,6 +4,7 @@ package synctree import ( "context" "errors" + "github.com/anyproto/any-sync/commonspace/objectsync/syncclient" "time" "github.com/anyproto/any-sync/app/logger" @@ -11,7 +12,6 @@ import ( "github.com/anyproto/any-sync/commonspace/object/tree/objecttree" "github.com/anyproto/any-sync/commonspace/object/tree/synctree/updatelistener" "github.com/anyproto/any-sync/commonspace/object/tree/treestorage" - "github.com/anyproto/any-sync/commonspace/objectsync" "github.com/anyproto/any-sync/commonspace/objectsync/synchandler" "github.com/anyproto/any-sync/commonspace/spacestorage" "github.com/anyproto/any-sync/commonspace/syncstatus" @@ -44,7 +44,7 @@ type SyncTree interface { type syncTree struct { objecttree.ObjectTree synchandler.SyncHandler - syncClient objectsync.SyncClient + syncClient syncclient.SyncClient syncStatus syncstatus.StatusUpdater notifiable HeadNotifiable listener updatelistener.UpdateListener @@ -61,7 +61,7 @@ type ResponsiblePeersGetter interface { type BuildDeps struct { SpaceId string - SyncClient objectsync.SyncClient + SyncClient syncclient.SyncClient Configuration nodeconf.NodeConf HeadNotifiable HeadNotifiable Listener updatelistener.UpdateListener @@ -119,7 +119,7 @@ func buildSyncTree(ctx context.Context, sendUpdate bool, deps BuildDeps) (t Sync if sendUpdate { headUpdate := syncTree.syncClient.CreateHeadUpdate(t, nil) // send to everybody, because everybody should know that the node or client got new tree - syncTree.syncClient.Broadcast(ctx, headUpdate) + syncTree.syncClient.Broadcast(headUpdate) } return } @@ -156,7 +156,7 @@ func (s *syncTree) AddContent(ctx context.Context, content objecttree.SignableCh } s.syncStatus.HeadsChange(s.Id(), res.Heads) headUpdate := s.syncClient.CreateHeadUpdate(s, res.Added) - s.syncClient.Broadcast(ctx, headUpdate) + s.syncClient.Broadcast(headUpdate) return } @@ -183,7 +183,7 @@ func (s *syncTree) AddRawChanges(ctx context.Context, changesPayload objecttree. s.notifiable.UpdateHeads(s.Id(), res.Heads) } headUpdate := s.syncClient.CreateHeadUpdate(s, res.Added) - s.syncClient.Broadcast(ctx, headUpdate) + s.syncClient.Broadcast(headUpdate) } return } @@ -239,7 +239,7 @@ func (s *syncTree) SyncWithPeer(ctx context.Context, peerId string) (err error) s.Lock() defer s.Unlock() headUpdate := s.syncClient.CreateHeadUpdate(s, nil) - return s.syncClient.SendWithReply(ctx, peerId, headUpdate.RootChange.Id, headUpdate, "") + return s.syncClient.SendUpdate(peerId, headUpdate.RootChange.Id, headUpdate) } func (s *syncTree) afterBuild() { diff --git a/commonspace/object/tree/synctree/synctree_test.go b/commonspace/object/tree/synctree/synctree_test.go index 9900e49b..5fe9b2e6 100644 --- a/commonspace/object/tree/synctree/synctree_test.go +++ b/commonspace/object/tree/synctree/synctree_test.go @@ -9,6 +9,7 @@ import ( "github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto" "github.com/anyproto/any-sync/commonspace/objectsync" "github.com/anyproto/any-sync/commonspace/objectsync/mock_objectsync" + "github.com/anyproto/any-sync/commonspace/objectsync/syncclient" "github.com/anyproto/any-sync/commonspace/syncstatus" "github.com/anyproto/any-sync/nodeconf" "github.com/golang/mock/gomock" @@ -18,7 +19,7 @@ import ( type syncTreeMatcher struct { objTree objecttree.ObjectTree - client objectsync.SyncClient + client syncclient.SyncClient listener updatelistener.UpdateListener } @@ -34,8 +35,8 @@ func (s syncTreeMatcher) String() string { return "" } -func syncClientFuncCreator(client objectsync.SyncClient) func(spaceId string, factory objectsync.RequestFactory, objectSync objectsync.ObjectSync, configuration nodeconf.NodeConf) objectsync.SyncClient { - return func(spaceId string, factory objectsync.RequestFactory, objectSync objectsync.ObjectSync, configuration nodeconf.NodeConf) objectsync.SyncClient { +func syncClientFuncCreator(client syncclient.SyncClient) func(spaceId string, factory syncclient.RequestFactory, objectSync objectsync.ObjectSync, configuration nodeconf.NodeConf) syncclient.SyncClient { + return func(spaceId string, factory syncclient.RequestFactory, objectSync objectsync.ObjectSync, configuration nodeconf.NodeConf) syncclient.SyncClient { return client } } diff --git a/commonspace/object/tree/synctree/synctreehandler.go b/commonspace/object/tree/synctree/synctreehandler.go index cdb0aa2a..942c0e2f 100644 --- a/commonspace/object/tree/synctree/synctreehandler.go +++ b/commonspace/object/tree/synctree/synctreehandler.go @@ -4,7 +4,7 @@ import ( "context" "github.com/anyproto/any-sync/commonspace/object/tree/objecttree" "github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto" - "github.com/anyproto/any-sync/commonspace/objectsync" + "github.com/anyproto/any-sync/commonspace/objectsync/syncclient" "github.com/anyproto/any-sync/commonspace/objectsync/synchandler" "github.com/anyproto/any-sync/commonspace/spacesyncproto" "github.com/anyproto/any-sync/commonspace/syncstatus" @@ -16,7 +16,7 @@ import ( type syncTreeHandler struct { objTree objecttree.ObjectTree - syncClient objectsync.SyncClient + syncClient syncclient.SyncClient syncStatus syncstatus.StatusUpdater handlerLock sync.Mutex spaceId string @@ -25,7 +25,7 @@ type syncTreeHandler struct { const maxQueueSize = 5 -func newSyncTreeHandler(spaceId string, objTree objecttree.ObjectTree, syncClient objectsync.SyncClient, syncStatus syncstatus.StatusUpdater) synchandler.SyncHandler { +func newSyncTreeHandler(spaceId string, objTree objecttree.ObjectTree, syncClient syncclient.SyncClient, syncStatus syncstatus.StatusUpdater) synchandler.SyncHandler { return &syncTreeHandler{ objTree: objTree, syncClient: syncClient, @@ -119,7 +119,7 @@ func (s *syncTreeHandler) handleHeadUpdate( return } - return s.syncClient.SendWithReply(ctx, senderId, treeId, fullRequest, replyId) + return s.syncClient.QueueRequest(ctx, senderId, treeId, fullRequest, replyId) } if s.alreadyHasHeads(objTree, update.Heads) { @@ -143,7 +143,7 @@ func (s *syncTreeHandler) handleHeadUpdate( return } - return s.syncClient.SendWithReply(ctx, senderId, treeId, fullRequest, replyId) + return s.syncClient.QueueRequest(ctx, senderId, treeId, fullRequest, replyId) } func (s *syncTreeHandler) handleFullSyncRequest( @@ -169,7 +169,7 @@ func (s *syncTreeHandler) handleFullSyncRequest( defer func() { if err != nil { log.ErrorCtx(ctx, "full sync request finished with error", zap.Error(err)) - s.syncClient.SendWithReply(ctx, senderId, treeId, treechangeproto.WrapError(treechangeproto.ErrFullSync, header), replyId) + s.syncClient.QueueRequest(ctx, senderId, treeId, treechangeproto.WrapError(treechangeproto.ErrFullSync, header), replyId) return } else if fullResponse != nil { cnt := fullResponse.Content.GetFullSyncResponse() @@ -192,7 +192,7 @@ func (s *syncTreeHandler) handleFullSyncRequest( return } - return s.syncClient.SendWithReply(ctx, senderId, treeId, fullResponse, replyId) + return s.syncClient.QueueRequest(ctx, senderId, treeId, fullResponse, replyId) } func (s *syncTreeHandler) handleFullSyncResponse( diff --git a/commonspace/object/tree/synctree/synctreehandler_test.go b/commonspace/object/tree/synctree/synctreehandler_test.go index c81ca5f4..dce08a93 100644 --- a/commonspace/object/tree/synctree/synctreehandler_test.go +++ b/commonspace/object/tree/synctree/synctreehandler_test.go @@ -3,8 +3,8 @@ package synctree import ( "context" "fmt" - "github.com/anyproto/any-sync/commonspace/objectsync" "github.com/anyproto/any-sync/commonspace/objectsync/mock_objectsync" + "github.com/anyproto/any-sync/commonspace/objectsync/syncclient" "sync" "testing" @@ -110,7 +110,7 @@ func TestSyncHandler_HandleHeadUpdate(t *testing.T) { SnapshotPath: []string{"h1"}, } treeMsg := treechangeproto.WrapHeadUpdate(headUpdate, chWithId) - objectMsg, _ := objectsync.MarshallTreeMessage(treeMsg, "spaceId", treeId, "") + objectMsg, _ := syncclient.MarshallTreeMessage(treeMsg, "spaceId", treeId, "") fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(treeId) fx.objectTreeMock.EXPECT().Heads().Return([]string{"h2"}).Times(2) @@ -139,7 +139,7 @@ func TestSyncHandler_HandleHeadUpdate(t *testing.T) { SnapshotPath: []string{"h1"}, } treeMsg := treechangeproto.WrapHeadUpdate(headUpdate, chWithId) - objectMsg, _ := objectsync.MarshallTreeMessage(treeMsg, "spaceId", treeId, "") + objectMsg, _ := syncclient.MarshallTreeMessage(treeMsg, "spaceId", treeId, "") fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(treeId) fx.objectTreeMock.EXPECT().Heads().Return([]string{"h2"}).AnyTimes() @@ -172,7 +172,7 @@ func TestSyncHandler_HandleHeadUpdate(t *testing.T) { SnapshotPath: []string{"h1"}, } treeMsg := treechangeproto.WrapHeadUpdate(headUpdate, chWithId) - objectMsg, _ := objectsync.MarshallTreeMessage(treeMsg, "spaceId", treeId, "") + objectMsg, _ := syncclient.MarshallTreeMessage(treeMsg, "spaceId", treeId, "") fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(treeId) fx.objectTreeMock.EXPECT().Heads().Return([]string{"h1"}).AnyTimes() @@ -193,7 +193,7 @@ func TestSyncHandler_HandleHeadUpdate(t *testing.T) { SnapshotPath: []string{"h1"}, } treeMsg := treechangeproto.WrapHeadUpdate(headUpdate, chWithId) - objectMsg, _ := objectsync.MarshallTreeMessage(treeMsg, "spaceId", treeId, "") + objectMsg, _ := syncclient.MarshallTreeMessage(treeMsg, "spaceId", treeId, "") fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(treeId) fx.objectTreeMock.EXPECT().Heads().Return([]string{"h2"}).AnyTimes() @@ -218,7 +218,7 @@ func TestSyncHandler_HandleHeadUpdate(t *testing.T) { SnapshotPath: []string{"h1"}, } treeMsg := treechangeproto.WrapHeadUpdate(headUpdate, chWithId) - objectMsg, _ := objectsync.MarshallTreeMessage(treeMsg, "spaceId", treeId, "") + objectMsg, _ := syncclient.MarshallTreeMessage(treeMsg, "spaceId", treeId, "") fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(treeId) fx.objectTreeMock.EXPECT().Heads().Return([]string{"h1"}).AnyTimes() @@ -251,7 +251,7 @@ func TestSyncHandler_HandleFullSyncRequest(t *testing.T) { SnapshotPath: []string{"h1"}, } treeMsg := treechangeproto.WrapFullRequest(fullSyncRequest, chWithId) - objectMsg, _ := objectsync.MarshallTreeMessage(treeMsg, "spaceId", treeId, "") + objectMsg, _ := syncclient.MarshallTreeMessage(treeMsg, "spaceId", treeId, "") fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(treeId) fx.objectTreeMock.EXPECT().Header().Return(nil) @@ -284,7 +284,7 @@ func TestSyncHandler_HandleFullSyncRequest(t *testing.T) { SnapshotPath: []string{"h1"}, } treeMsg := treechangeproto.WrapFullRequest(fullSyncRequest, chWithId) - objectMsg, _ := objectsync.MarshallTreeMessage(treeMsg, "spaceId", treeId, "") + objectMsg, _ := syncclient.MarshallTreeMessage(treeMsg, "spaceId", treeId, "") fx.objectTreeMock.EXPECT(). Id().AnyTimes().Return(treeId) @@ -313,7 +313,7 @@ func TestSyncHandler_HandleFullSyncRequest(t *testing.T) { SnapshotPath: []string{"h1"}, } treeMsg := treechangeproto.WrapFullRequest(fullSyncRequest, chWithId) - objectMsg, _ := objectsync.MarshallTreeMessage(treeMsg, "spaceId", treeId, "") + objectMsg, _ := syncclient.MarshallTreeMessage(treeMsg, "spaceId", treeId, "") objectMsg.RequestId = replyId fx.objectTreeMock.EXPECT(). @@ -340,7 +340,7 @@ func TestSyncHandler_HandleFullSyncRequest(t *testing.T) { SnapshotPath: []string{"h1"}, } treeMsg := treechangeproto.WrapFullRequest(fullSyncRequest, chWithId) - objectMsg, _ := objectsync.MarshallTreeMessage(treeMsg, "spaceId", treeId, "") + objectMsg, _ := syncclient.MarshallTreeMessage(treeMsg, "spaceId", treeId, "") fx.objectTreeMock.EXPECT(). Id().AnyTimes().Return(treeId) @@ -381,7 +381,7 @@ func TestSyncHandler_HandleFullSyncResponse(t *testing.T) { SnapshotPath: []string{"h1"}, } treeMsg := treechangeproto.WrapFullResponse(fullSyncResponse, chWithId) - objectMsg, _ := objectsync.MarshallTreeMessage(treeMsg, "spaceId", treeId, replyId) + objectMsg, _ := syncclient.MarshallTreeMessage(treeMsg, "spaceId", treeId, replyId) fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(treeId) fx.objectTreeMock.EXPECT(). @@ -414,7 +414,7 @@ func TestSyncHandler_HandleFullSyncResponse(t *testing.T) { SnapshotPath: []string{"h1"}, } treeMsg := treechangeproto.WrapFullResponse(fullSyncResponse, chWithId) - objectMsg, _ := objectsync.MarshallTreeMessage(treeMsg, "spaceId", treeId, replyId) + objectMsg, _ := syncclient.MarshallTreeMessage(treeMsg, "spaceId", treeId, replyId) fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(treeId) fx.objectTreeMock.EXPECT(). diff --git a/commonspace/object/tree/synctree/treeremotegetter.go b/commonspace/object/tree/synctree/treeremotegetter.go index 3006ab5b..bee53231 100644 --- a/commonspace/object/tree/synctree/treeremotegetter.go +++ b/commonspace/object/tree/synctree/treeremotegetter.go @@ -47,7 +47,7 @@ func (t treeRemoteGetter) getPeers(ctx context.Context) (peerIds []string, err e func (t treeRemoteGetter) treeRequest(ctx context.Context, peerId string) (msg *treechangeproto.TreeSyncMessage, err error) { newTreeRequest := t.deps.SyncClient.CreateNewTreeRequest() - resp, err := t.deps.SyncClient.SendSync(ctx, peerId, t.treeId, newTreeRequest) + resp, err := t.deps.SyncClient.SendRequest(ctx, peerId, t.treeId, newTreeRequest) if err != nil { return } diff --git a/commonspace/object/tree/synctree/utils_test.go b/commonspace/object/tree/synctree/utils_test.go index 46936560..8f57a9f2 100644 --- a/commonspace/object/tree/synctree/utils_test.go +++ b/commonspace/object/tree/synctree/utils_test.go @@ -7,7 +7,7 @@ import ( "github.com/anyproto/any-sync/commonspace/object/tree/objecttree" "github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto" "github.com/anyproto/any-sync/commonspace/object/tree/treestorage" - "github.com/anyproto/any-sync/commonspace/objectsync" + "github.com/anyproto/any-sync/commonspace/objectsync/syncclient" "github.com/anyproto/any-sync/commonspace/objectsync/synchandler" "github.com/anyproto/any-sync/commonspace/spacesyncproto" "github.com/anyproto/any-sync/commonspace/syncstatus" @@ -89,14 +89,14 @@ type testSyncHandler struct { peerId string aclList list.AclList log *messageLog - syncClient objectsync.SyncClient + syncClient syncclient.SyncClient builder objecttree.BuildObjectTreeFunc } // createSyncHandler creates a sync handler when a tree is already created func createSyncHandler(peerId, spaceId string, objTree objecttree.ObjectTree, log *messageLog) *testSyncHandler { - factory := objectsync.NewRequestFactory() - syncClient := objectsync.NewSyncClient(spaceId, newTestMessagePool(peerId, log), factory) + factory := syncclient.NewRequestFactory() + syncClient := syncclient.NewSyncClient(spaceId, newTestMessagePool(peerId, log), factory) netTree := &broadcastTree{ ObjectTree: objTree, SyncClient: syncClient, @@ -107,8 +107,8 @@ func createSyncHandler(peerId, spaceId string, objTree objecttree.ObjectTree, lo // createEmptySyncHandler creates a sync handler when the tree will be provided later (this emulates the situation when we have no tree) func createEmptySyncHandler(peerId, spaceId string, builder objecttree.BuildObjectTreeFunc, aclList list.AclList, log *messageLog) *testSyncHandler { - factory := objectsync.NewRequestFactory() - syncClient := objectsync.NewSyncClient(spaceId, newTestMessagePool(peerId, log), factory) + factory := syncclient.NewRequestFactory() + syncClient := syncclient.NewSyncClient(spaceId, newTestMessagePool(peerId, log), factory) batcher := mb.New[protocolMsg](0) return &testSyncHandler{ @@ -140,9 +140,9 @@ func (h *testSyncHandler) HandleMessage(ctx context.Context, senderId string, re return } if unmarshalled.Content.GetFullSyncResponse() == nil { - newTreeRequest := objectsync.NewRequestFactory().CreateNewTreeRequest() + newTreeRequest := syncclient.NewRequestFactory().CreateNewTreeRequest() var objMsg *spacesyncproto.ObjectSyncMessage - objMsg, err = objectsync.MarshallTreeMessage(newTreeRequest, request.SpaceId, request.ObjectId, "") + objMsg, err = syncclient.MarshallTreeMessage(newTreeRequest, request.SpaceId, request.ObjectId, "") if err != nil { return } @@ -167,8 +167,8 @@ func (h *testSyncHandler) HandleMessage(ctx context.Context, senderId string, re } h.SyncHandler = newSyncTreeHandler(request.SpaceId, netTree, h.syncClient, syncstatus.NewNoOpSyncStatus()) var objMsg *spacesyncproto.ObjectSyncMessage - newTreeRequest := objectsync.NewRequestFactory().CreateHeadUpdate(netTree, res.Added) - objMsg, err = objectsync.MarshallTreeMessage(newTreeRequest, request.SpaceId, request.ObjectId, "") + newTreeRequest := syncclient.NewRequestFactory().CreateHeadUpdate(netTree, res.Added) + objMsg, err = syncclient.MarshallTreeMessage(newTreeRequest, request.SpaceId, request.ObjectId, "") if err != nil { return } @@ -278,7 +278,7 @@ func (m *testMessagePool) SendSync(ctx context.Context, peerId string, message * // it is a simplified version of SyncTree which is easier to use in the test environment type broadcastTree struct { objecttree.ObjectTree - objectsync.SyncClient + syncclient.SyncClient } func (b *broadcastTree) AddRawChanges(ctx context.Context, changes objecttree.RawChangesPayload) (objecttree.AddResult, error) { diff --git a/commonspace/objectsync/msgpool.go b/commonspace/objectsync/msgpool.go deleted file mode 100644 index 19098ace..00000000 --- a/commonspace/objectsync/msgpool.go +++ /dev/null @@ -1,142 +0,0 @@ -package objectsync - -import ( - "context" - "fmt" - "github.com/anyproto/any-sync/commonspace/objectsync/synchandler" - "github.com/anyproto/any-sync/commonspace/peermanager" - "github.com/anyproto/any-sync/commonspace/spacesyncproto" - "go.uber.org/zap" - "strconv" - "strings" - "sync" - "sync/atomic" - "time" -) - -type LastUsage interface { - LastUsage() time.Time -} - -// MessagePool can be made generic to work with different streams -type MessagePool interface { - LastUsage - synchandler.SyncHandler - peermanager.PeerManager - SendSync(ctx context.Context, peerId string, message *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) -} - -type MessageHandler func(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) - -type responseWaiter struct { - ch chan *spacesyncproto.ObjectSyncMessage -} - -type messagePool struct { - sync.Mutex - peermanager.PeerManager - messageHandler MessageHandler - waiters map[string]responseWaiter - waitersMx sync.Mutex - counter atomic.Uint64 - lastUsage atomic.Int64 -} - -func newMessagePool(peerManager peermanager.PeerManager, messageHandler MessageHandler) MessagePool { - s := &messagePool{ - PeerManager: peerManager, - messageHandler: messageHandler, - waiters: make(map[string]responseWaiter), - } - return s -} - -func (s *messagePool) SendSync(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) { - s.updateLastUsage() - if _, ok := ctx.Deadline(); !ok { - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, time.Minute) - defer cancel() - } - newCounter := s.counter.Add(1) - msg.RequestId = genReplyKey(peerId, msg.ObjectId, newCounter) - log.InfoCtx(ctx, "mpool sendSync", zap.String("requestId", msg.RequestId)) - s.waitersMx.Lock() - waiter := responseWaiter{ - ch: make(chan *spacesyncproto.ObjectSyncMessage, 1), - } - s.waiters[msg.RequestId] = waiter - s.waitersMx.Unlock() - - err = s.SendPeer(ctx, peerId, msg) - if err != nil { - return - } - select { - case <-ctx.Done(): - s.waitersMx.Lock() - delete(s.waiters, msg.RequestId) - s.waitersMx.Unlock() - - log.With(zap.String("requestId", msg.RequestId)).DebugCtx(ctx, "time elapsed when waiting") - err = fmt.Errorf("sendSync context error: %v", ctx.Err()) - case reply = <-waiter.ch: - // success - } - return -} - -func (s *messagePool) SendPeer(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error) { - s.updateLastUsage() - return s.PeerManager.SendPeer(ctx, peerId, msg) -} - -func (s *messagePool) Broadcast(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) { - s.updateLastUsage() - return s.PeerManager.Broadcast(ctx, msg) -} - -func (s *messagePool) HandleMessage(ctx context.Context, senderId string, msg *spacesyncproto.ObjectSyncMessage) (err error) { - s.updateLastUsage() - if msg.ReplyId != "" { - log.InfoCtx(ctx, "mpool receive reply", zap.String("replyId", msg.ReplyId)) - // we got reply, send it to waiter - if s.stopWaiter(msg) { - return - } - log.WarnCtx(ctx, "reply id does not exist", zap.String("replyId", msg.ReplyId)) - return - } - return s.messageHandler(ctx, senderId, msg) -} - -func (s *messagePool) LastUsage() time.Time { - return time.Unix(s.lastUsage.Load(), 0) -} - -func (s *messagePool) updateLastUsage() { - s.lastUsage.Store(time.Now().Unix()) -} - -func (s *messagePool) stopWaiter(msg *spacesyncproto.ObjectSyncMessage) bool { - s.waitersMx.Lock() - waiter, exists := s.waiters[msg.ReplyId] - if exists { - delete(s.waiters, msg.ReplyId) - s.waitersMx.Unlock() - waiter.ch <- msg - return true - } - s.waitersMx.Unlock() - return false -} - -func genReplyKey(peerId, treeId string, counter uint64) string { - b := &strings.Builder{} - b.WriteString(peerId) - b.WriteString(".") - b.WriteString(treeId) - b.WriteString(".") - b.WriteString(strconv.FormatUint(counter, 36)) - return b.String() -} diff --git a/commonspace/objectsync/objectsync.go b/commonspace/objectsync/objectsync.go index e56ee6ed..64ab66f6 100644 --- a/commonspace/objectsync/objectsync.go +++ b/commonspace/objectsync/objectsync.go @@ -4,15 +4,19 @@ package objectsync import ( "context" "fmt" - "github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto" - "github.com/gogo/protobuf/proto" + "github.com/anyproto/any-sync/app" + "github.com/anyproto/any-sync/commonspace/object/treemanager" + "github.com/anyproto/any-sync/commonspace/objectsync/syncclient" + "github.com/anyproto/any-sync/commonspace/spacestate" + "github.com/anyproto/any-sync/metric" + "github.com/anyproto/any-sync/net/peer" + "github.com/anyproto/any-sync/util/multiqueue" + "github.com/cheggaaa/mb/v3" "sync/atomic" "time" "github.com/anyproto/any-sync/app/logger" "github.com/anyproto/any-sync/commonspace/object/syncobjectgetter" - "github.com/anyproto/any-sync/commonspace/objectsync/synchandler" - "github.com/anyproto/any-sync/commonspace/peermanager" "github.com/anyproto/any-sync/commonspace/spacestorage" "github.com/anyproto/any-sync/commonspace/spacesyncproto" "github.com/anyproto/any-sync/nodeconf" @@ -20,63 +24,127 @@ import ( "golang.org/x/exp/slices" ) -var log = logger.NewNamed("common.commonspace.objectsync") +const CName = "common.commonspace.objectsync" + +var log = logger.NewNamed(CName) type ObjectSync interface { - LastUsage - synchandler.SyncHandler - SyncClient() SyncClient + app.ComponentRunnable +} - Close() (err error) +type HandleMessage struct { + Id uint64 + ReceiveTime time.Time + StartHandlingTime time.Time + Deadline time.Time + SenderId string + Message *spacesyncproto.ObjectSyncMessage + PeerCtx context.Context +} + +func (m HandleMessage) LogFields(fields ...zap.Field) []zap.Field { + return append(fields, + metric.SpaceId(m.Message.SpaceId), + metric.ObjectId(m.Message.ObjectId), + metric.QueueDur(m.StartHandlingTime.Sub(m.ReceiveTime)), + metric.TotalDur(time.Since(m.ReceiveTime)), + ) } type objectSync struct { spaceId string - messagePool MessagePool - syncClient SyncClient + syncClient syncclient.SyncClient objectGetter syncobjectgetter.SyncObjectGetter configuration nodeconf.NodeConf spaceStorage spacestorage.SpaceStorage + metric metric.Metric - syncCtx context.Context - cancelSync context.CancelFunc spaceIsDeleted *atomic.Bool + handleQueue multiqueue.MultiQueue[HandleMessage] } -func NewObjectSync( - spaceId string, - spaceIsDeleted *atomic.Bool, - configuration nodeconf.NodeConf, - peerManager peermanager.PeerManager, - objectGetter syncobjectgetter.SyncObjectGetter, - storage spacestorage.SpaceStorage) ObjectSync { - syncCtx, cancel := context.WithCancel(context.Background()) - os := &objectSync{ - objectGetter: objectGetter, - spaceStorage: storage, - spaceId: spaceId, - syncCtx: syncCtx, - cancelSync: cancel, - spaceIsDeleted: spaceIsDeleted, - configuration: configuration, +func (s *objectSync) Init(a *app.App) (err error) { + s.syncClient = a.MustComponent(syncclient.CName).(syncclient.SyncClient) + s.objectGetter = a.MustComponent(treemanager.CName).(treemanager.TreeManager).(syncobjectgetter.SyncObjectGetter) + s.configuration = a.MustComponent(nodeconf.CName).(nodeconf.NodeConf) + sharedData := a.MustComponent(spacestate.CName).(*spacestate.SpaceState) + s.metric = a.MustComponent(metric.CName).(metric.Metric) + s.spaceIsDeleted = sharedData.SpaceIsDeleted + s.spaceId = sharedData.SpaceId + s.handleQueue = multiqueue.New[HandleMessage](s.processHandleMessage, 100) + return nil +} + +func (s *objectSync) Name() (name string) { + return CName +} + +func (s *objectSync) Run(ctx context.Context) (err error) { + return nil +} + +func (s *objectSync) Close(ctx context.Context) (err error) { + return s.handleQueue.Close() +} + +func NewObjectSync() ObjectSync { + return &objectSync{} +} + +func (s *objectSync) HandleMessage(ctx context.Context, hm HandleMessage) (err error) { + threadId := hm.Message.ObjectId + hm.ReceiveTime = time.Now() + if hm.Message.ReplyId != "" { + threadId += hm.Message.ReplyId + defer func() { + _ = s.handleQueue.CloseThread(threadId) + }() + } + if hm.PeerCtx == nil { + hm.PeerCtx = ctx + } + err = s.handleQueue.Add(ctx, threadId, hm) + if err == mb.ErrOverflowed { + log.InfoCtx(ctx, "queue overflowed", zap.String("spaceId", s.spaceId), zap.String("objectId", threadId)) + // skip overflowed error + return nil } - os.messagePool = newMessagePool(peerManager, os.handleMessage) - os.syncClient = NewSyncClient(spaceId, os.messagePool, NewRequestFactory()) - return os -} - -func (s *objectSync) Close() (err error) { - s.cancelSync() return } -func (s *objectSync) LastUsage() time.Time { - return s.messagePool.LastUsage() -} +func (s *objectSync) processHandleMessage(msg HandleMessage) { + var err error + msg.StartHandlingTime = time.Now() + ctx := peer.CtxWithPeerId(context.Background(), msg.SenderId) + ctx = logger.CtxWithFields(ctx, zap.Uint64("msgId", msg.Id), zap.String("senderId", msg.SenderId)) + defer func() { + if s.metric == nil { + return + } + s.metric.RequestLog(msg.PeerCtx, "space.streamOp", msg.LogFields( + zap.Error(err), + )...) + }() -func (s *objectSync) HandleMessage(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) { - return s.messagePool.HandleMessage(ctx, senderId, message) + if !msg.Deadline.IsZero() { + now := time.Now() + if now.After(msg.Deadline) { + log.InfoCtx(ctx, "skip message: deadline exceed") + err = context.DeadlineExceeded + return + } + var cancel context.CancelFunc + ctx, cancel = context.WithDeadline(ctx, msg.Deadline) + defer cancel() + } + if err = s.handleMessage(ctx, msg.SenderId, msg.Message); err != nil { + if msg.Message.ObjectId != "" { + // cleanup thread on error + _ = s.handleQueue.CloseThread(msg.Message.ObjectId) + } + log.InfoCtx(ctx, "handleMessage error", zap.Error(err)) + } } func (s *objectSync) handleMessage(ctx context.Context, senderId string, msg *spacesyncproto.ObjectSyncMessage) (err error) { @@ -88,70 +156,16 @@ func (s *objectSync) handleMessage(ctx context.Context, senderId string, msg *sp log = log.With(zap.Bool("isDeleted", true)) // preventing sync with other clients if they are not just syncing the settings tree if !slices.Contains(s.configuration.NodeIds(s.spaceId), senderId) && msg.ObjectId != s.spaceStorage.SpaceSettingsId() { - s.unmarshallSendError(ctx, msg, spacesyncproto.ErrUnexpected, senderId, msg.ObjectId) return fmt.Errorf("can't perform operation with object, space is deleted") } } - log.DebugCtx(ctx, "handling message") - hasTree, err := s.spaceStorage.HasTree(msg.ObjectId) - if err != nil { - s.unmarshallSendError(ctx, msg, spacesyncproto.ErrUnexpected, senderId, msg.ObjectId) - return fmt.Errorf("falied to execute get operation on storage has tree: %w", err) - } - // in this case we will try to get it from remote, unless the sender also sent us the same request :-) - if !hasTree { - treeMsg := &treechangeproto.TreeSyncMessage{} - err = proto.Unmarshal(msg.Payload, treeMsg) - if err != nil { - s.sendError(ctx, nil, spacesyncproto.ErrUnexpected, senderId, msg.ObjectId, msg.RequestId) - return fmt.Errorf("failed to unmarshall tree sync message: %w", err) - } - // this means that we don't have the tree locally and therefore can't return it - if s.isEmptyFullSyncRequest(treeMsg) { - err = treechangeproto.ErrGetTree - s.sendError(ctx, nil, treechangeproto.ErrGetTree, senderId, msg.ObjectId, msg.RequestId) - return fmt.Errorf("failed to get tree from storage on full sync: %w", err) - } - } obj, err := s.objectGetter.GetObject(ctx, msg.ObjectId) if err != nil { - // TODO: write tests for object sync https://linear.app/anytype/issue/GO-1299/write-tests-for-commonspaceobjectsync - s.unmarshallSendError(ctx, msg, spacesyncproto.ErrUnexpected, senderId, msg.ObjectId) return fmt.Errorf("failed to get object from cache: %w", err) } - // TODO: unmarshall earlier err = obj.HandleMessage(ctx, senderId, msg) if err != nil { - s.unmarshallSendError(ctx, msg, spacesyncproto.ErrUnexpected, senderId, msg.ObjectId) return fmt.Errorf("failed to handle message: %w", err) } return } - -func (s *objectSync) SyncClient() SyncClient { - return s.syncClient -} - -func (s *objectSync) unmarshallSendError(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage, respErr error, senderId, objectId string) { - unmarshalled := &treechangeproto.TreeSyncMessage{} - err := proto.Unmarshal(msg.Payload, unmarshalled) - if err != nil { - return - } - s.sendError(ctx, unmarshalled.RootChange, respErr, senderId, objectId, msg.RequestId) -} - -func (s *objectSync) sendError(ctx context.Context, root *treechangeproto.RawTreeChangeWithId, respErr error, senderId, objectId, replyId string) { - // we don't send errors if have no reply id, this can lead to bugs and also nobody needs this error - if replyId == "" { - return - } - resp := treechangeproto.WrapError(respErr, root) - if err := s.syncClient.SendWithReply(ctx, senderId, objectId, resp, replyId); err != nil { - log.InfoCtx(ctx, "failed to send error to client") - } -} - -func (s *objectSync) isEmptyFullSyncRequest(msg *treechangeproto.TreeSyncMessage) bool { - return msg.GetContent().GetFullSyncRequest() != nil && len(msg.GetContent().GetFullSyncRequest().GetHeads()) == 0 -} diff --git a/commonspace/objectsync/syncclient.go b/commonspace/objectsync/syncclient.go deleted file mode 100644 index aad712bd..00000000 --- a/commonspace/objectsync/syncclient.go +++ /dev/null @@ -1,78 +0,0 @@ -package objectsync - -import ( - "context" - "github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto" - "github.com/anyproto/any-sync/commonspace/spacesyncproto" - "go.uber.org/zap" -) - -type SyncClient interface { - RequestFactory - Broadcast(ctx context.Context, msg *treechangeproto.TreeSyncMessage) - SendWithReply(ctx context.Context, peerId, objectId string, msg *treechangeproto.TreeSyncMessage, replyId string) (err error) - SendSync(ctx context.Context, peerId, objectId string, msg *treechangeproto.TreeSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) - MessagePool() MessagePool -} - -type syncClient struct { - RequestFactory - spaceId string - messagePool MessagePool -} - -func NewSyncClient( - spaceId string, - messagePool MessagePool, - factory RequestFactory) SyncClient { - return &syncClient{ - messagePool: messagePool, - RequestFactory: factory, - spaceId: spaceId, - } -} - -func (s *syncClient) Broadcast(ctx context.Context, msg *treechangeproto.TreeSyncMessage) { - objMsg, err := MarshallTreeMessage(msg, s.spaceId, msg.RootChange.Id, "") - if err != nil { - return - } - err = s.messagePool.Broadcast(ctx, objMsg) - if err != nil { - log.DebugCtx(ctx, "broadcast error", zap.Error(err)) - } -} - -func (s *syncClient) SendSync(ctx context.Context, peerId, objectId string, msg *treechangeproto.TreeSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) { - objMsg, err := MarshallTreeMessage(msg, s.spaceId, objectId, "") - if err != nil { - return - } - return s.messagePool.SendSync(ctx, peerId, objMsg) -} - -func (s *syncClient) SendWithReply(ctx context.Context, peerId, objectId string, msg *treechangeproto.TreeSyncMessage, replyId string) (err error) { - objMsg, err := MarshallTreeMessage(msg, s.spaceId, objectId, replyId) - if err != nil { - return - } - return s.messagePool.SendPeer(ctx, peerId, objMsg) -} - -func (s *syncClient) MessagePool() MessagePool { - return s.messagePool -} - -func MarshallTreeMessage(message *treechangeproto.TreeSyncMessage, spaceId, objectId, replyId string) (objMsg *spacesyncproto.ObjectSyncMessage, err error) { - payload, err := message.Marshal() - if err != nil { - return - } - objMsg = &spacesyncproto.ObjectSyncMessage{ - ReplyId: replyId, - Payload: payload, - ObjectId: objectId, - SpaceId: spaceId, - } - return -} diff --git a/commonspace/objectsync/requestfactory.go b/commonspace/objectsync/syncclient/requestfactory.go similarity index 99% rename from commonspace/objectsync/requestfactory.go rename to commonspace/objectsync/syncclient/requestfactory.go index 1f4f3c7d..0d908179 100644 --- a/commonspace/objectsync/requestfactory.go +++ b/commonspace/objectsync/syncclient/requestfactory.go @@ -1,4 +1,4 @@ -package objectsync +package syncclient import ( "fmt" diff --git a/commonspace/objectsync/syncclient/syncclient.go b/commonspace/objectsync/syncclient/syncclient.go new file mode 100644 index 00000000..6041a2d0 --- /dev/null +++ b/commonspace/objectsync/syncclient/syncclient.go @@ -0,0 +1,98 @@ +package syncclient + +import ( + "context" + "github.com/anyproto/any-sync/app" + "github.com/anyproto/any-sync/app/logger" + "github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto" + "github.com/anyproto/any-sync/commonspace/requestsender" + "github.com/anyproto/any-sync/commonspace/spacestate" + "github.com/anyproto/any-sync/commonspace/spacesyncproto" + "github.com/anyproto/any-sync/commonspace/streamsender" + "go.uber.org/zap" +) + +const CName = "common.objectsync.syncclient" + +var log = logger.NewNamed(CName) + +type SyncClient interface { + app.Component + RequestFactory + Broadcast(msg *treechangeproto.TreeSyncMessage) + SendUpdate(peerId, objectId string, msg *treechangeproto.TreeSyncMessage) (err error) + QueueRequest(peerId, objectId string, msg *treechangeproto.TreeSyncMessage) (err error) + SendRequest(ctx context.Context, peerId, objectId string, msg *treechangeproto.TreeSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) +} + +type syncClient struct { + RequestFactory + spaceId string + requestSender requestsender.RequestSender + streamSender streamsender.StreamSender +} + +func NewSyncClient() SyncClient { + return &syncClient{} +} + +func (s *syncClient) Init(a *app.App) (err error) { + sharedState := a.MustComponent(spacestate.CName).(*spacestate.SpaceState) + s.spaceId = sharedState.SpaceId + s.requestSender = a.MustComponent(requestsender.CName).(requestsender.RequestSender) + s.streamSender = a.MustComponent(streamsender.CName).(streamsender.StreamSender) + return nil +} + +func (s *syncClient) Name() (name string) { + return CName +} + +func (s *syncClient) Broadcast(msg *treechangeproto.TreeSyncMessage) { + objMsg, err := MarshallTreeMessage(msg, s.spaceId, msg.RootChange.Id, "") + if err != nil { + return + } + err = s.streamSender.Broadcast(objMsg) + if err != nil { + log.Debug("broadcast error", zap.Error(err)) + } +} + +func (s *syncClient) SendUpdate(peerId, objectId string, msg *treechangeproto.TreeSyncMessage) (err error) { + objMsg, err := MarshallTreeMessage(msg, s.spaceId, objectId, "") + if err != nil { + return + } + return s.streamSender.SendPeer(peerId, objMsg) +} + +func (s *syncClient) SendRequest(ctx context.Context, peerId, objectId string, msg *treechangeproto.TreeSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) { + objMsg, err := MarshallTreeMessage(msg, s.spaceId, objectId, "") + if err != nil { + return + } + return s.requestSender.SendRequest(ctx, peerId, objMsg) +} + +func (s *syncClient) QueueRequest(peerId, objectId string, msg *treechangeproto.TreeSyncMessage) (err error) { + objMsg, err := MarshallTreeMessage(msg, s.spaceId, objectId, "") + if err != nil { + return + } + return s.requestSender.QueueRequest(peerId, objMsg) +} + +func MarshallTreeMessage(message *treechangeproto.TreeSyncMessage, spaceId, objectId, replyId string) (objMsg *spacesyncproto.ObjectSyncMessage, err error) { + payload, err := message.Marshal() + if err != nil { + return + } + objMsg = &spacesyncproto.ObjectSyncMessage{ + ReplyId: replyId, + Payload: payload, + ObjectId: objectId, + SpaceId: spaceId, + } + return +} diff --git a/commonspace/objecttreebuilder/treebuilder.go b/commonspace/objecttreebuilder/treebuilder.go new file mode 100644 index 00000000..f8bf346b --- /dev/null +++ b/commonspace/objecttreebuilder/treebuilder.go @@ -0,0 +1,191 @@ +package objecttreebuilder + +import ( + "context" + "errors" + "github.com/anyproto/any-sync/app" + "github.com/anyproto/any-sync/app/logger" + "github.com/anyproto/any-sync/commonspace/headsync" + "github.com/anyproto/any-sync/commonspace/object/acl/list" + "github.com/anyproto/any-sync/commonspace/object/tree/objecttree" + "github.com/anyproto/any-sync/commonspace/object/tree/synctree" + "github.com/anyproto/any-sync/commonspace/object/tree/synctree/updatelistener" + "github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto" + "github.com/anyproto/any-sync/commonspace/object/tree/treestorage" + "github.com/anyproto/any-sync/commonspace/objectsync/syncclient" + "github.com/anyproto/any-sync/commonspace/peermanager" + "github.com/anyproto/any-sync/commonspace/spacestate" + "github.com/anyproto/any-sync/commonspace/spacestorage" + "github.com/anyproto/any-sync/commonspace/syncstatus" + "github.com/anyproto/any-sync/nodeconf" + "go.uber.org/zap" + "sync/atomic" +) + +type BuildTreeOpts struct { + Listener updatelistener.UpdateListener + WaitTreeRemoteSync bool + TreeBuilder objecttree.BuildObjectTreeFunc +} + +const CName = "common.commonspace.objecttreebuilder" + +var log = logger.NewNamed(CName) +var ErrSpaceClosed = errors.New("space is closed") + +type HistoryTreeOpts struct { + BeforeId string + Include bool + BuildFullTree bool +} + +type TreeBuilder interface { + app.Component + BuildTree(ctx context.Context, id string, opts BuildTreeOpts) (t objecttree.ObjectTree, err error) + BuildHistoryTree(ctx context.Context, id string, opts HistoryTreeOpts) (t objecttree.HistoryTree, err error) + CreateTree(ctx context.Context, payload objecttree.ObjectTreeCreatePayload) (res treestorage.TreeStorageCreatePayload, err error) + PutTree(ctx context.Context, payload treestorage.TreeStorageCreatePayload, listener updatelistener.UpdateListener) (t objecttree.ObjectTree, err error) + SetOnCloseHandler(handler func(id string)) +} + +type treeBuilder struct { + syncClient syncclient.SyncClient + configuration nodeconf.NodeConf + headsNotifiable synctree.HeadNotifiable + peerManager peermanager.PeerManager + spaceStorage spacestorage.SpaceStorage + syncStatus syncstatus.StatusUpdater + + log logger.CtxLogger + builder objecttree.BuildObjectTreeFunc + spaceId string + aclList list.AclList + treesUsed *atomic.Int32 + isClosed *atomic.Bool + onClose func(id string) +} + +func (t *treeBuilder) Init(a *app.App) (err error) { + state := a.MustComponent(spacestate.CName).(*spacestate.SpaceState) + t.spaceId = state.SpaceId + t.aclList = state.AclList + t.isClosed = state.SpaceIsClosed + t.spaceStorage = state.SpaceStorage + t.treesUsed = state.TreesUsed + t.builder = state.TreeBuilderFunc + t.configuration = a.MustComponent(nodeconf.CName).(nodeconf.NodeConf) + t.headsNotifiable = a.MustComponent(headsync.CName).(headsync.HeadSync) + t.spaceStorage = state.SpaceStorage + t.syncStatus = a.MustComponent(syncstatus.CName).(syncstatus.StatusUpdater) + t.peerManager = a.MustComponent(peermanager.CName).(peermanager.PeerManager) + t.log = log.With(zap.String("spaceId", t.spaceId)) + t.onClose = state.Actions.OnObjectDelete + return nil +} + +func (t *treeBuilder) Name() (name string) { + return CName +} + +func (t *treeBuilder) SetOnCloseHandler(handler func(id string)) { + t.onClose = handler +} + +func (t *treeBuilder) BuildTree(ctx context.Context, id string, opts BuildTreeOpts) (ot objecttree.ObjectTree, err error) { + if t.isClosed.Load() { + // TODO: change to real error + err = ErrSpaceClosed + return + } + treeBuilder := opts.TreeBuilder + if treeBuilder == nil { + treeBuilder = t.builder + } + deps := synctree.BuildDeps{ + SpaceId: t.spaceId, + SyncClient: t.syncClient, + Configuration: t.configuration, + HeadNotifiable: t.headsNotifiable, + Listener: opts.Listener, + AclList: t.aclList, + SpaceStorage: t.spaceStorage, + OnClose: t.onClose, + SyncStatus: t.syncStatus, + WaitTreeRemoteSync: opts.WaitTreeRemoteSync, + PeerGetter: t.peerManager, + BuildObjectTree: treeBuilder, + } + t.treesUsed.Add(1) + t.log.Debug("incrementing counter", zap.String("id", id), zap.Int32("trees", t.treesUsed.Load())) + if ot, err = synctree.BuildSyncTreeOrGetRemote(ctx, id, deps); err != nil { + t.treesUsed.Add(-1) + t.log.Debug("decrementing counter, load failed", zap.String("id", id), zap.Int32("trees", t.treesUsed.Load()), zap.Error(err)) + return nil, err + } + return +} + +func (t *treeBuilder) BuildHistoryTree(ctx context.Context, id string, opts HistoryTreeOpts) (ot objecttree.HistoryTree, err error) { + if t.isClosed.Load() { + // TODO: change to real error + err = ErrSpaceClosed + return + } + + params := objecttree.HistoryTreeParams{ + AclList: t.aclList, + BeforeId: opts.BeforeId, + IncludeBeforeId: opts.Include, + BuildFullTree: opts.BuildFullTree, + } + params.TreeStorage, err = t.spaceStorage.TreeStorage(id) + if err != nil { + return + } + return objecttree.BuildHistoryTree(params) +} + +func (t *treeBuilder) CreateTree(ctx context.Context, payload objecttree.ObjectTreeCreatePayload) (res treestorage.TreeStorageCreatePayload, err error) { + if t.isClosed.Load() { + err = ErrSpaceClosed + return + } + root, err := objecttree.CreateObjectTreeRoot(payload, t.aclList) + if err != nil { + return + } + + res = treestorage.TreeStorageCreatePayload{ + RootRawChange: root, + Changes: []*treechangeproto.RawTreeChangeWithId{root}, + Heads: []string{root.Id}, + } + return +} + +func (t *treeBuilder) PutTree(ctx context.Context, payload treestorage.TreeStorageCreatePayload, listener updatelistener.UpdateListener) (ot objecttree.ObjectTree, err error) { + if t.isClosed.Load() { + err = ErrSpaceClosed + return + } + deps := synctree.BuildDeps{ + SpaceId: t.spaceId, + SyncClient: t.syncClient, + Configuration: t.configuration, + HeadNotifiable: t.headsNotifiable, + Listener: listener, + AclList: t.aclList, + SpaceStorage: t.spaceStorage, + OnClose: t.onClose, + SyncStatus: t.syncStatus, + PeerGetter: t.peerManager, + BuildObjectTree: t.builder, + } + ot, err = synctree.PutSyncTree(ctx, payload, deps) + if err != nil { + return + } + t.treesUsed.Add(1) + t.log.Debug("incrementing counter", zap.String("id", payload.RootRawChange.Id), zap.Int32("trees", t.treesUsed.Load())) + return +} diff --git a/commonspace/requestsender/requestsender.go b/commonspace/requestsender/requestsender.go new file mode 100644 index 00000000..1ba166fb --- /dev/null +++ b/commonspace/requestsender/requestsender.go @@ -0,0 +1,45 @@ +package requestsender + +import ( + "context" + "github.com/anyproto/any-sync/app" + "github.com/anyproto/any-sync/app/logger" + "github.com/anyproto/any-sync/commonspace/spacesyncproto" +) + +const CName = "common.commonspace.requestsender" + +var log = logger.NewNamed(CName) + +type RequestSender interface { + app.ComponentRunnable + SendRequest(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) + QueueRequest(peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error) +} + +type requestSender struct { +} + +func (r *requestSender) Init(a *app.App) (err error) { + return +} + +func (r *requestSender) Name() (name string) { + return CName +} + +func (r *requestSender) Run(ctx context.Context) (err error) { + return nil +} + +func (r *requestSender) Close(ctx context.Context) (err error) { + return nil +} + +func (r *requestSender) SendRequest(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) { + return nil, nil +} + +func (r *requestSender) QueueRequest(peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error) { + return nil +} diff --git a/commonspace/settings/deleter.go b/commonspace/settings/deleter.go index 322308de..5d3bd5a1 100644 --- a/commonspace/settings/deleter.go +++ b/commonspace/settings/deleter.go @@ -2,9 +2,9 @@ package settings import ( "context" + "github.com/anyproto/any-sync/commonspace/deletionstate" "github.com/anyproto/any-sync/commonspace/object/tree/treestorage" "github.com/anyproto/any-sync/commonspace/object/treemanager" - "github.com/anyproto/any-sync/commonspace/settings/settingsstate" "github.com/anyproto/any-sync/commonspace/spacestorage" "go.uber.org/zap" ) @@ -15,11 +15,11 @@ type Deleter interface { type deleter struct { st spacestorage.SpaceStorage - state settingsstate.ObjectDeletionState + state deletionstate.ObjectDeletionState getter treemanager.TreeManager } -func newDeleter(st spacestorage.SpaceStorage, state settingsstate.ObjectDeletionState, getter treemanager.TreeManager) Deleter { +func newDeleter(st spacestorage.SpaceStorage, state deletionstate.ObjectDeletionState, getter treemanager.TreeManager) Deleter { return &deleter{st, state, getter} } diff --git a/commonspace/settings/deletionmanager.go b/commonspace/settings/deletionmanager.go index 2d3d47ff..2611d28c 100644 --- a/commonspace/settings/deletionmanager.go +++ b/commonspace/settings/deletionmanager.go @@ -2,6 +2,7 @@ package settings import ( "context" + "github.com/anyproto/any-sync/commonspace/deletionstate" "github.com/anyproto/any-sync/commonspace/object/treemanager" "github.com/anyproto/any-sync/commonspace/settings/settingsstate" "go.uber.org/zap" @@ -20,7 +21,7 @@ func newDeletionManager( settingsId string, isResponsible bool, treeManager treemanager.TreeManager, - deletionState settingsstate.ObjectDeletionState, + deletionState deletionstate.ObjectDeletionState, provider SpaceIdsProvider, onSpaceDelete func()) DeletionManager { return &deletionManager{ @@ -35,7 +36,7 @@ func newDeletionManager( } type deletionManager struct { - deletionState settingsstate.ObjectDeletionState + deletionState deletionstate.ObjectDeletionState provider SpaceIdsProvider treeManager treemanager.TreeManager spaceId string diff --git a/commonspace/settings/settings.go b/commonspace/settings/settings.go index 5c2b41cf..4e2177ae 100644 --- a/commonspace/settings/settings.go +++ b/commonspace/settings/settings.go @@ -1,328 +1,88 @@ -//go:generate mockgen -destination mock_settings/mock_settings.go github.com/anyproto/any-sync/commonspace/settings DeletionManager,Deleter,SpaceIdsProvider package settings import ( "context" - "errors" - "fmt" - "github.com/anyproto/any-sync/util/crypto" - "github.com/anyproto/any-sync/accountservice" - "github.com/anyproto/any-sync/app/logger" + "github.com/anyproto/any-sync/app" + "github.com/anyproto/any-sync/commonspace/deletionstate" + "github.com/anyproto/any-sync/commonspace/headsync" "github.com/anyproto/any-sync/commonspace/object/tree/objecttree" "github.com/anyproto/any-sync/commonspace/object/tree/synctree" "github.com/anyproto/any-sync/commonspace/object/tree/synctree/updatelistener" - "github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto" "github.com/anyproto/any-sync/commonspace/object/treemanager" - "github.com/anyproto/any-sync/commonspace/settings/settingsstate" + "github.com/anyproto/any-sync/commonspace/objecttreebuilder" + "github.com/anyproto/any-sync/commonspace/spacestate" "github.com/anyproto/any-sync/commonspace/spacestorage" - "github.com/anyproto/any-sync/commonspace/spacesyncproto" "github.com/anyproto/any-sync/nodeconf" - "github.com/gogo/protobuf/proto" "go.uber.org/zap" - "golang.org/x/exp/slices" ) -var log = logger.NewNamed("common.commonspace.settings") +const CName = "common.commonspace.settings" -type SettingsObject interface { - synctree.SyncTree - Init(ctx context.Context) (err error) - DeleteObject(id string) (err error) - DeleteSpace(ctx context.Context, raw *treechangeproto.RawTreeChangeWithId) (err error) - SpaceDeleteRawChange() (raw *treechangeproto.RawTreeChangeWithId, err error) +type Settings interface { + app.ComponentRunnable } -var ( - ErrDeleteSelf = errors.New("cannot delete self") - ErrAlreadyDeleted = errors.New("the object is already deleted") - ErrObjDoesNotExist = errors.New("the object does not exist") - ErrCantDeleteSpace = errors.New("not able to delete space") -) +type settings struct { + account accountservice.Service + treeManager treemanager.TreeManager + storage spacestorage.SpaceStorage + configuration nodeconf.NodeConf + deletionState deletionstate.ObjectDeletionState + headsync headsync.HeadSync + spaceActions spacestate.SpaceActions + treeBuilder objecttreebuilder.TreeBuilder -var ( - DoSnapshot = objecttree.DoSnapshot - buildHistoryTree = func(objTree objecttree.ObjectTree) (objecttree.ReadableObjectTree, error) { - return objecttree.BuildHistoryTree(objecttree.HistoryTreeParams{ - TreeStorage: objTree.Storage(), - AclList: objTree.AclList(), - BuildFullTree: true, - }) - } -) - -type BuildTreeFunc func(ctx context.Context, id string, listener updatelistener.UpdateListener) (t synctree.SyncTree, err error) - -type Deps struct { - BuildFunc BuildTreeFunc - Account accountservice.Service - TreeManager treemanager.TreeManager - Store spacestorage.SpaceStorage - Configuration nodeconf.NodeConf - DeletionState settingsstate.ObjectDeletionState - Provider SpaceIdsProvider - OnSpaceDelete func() - // testing dependencies - builder settingsstate.StateBuilder - del Deleter - delManager DeletionManager - changeFactory settingsstate.ChangeFactory + settingsObject SettingsObject } -type settingsObject struct { - synctree.SyncTree - account accountservice.Service - spaceId string - treeManager treemanager.TreeManager - store spacestorage.SpaceStorage - builder settingsstate.StateBuilder - buildFunc BuildTreeFunc - loop *deleteLoop - - state *settingsstate.State - deletionState settingsstate.ObjectDeletionState - deletionManager DeletionManager - changeFactory settingsstate.ChangeFactory +func (s *settings) Run(ctx context.Context) (err error) { + return s.settingsObject.Init(ctx) } -func NewSettingsObject(deps Deps, spaceId string) (obj SettingsObject) { - var ( - deleter Deleter - deletionManager DeletionManager - builder settingsstate.StateBuilder - changeFactory settingsstate.ChangeFactory - ) - if deps.del == nil { - deleter = newDeleter(deps.Store, deps.DeletionState, deps.TreeManager) - } else { - deleter = deps.del - } - if deps.delManager == nil { - deletionManager = newDeletionManager( - spaceId, - deps.Store.SpaceSettingsId(), - deps.Configuration.IsResponsible(spaceId), - deps.TreeManager, - deps.DeletionState, - deps.Provider, - deps.OnSpaceDelete) - } else { - deletionManager = deps.delManager - } - if deps.builder == nil { - builder = settingsstate.NewStateBuilder() - } else { - builder = deps.builder - } - if deps.changeFactory == nil { - changeFactory = settingsstate.NewChangeFactory() - } else { - changeFactory = deps.changeFactory - } - - loop := newDeleteLoop(func() { - deleter.Delete() - }) - deps.DeletionState.AddObserver(func(ids []string) { - loop.notify() - }) - - s := &settingsObject{ - loop: loop, - spaceId: spaceId, - account: deps.Account, - deletionState: deps.DeletionState, - treeManager: deps.TreeManager, - store: deps.Store, - buildFunc: deps.BuildFunc, - builder: builder, - deletionManager: deletionManager, - changeFactory: changeFactory, - } - obj = s - return +func (s *settings) Close(ctx context.Context) (err error) { + return s.settingsObject.Close() } -func (s *settingsObject) updateIds(tr objecttree.ObjectTree) { - var err error - s.state, err = s.builder.Build(tr, s.state) - if err != nil { - log.Error("failed to build state", zap.Error(err)) - return - } - log.Debug("updating object state", zap.String("deleted by", s.state.DeleterId)) - if err = s.deletionManager.UpdateState(context.Background(), s.state); err != nil { - log.Error("failed to update state", zap.Error(err)) - } -} +func (s *settings) Init(a *app.App) (err error) { + s.account = a.MustComponent(accountservice.CName).(accountservice.Service) + s.treeManager = a.MustComponent(treemanager.CName).(treemanager.TreeManager) + s.headsync = a.MustComponent(headsync.CName).(headsync.HeadSync) + s.configuration = a.MustComponent(nodeconf.CName).(nodeconf.NodeConf) + s.deletionState = a.MustComponent(deletionstate.CName).(deletionstate.ObjectDeletionState) + s.treeBuilder = a.MustComponent(objecttreebuilder.CName).(objecttreebuilder.TreeBuilder) -// Update is called as part of UpdateListener interface -func (s *settingsObject) Update(tr objecttree.ObjectTree) { - s.updateIds(tr) -} + sharedState := a.MustComponent(spacestate.CName).(*spacestate.SpaceState) + s.spaceActions = sharedState.Actions + s.storage = sharedState.SpaceStorage -// Rebuild is called as part of UpdateListener interface (including when the object is built for the first time, e.g. on Init call) -func (s *settingsObject) Rebuild(tr objecttree.ObjectTree) { - // at initial build "s" may not contain the object tree, so it is safer to provide it from the function parameter - s.state = nil - s.updateIds(tr) -} - -func (s *settingsObject) Init(ctx context.Context) (err error) { - settingsId := s.store.SpaceSettingsId() - log.Debug("space settings id", zap.String("id", settingsId)) - s.SyncTree, err = s.buildFunc(ctx, settingsId, s) - if err != nil { - return - } - // TODO: remove this check when everybody updates - if err = s.checkHistoryState(ctx); err != nil { - return - } - s.loop.Run() - return -} - -func (s *settingsObject) checkHistoryState(ctx context.Context) (err error) { - historyTree, err := buildHistoryTree(s.SyncTree) - if err != nil { - return - } - fullState, err := s.builder.Build(historyTree, nil) - if err != nil { - return - } - if len(fullState.DeletedIds) != len(s.state.DeletedIds) { - log.WarnCtx(ctx, "state does not have all deleted ids", - zap.Int("fullstate ids", len(fullState.DeletedIds)), - zap.Int("state ids", len(fullState.DeletedIds))) - s.state = fullState - err = s.deletionManager.UpdateState(context.Background(), s.state) - if err != nil { + deps := Deps{ + BuildFunc: func(ctx context.Context, id string, listener updatelistener.UpdateListener) (t synctree.SyncTree, err error) { + res, err := s.treeBuilder.BuildTree(ctx, id, objecttreebuilder.BuildTreeOpts{ + Listener: listener, + WaitTreeRemoteSync: false, + // space settings document should not have empty data + TreeBuilder: objecttree.BuildObjectTree, + }) + log.Debug("building settings tree", zap.String("id", id), zap.String("spaceId", sharedState.SpaceId)) + if err != nil { + return + } + t = res.(synctree.SyncTree) return - } + }, + Account: s.account, + TreeManager: s.treeManager, + Store: s.storage, + Configuration: s.configuration, + DeletionState: s.deletionState, + Provider: s.headsync, + OnSpaceDelete: s.spaceActions.OnSpaceDelete, } - return + s.settingsObject = NewSettingsObject(deps, sharedState.SpaceId) + return nil } -func (s *settingsObject) Close() error { - s.loop.Close() - return s.SyncTree.Close() -} - -func (s *settingsObject) DeleteSpace(ctx context.Context, raw *treechangeproto.RawTreeChangeWithId) (err error) { - s.Lock() - defer s.Unlock() - defer func() { - log.Debug("finished adding delete change", zap.Error(err)) - }() - err = s.verifyDeleteSpace(raw) - if err != nil { - return - } - res, err := s.AddRawChanges(ctx, objecttree.RawChangesPayload{ - NewHeads: []string{raw.Id}, - RawChanges: []*treechangeproto.RawTreeChangeWithId{raw}, - }) - if err != nil { - return - } - if !slices.Contains(res.Heads, raw.Id) { - err = ErrCantDeleteSpace - return - } - return -} - -func (s *settingsObject) SpaceDeleteRawChange() (raw *treechangeproto.RawTreeChangeWithId, err error) { - accountData := s.account.Account() - data, err := s.changeFactory.CreateSpaceDeleteChange(accountData.PeerId, s.state, false) - if err != nil { - return - } - return s.PrepareChange(objecttree.SignableChangeContent{ - Data: data, - Key: accountData.SignKey, - IsSnapshot: false, - IsEncrypted: false, - }) -} - -func (s *settingsObject) DeleteObject(id string) (err error) { - s.Lock() - defer s.Unlock() - if s.Id() == id { - err = ErrDeleteSelf - return - } - if s.state.Exists(id) { - err = ErrAlreadyDeleted - return nil - } - _, err = s.store.TreeStorage(id) - if err != nil { - err = ErrObjDoesNotExist - return - } - isSnapshot := DoSnapshot(s.Len()) - res, err := s.changeFactory.CreateObjectDeleteChange(id, s.state, isSnapshot) - if err != nil { - return - } - - return s.addContent(res, isSnapshot) -} - -func (s *settingsObject) verifyDeleteSpace(raw *treechangeproto.RawTreeChangeWithId) (err error) { - data, err := s.UnpackChange(raw) - if err != nil { - return - } - return verifyDeleteContent(data, "") -} - -func (s *settingsObject) addContent(data []byte, isSnapshot bool) (err error) { - accountData := s.account.Account() - res, err := s.AddContent(context.Background(), objecttree.SignableChangeContent{ - Data: data, - Key: accountData.SignKey, - IsSnapshot: isSnapshot, - IsEncrypted: false, - }) - if err != nil { - return - } - if res.Mode == objecttree.Rebuild { - s.Rebuild(s) - } else { - s.Update(s) - } - return -} - -func VerifyDeleteChange(raw *treechangeproto.RawTreeChangeWithId, identity crypto.PubKey, peerId string) (err error) { - changeBuilder := objecttree.NewChangeBuilder(crypto.NewKeyStorage(), nil) - res, err := changeBuilder.Unmarshall(raw, true) - if err != nil { - return - } - if !res.Identity.Equals(identity) { - return fmt.Errorf("incorrect identity") - } - return verifyDeleteContent(res.Data, peerId) -} - -func verifyDeleteContent(data []byte, peerId string) (err error) { - content := &spacesyncproto.SettingsData{} - err = proto.Unmarshal(data, content) - if err != nil { - return - } - if len(content.GetContent()) != 1 || - content.GetContent()[0].GetSpaceDelete() == nil || - (peerId == "" && content.GetContent()[0].GetSpaceDelete().GetDeleterPeerId() == "") || - (peerId != "" && content.GetContent()[0].GetSpaceDelete().GetDeleterPeerId() != peerId) { - return fmt.Errorf("incorrect delete change payload") - } - return +func (s *settings) Name() (name string) { + return CName } diff --git a/commonspace/settings/settingsobject.go b/commonspace/settings/settingsobject.go new file mode 100644 index 00000000..e6fd0b39 --- /dev/null +++ b/commonspace/settings/settingsobject.go @@ -0,0 +1,329 @@ +//go:generate mockgen -destination mock_settings/mock_settings.go github.com/anyproto/any-sync/commonspace/settings DeletionManager,Deleter,SpaceIdsProvider +package settings + +import ( + "context" + "errors" + "fmt" + "github.com/anyproto/any-sync/commonspace/deletionstate" + "github.com/anyproto/any-sync/util/crypto" + + "github.com/anyproto/any-sync/accountservice" + "github.com/anyproto/any-sync/app/logger" + "github.com/anyproto/any-sync/commonspace/object/tree/objecttree" + "github.com/anyproto/any-sync/commonspace/object/tree/synctree" + "github.com/anyproto/any-sync/commonspace/object/tree/synctree/updatelistener" + "github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto" + "github.com/anyproto/any-sync/commonspace/object/treemanager" + "github.com/anyproto/any-sync/commonspace/settings/settingsstate" + "github.com/anyproto/any-sync/commonspace/spacestorage" + "github.com/anyproto/any-sync/commonspace/spacesyncproto" + "github.com/anyproto/any-sync/nodeconf" + "github.com/gogo/protobuf/proto" + "go.uber.org/zap" + "golang.org/x/exp/slices" +) + +var log = logger.NewNamed("common.commonspace.settings") + +type SettingsObject interface { + synctree.SyncTree + Init(ctx context.Context) (err error) + DeleteObject(id string) (err error) + DeleteSpace(ctx context.Context, raw *treechangeproto.RawTreeChangeWithId) (err error) + SpaceDeleteRawChange() (raw *treechangeproto.RawTreeChangeWithId, err error) +} + +var ( + ErrDeleteSelf = errors.New("cannot delete self") + ErrAlreadyDeleted = errors.New("the object is already deleted") + ErrObjDoesNotExist = errors.New("the object does not exist") + ErrCantDeleteSpace = errors.New("not able to delete space") +) + +var ( + DoSnapshot = objecttree.DoSnapshot + buildHistoryTree = func(objTree objecttree.ObjectTree) (objecttree.ReadableObjectTree, error) { + return objecttree.BuildHistoryTree(objecttree.HistoryTreeParams{ + TreeStorage: objTree.Storage(), + AclList: objTree.AclList(), + BuildFullTree: true, + }) + } +) + +type BuildTreeFunc func(ctx context.Context, id string, listener updatelistener.UpdateListener) (t synctree.SyncTree, err error) + +type Deps struct { + BuildFunc BuildTreeFunc + Account accountservice.Service + TreeManager treemanager.TreeManager + Store spacestorage.SpaceStorage + Configuration nodeconf.NodeConf + DeletionState deletionstate.ObjectDeletionState + Provider SpaceIdsProvider + OnSpaceDelete func() + // testing dependencies + builder settingsstate.StateBuilder + del Deleter + delManager DeletionManager + changeFactory settingsstate.ChangeFactory +} + +type settingsObject struct { + synctree.SyncTree + account accountservice.Service + spaceId string + treeManager treemanager.TreeManager + store spacestorage.SpaceStorage + builder settingsstate.StateBuilder + buildFunc BuildTreeFunc + loop *deleteLoop + + state *settingsstate.State + deletionState deletionstate.ObjectDeletionState + deletionManager DeletionManager + changeFactory settingsstate.ChangeFactory +} + +func NewSettingsObject(deps Deps, spaceId string) (obj SettingsObject) { + var ( + deleter Deleter + deletionManager DeletionManager + builder settingsstate.StateBuilder + changeFactory settingsstate.ChangeFactory + ) + if deps.del == nil { + deleter = newDeleter(deps.Store, deps.DeletionState, deps.TreeManager) + } else { + deleter = deps.del + } + if deps.delManager == nil { + deletionManager = newDeletionManager( + spaceId, + deps.Store.SpaceSettingsId(), + deps.Configuration.IsResponsible(spaceId), + deps.TreeManager, + deps.DeletionState, + deps.Provider, + deps.OnSpaceDelete) + } else { + deletionManager = deps.delManager + } + if deps.builder == nil { + builder = settingsstate.NewStateBuilder() + } else { + builder = deps.builder + } + if deps.changeFactory == nil { + changeFactory = settingsstate.NewChangeFactory() + } else { + changeFactory = deps.changeFactory + } + + loop := newDeleteLoop(func() { + deleter.Delete() + }) + deps.DeletionState.AddObserver(func(ids []string) { + loop.notify() + }) + + s := &settingsObject{ + loop: loop, + spaceId: spaceId, + account: deps.Account, + deletionState: deps.DeletionState, + treeManager: deps.TreeManager, + store: deps.Store, + buildFunc: deps.BuildFunc, + builder: builder, + deletionManager: deletionManager, + changeFactory: changeFactory, + } + obj = s + return +} + +func (s *settingsObject) updateIds(tr objecttree.ObjectTree) { + var err error + s.state, err = s.builder.Build(tr, s.state) + if err != nil { + log.Error("failed to build state", zap.Error(err)) + return + } + log.Debug("updating object state", zap.String("deleted by", s.state.DeleterId)) + if err = s.deletionManager.UpdateState(context.Background(), s.state); err != nil { + log.Error("failed to update state", zap.Error(err)) + } +} + +// Update is called as part of UpdateListener interface +func (s *settingsObject) Update(tr objecttree.ObjectTree) { + s.updateIds(tr) +} + +// Rebuild is called as part of UpdateListener interface (including when the object is built for the first time, e.g. on Init call) +func (s *settingsObject) Rebuild(tr objecttree.ObjectTree) { + // at initial build "s" may not contain the object tree, so it is safer to provide it from the function parameter + s.state = nil + s.updateIds(tr) +} + +func (s *settingsObject) Init(ctx context.Context) (err error) { + settingsId := s.store.SpaceSettingsId() + log.Debug("space settings id", zap.String("id", settingsId)) + s.SyncTree, err = s.buildFunc(ctx, settingsId, s) + if err != nil { + return + } + // TODO: remove this check when everybody updates + if err = s.checkHistoryState(ctx); err != nil { + return + } + s.loop.Run() + return +} + +func (s *settingsObject) checkHistoryState(ctx context.Context) (err error) { + historyTree, err := buildHistoryTree(s.SyncTree) + if err != nil { + return + } + fullState, err := s.builder.Build(historyTree, nil) + if err != nil { + return + } + if len(fullState.DeletedIds) != len(s.state.DeletedIds) { + log.WarnCtx(ctx, "state does not have all deleted ids", + zap.Int("fullstate ids", len(fullState.DeletedIds)), + zap.Int("state ids", len(fullState.DeletedIds))) + s.state = fullState + err = s.deletionManager.UpdateState(context.Background(), s.state) + if err != nil { + return + } + } + return +} + +func (s *settingsObject) Close() error { + s.loop.Close() + return s.SyncTree.Close() +} + +func (s *settingsObject) DeleteSpace(ctx context.Context, raw *treechangeproto.RawTreeChangeWithId) (err error) { + s.Lock() + defer s.Unlock() + defer func() { + log.Debug("finished adding delete change", zap.Error(err)) + }() + err = s.verifyDeleteSpace(raw) + if err != nil { + return + } + res, err := s.AddRawChanges(ctx, objecttree.RawChangesPayload{ + NewHeads: []string{raw.Id}, + RawChanges: []*treechangeproto.RawTreeChangeWithId{raw}, + }) + if err != nil { + return + } + if !slices.Contains(res.Heads, raw.Id) { + err = ErrCantDeleteSpace + return + } + return +} + +func (s *settingsObject) SpaceDeleteRawChange() (raw *treechangeproto.RawTreeChangeWithId, err error) { + accountData := s.account.Account() + data, err := s.changeFactory.CreateSpaceDeleteChange(accountData.PeerId, s.state, false) + if err != nil { + return + } + return s.PrepareChange(objecttree.SignableChangeContent{ + Data: data, + Key: accountData.SignKey, + IsSnapshot: false, + IsEncrypted: false, + }) +} + +func (s *settingsObject) DeleteObject(id string) (err error) { + s.Lock() + defer s.Unlock() + if s.Id() == id { + err = ErrDeleteSelf + return + } + if s.state.Exists(id) { + err = ErrAlreadyDeleted + return nil + } + _, err = s.store.TreeStorage(id) + if err != nil { + err = ErrObjDoesNotExist + return + } + isSnapshot := DoSnapshot(s.Len()) + res, err := s.changeFactory.CreateObjectDeleteChange(id, s.state, isSnapshot) + if err != nil { + return + } + + return s.addContent(res, isSnapshot) +} + +func (s *settingsObject) verifyDeleteSpace(raw *treechangeproto.RawTreeChangeWithId) (err error) { + data, err := s.UnpackChange(raw) + if err != nil { + return + } + return verifyDeleteContent(data, "") +} + +func (s *settingsObject) addContent(data []byte, isSnapshot bool) (err error) { + accountData := s.account.Account() + res, err := s.AddContent(context.Background(), objecttree.SignableChangeContent{ + Data: data, + Key: accountData.SignKey, + IsSnapshot: isSnapshot, + IsEncrypted: false, + }) + if err != nil { + return + } + if res.Mode == objecttree.Rebuild { + s.Rebuild(s) + } else { + s.Update(s) + } + return +} + +func VerifyDeleteChange(raw *treechangeproto.RawTreeChangeWithId, identity crypto.PubKey, peerId string) (err error) { + changeBuilder := objecttree.NewChangeBuilder(crypto.NewKeyStorage(), nil) + res, err := changeBuilder.Unmarshall(raw, true) + if err != nil { + return + } + if !res.Identity.Equals(identity) { + return fmt.Errorf("incorrect identity") + } + return verifyDeleteContent(res.Data, peerId) +} + +func verifyDeleteContent(data []byte, peerId string) (err error) { + content := &spacesyncproto.SettingsData{} + err = proto.Unmarshal(data, content) + if err != nil { + return + } + if len(content.GetContent()) != 1 || + content.GetContent()[0].GetSpaceDelete() == nil || + (peerId == "" && content.GetContent()[0].GetSpaceDelete().GetDeleterPeerId() == "") || + (peerId != "" && content.GetContent()[0].GetSpaceDelete().GetDeleterPeerId() != peerId) { + return fmt.Errorf("incorrect delete change payload") + } + return +} diff --git a/commonspace/settings/settings_test.go b/commonspace/settings/settingsobject_test.go similarity index 100% rename from commonspace/settings/settings_test.go rename to commonspace/settings/settingsobject_test.go diff --git a/commonspace/settings/settingsstate/settingsstate.go b/commonspace/settings/settingsstate/settingsstate.go index 0b62c8d1..a6c8b0b6 100644 --- a/commonspace/settings/settingsstate/settingsstate.go +++ b/commonspace/settings/settingsstate/settingsstate.go @@ -1,3 +1,4 @@ +//go:generate mockgen -destination mock_settingsstate/mock_settingsstate.go github.com/anyproto/any-sync/commonspace/settings/settingsstate ObjectDeletionState,StateBuilder,ChangeFactory package settingsstate import "github.com/anyproto/any-sync/commonspace/spacesyncproto" diff --git a/commonspace/space.go b/commonspace/space.go index 70bf6e0c..d8574acf 100644 --- a/commonspace/space.go +++ b/commonspace/space.go @@ -130,7 +130,7 @@ type space struct { headSync headsync.HeadSync syncStatus syncstatus.StatusUpdater storage spacestorage.SpaceStorage - treeManager *commonGetter + treeManager *objectManager account accountservice.Service aclList *syncacl.SyncAcl configuration nodeconf.NodeConf diff --git a/commonspace/spaceservice.go b/commonspace/spaceservice.go index 0e770dc1..3657d423 100644 --- a/commonspace/spaceservice.go +++ b/commonspace/spaceservice.go @@ -5,6 +5,7 @@ import ( "github.com/anyproto/any-sync/accountservice" "github.com/anyproto/any-sync/app" "github.com/anyproto/any-sync/app/logger" + "github.com/anyproto/any-sync/commonspace/config" "github.com/anyproto/any-sync/commonspace/credentialprovider" "github.com/anyproto/any-sync/commonspace/headsync" "github.com/anyproto/any-sync/commonspace/object/acl/aclrecordproto" @@ -45,7 +46,7 @@ type SpaceService interface { } type spaceService struct { - config Config + config config.Config account accountservice.Service configurationService nodeconf.Service storageProvider spacestorage.SpaceStorageProvider @@ -54,10 +55,11 @@ type spaceService struct { treeManager treemanager.TreeManager pool pool.Pool metric metric.Metric + app *app.App } func (s *spaceService) Init(a *app.App) (err error) { - s.config = a.MustComponent("config").(ConfigGetter).GetSpace() + s.config = a.MustComponent("config").(config.ConfigGetter).GetSpace() s.account = a.MustComponent(accountservice.CName).(accountservice.Service) s.storageProvider = a.MustComponent(spacestorage.CName).(spacestorage.SpaceStorageProvider) s.configurationService = a.MustComponent(nodeconf.CName).(nodeconf.Service) @@ -149,7 +151,7 @@ func (s *spaceService) NewSpace(ctx context.Context, id string) (Space, error) { return nil, err } spaceIsDeleted.Swap(isDeleted) - getter := newCommonGetter(st.Id(), s.treeManager, spaceIsClosed) + getter := NewObjectManager(st.Id(), s.treeManager, spaceIsClosed) syncStatus := syncstatus.NewNoOpSyncStatus() // this will work only for clients, not the best solution, but... if !lastConfiguration.IsResponsible(st.Id()) { diff --git a/commonspace/spacestate/shareddata.go b/commonspace/spacestate/shareddata.go new file mode 100644 index 00000000..bf231cb7 --- /dev/null +++ b/commonspace/spacestate/shareddata.go @@ -0,0 +1,35 @@ +package spacestate + +import ( + "github.com/anyproto/any-sync/app" + "github.com/anyproto/any-sync/commonspace/object/acl/list" + "github.com/anyproto/any-sync/commonspace/object/tree/objecttree" + "github.com/anyproto/any-sync/commonspace/spacestorage" + "sync/atomic" +) + +const CName = "common.commonspace.shareddata" + +type SpaceActions interface { + OnObjectDelete(id string) + OnSpaceDelete() +} + +type SpaceState struct { + SpaceId string + SpaceIsDeleted *atomic.Bool + SpaceIsClosed *atomic.Bool + TreesUsed *atomic.Int32 + AclList list.AclList + SpaceStorage spacestorage.SpaceStorage + TreeBuilderFunc objecttree.BuildObjectTreeFunc + Actions SpaceActions +} + +func (s *SpaceState) Init(a *app.App) (err error) { + return nil +} + +func (s *SpaceState) Name() (name string) { + return CName +} diff --git a/commonspace/spaceutils_test.go b/commonspace/spaceutils_test.go index 000d571c..1b345279 100644 --- a/commonspace/spaceutils_test.go +++ b/commonspace/spaceutils_test.go @@ -6,6 +6,7 @@ import ( accountService "github.com/anyproto/any-sync/accountservice" "github.com/anyproto/any-sync/app" "github.com/anyproto/any-sync/app/ocache" + "github.com/anyproto/any-sync/commonspace/config" "github.com/anyproto/any-sync/commonspace/credentialprovider" "github.com/anyproto/any-sync/commonspace/object/tree/objecttree" "github.com/anyproto/any-sync/commonspace/object/treemanager" @@ -205,8 +206,8 @@ func (m *mockConfig) Name() (name string) { return "config" } -func (m *mockConfig) GetSpace() Config { - return Config{ +func (m *mockConfig) GetSpace() config.Config { + return config.Config{ GCTTL: 60, SyncPeriod: 20, KeepTreeDataInMemory: true, diff --git a/commonspace/streamsender/streamsender.go b/commonspace/streamsender/streamsender.go new file mode 100644 index 00000000..9482a81a --- /dev/null +++ b/commonspace/streamsender/streamsender.go @@ -0,0 +1,14 @@ +package streamsender + +import ( + "github.com/anyproto/any-sync/app" + "github.com/anyproto/any-sync/commonspace/spacesyncproto" +) + +const CName = "common.commonspace.streamsender" + +type StreamSender interface { + app.ComponentRunnable + SendPeer(peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error) + Broadcast(msg *spacesyncproto.ObjectSyncMessage) (err error) +} diff --git a/commonspace/syncstatus/noop.go b/commonspace/syncstatus/noop.go index 586b90d2..10ef80ac 100644 --- a/commonspace/syncstatus/noop.go +++ b/commonspace/syncstatus/noop.go @@ -1,9 +1,32 @@ package syncstatus +import ( + "context" + "github.com/anyproto/any-sync/app" +) + +func NewNoOpSyncStatus() StatusProvider { + return &noOpSyncStatus{} +} + type noOpSyncStatus struct{} -func NewNoOpSyncStatus() StatusUpdater { - return &noOpSyncStatus{} +func (n *noOpSyncStatus) Init(a *app.App) (err error) { + return nil +} + +func (n *noOpSyncStatus) Name() (name string) { + return CName +} + +func (n *noOpSyncStatus) Watch(treeId string) (err error) { + return nil +} + +func (n *noOpSyncStatus) Unwatch(treeId string) { +} + +func (n *noOpSyncStatus) SetUpdateReceiver(updater UpdateReceiver) { } func (n *noOpSyncStatus) HeadsChange(treeId string, heads []string) { @@ -22,9 +45,10 @@ func (n *noOpSyncStatus) StateCounter() uint64 { func (n *noOpSyncStatus) RemoveAllExcept(senderId string, differentRemoteIds []string, stateCounter uint64) { } -func (n *noOpSyncStatus) Run() { -} - -func (n *noOpSyncStatus) Close() error { +func (n *noOpSyncStatus) Run(ctx context.Context) error { + return nil +} + +func (n *noOpSyncStatus) Close(ctx context.Context) error { return nil } diff --git a/commonspace/syncstatus/syncstatus.go b/commonspace/syncstatus/syncstatus.go index 91191573..edc12175 100644 --- a/commonspace/syncstatus/syncstatus.go +++ b/commonspace/syncstatus/syncstatus.go @@ -3,6 +3,8 @@ package syncstatus import ( "context" "fmt" + "github.com/anyproto/any-sync/app" + "github.com/anyproto/any-sync/commonspace/spacestate" "sync" "time" @@ -20,7 +22,9 @@ const ( syncTimeout = time.Second ) -var log = logger.NewNamed("common.commonspace.syncstatus") +var log = logger.NewNamed(CName) + +const CName = "common.commonspace.syncstatus" type UpdateReceiver interface { UpdateTree(ctx context.Context, treeId string, status SyncStatus) (err error) @@ -34,9 +38,6 @@ type StatusUpdater interface { SetNodesOnline(senderId string, online bool) StateCounter() uint64 RemoveAllExcept(senderId string, differentRemoteIds []string, stateCounter uint64) - - Run() - Close() error } type StatusWatcher interface { @@ -46,6 +47,7 @@ type StatusWatcher interface { } type StatusProvider interface { + app.ComponentRunnable StatusUpdater StatusWatcher } @@ -89,35 +91,27 @@ type syncStatusProvider struct { updateTimeout time.Duration } -type SyncStatusDeps struct { - UpdateIntervalSecs int - UpdateTimeout time.Duration - Configuration nodeconf.NodeConf - Storage spacestorage.SpaceStorage -} - -func DefaultDeps(configuration nodeconf.NodeConf, store spacestorage.SpaceStorage) SyncStatusDeps { - return SyncStatusDeps{ - UpdateIntervalSecs: syncUpdateInterval, - UpdateTimeout: syncTimeout, - Configuration: configuration, - Storage: store, - } -} - -func NewSyncStatusProvider(spaceId string, deps SyncStatusDeps) StatusProvider { +func NewSyncStatusProvider() StatusProvider { return &syncStatusProvider{ - spaceId: spaceId, - treeHeads: map[string]treeHeadsEntry{}, - watchers: map[string]struct{}{}, - updateIntervalSecs: deps.UpdateIntervalSecs, - updateTimeout: deps.UpdateTimeout, - configuration: deps.Configuration, - storage: deps.Storage, - stateCounter: 0, + treeHeads: map[string]treeHeadsEntry{}, + watchers: map[string]struct{}{}, } } +func (s *syncStatusProvider) Init(a *app.App) (err error) { + sharedState := a.MustComponent(spacestate.CName).(*spacestate.SpaceState) + s.updateIntervalSecs = syncUpdateInterval + s.updateTimeout = syncTimeout + s.configuration = a.MustComponent(nodeconf.CName).(nodeconf.NodeConf) + s.storage = sharedState.SpaceStorage + s.spaceId = sharedState.SpaceId + return +} + +func (s *syncStatusProvider) Name() (name string) { + return CName +} + func (s *syncStatusProvider) SetUpdateReceiver(updater UpdateReceiver) { s.Lock() defer s.Unlock() @@ -125,13 +119,14 @@ func (s *syncStatusProvider) SetUpdateReceiver(updater UpdateReceiver) { s.updateReceiver = updater } -func (s *syncStatusProvider) Run() { +func (s *syncStatusProvider) Run(ctx context.Context) error { s.periodicSync = periodicsync.NewPeriodicSync( s.updateIntervalSecs, s.updateTimeout, s.update, log) s.periodicSync.Run() + return nil } func (s *syncStatusProvider) HeadsChange(treeId string, heads []string) { @@ -257,11 +252,6 @@ func (s *syncStatusProvider) Unwatch(treeId string) { } } -func (s *syncStatusProvider) Close() (err error) { - s.periodicSync.Close() - return -} - func (s *syncStatusProvider) StateCounter() uint64 { s.Lock() defer s.Unlock() @@ -292,6 +282,11 @@ func (s *syncStatusProvider) RemoveAllExcept(senderId string, differentRemoteIds } } +func (s *syncStatusProvider) Close(ctx context.Context) error { + s.periodicSync.Close() + return nil +} + func (s *syncStatusProvider) isSenderResponsible(senderId string) bool { return slices.Contains(s.configuration.NodeIds(s.spaceId), senderId) }