From 0d50db243bd7639c5975ec1a24b22c35cd77e99a Mon Sep 17 00:00:00 2001 From: AnastasiaShemyakinskaya Date: Tue, 2 Jul 2024 13:23:05 +0200 Subject: [PATCH] revert object sync status events Signed-off-by: AnastasiaShemyakinskaya --- core/syncstatus/filestatus.go | 6 +- .../syncstatus/objectsyncstatus/syncstatus.go | 27 +- core/syncstatus/service.go | 13 + core/syncstatus/updatereceiver.go | 125 ++++++++ core/syncstatus/updatereceiver_test.go | 270 ++++++++++++++++++ 5 files changed, 436 insertions(+), 5 deletions(-) create mode 100644 core/syncstatus/updatereceiver.go create mode 100644 core/syncstatus/updatereceiver_test.go diff --git a/core/syncstatus/filestatus.go b/core/syncstatus/filestatus.go index f24ed1208..fd75d6781 100644 --- a/core/syncstatus/filestatus.go +++ b/core/syncstatus/filestatus.go @@ -1,6 +1,7 @@ package syncstatus import ( + "context" "fmt" "time" @@ -51,7 +52,10 @@ func (s *service) indexFileSyncStatus(fileObjectId string, status filesyncstatus if err != nil { return fmt.Errorf("get object: %w", err) } - + err = s.updateReceiver.UpdateTree(context.Background(), fileObjectId, status.ToSyncStatus()) + if err != nil { + return fmt.Errorf("update tree: %w", err) + } s.sendSpaceStatusUpdate(status, spaceId, bytesLeftPercentage) return nil } diff --git a/core/syncstatus/objectsyncstatus/syncstatus.go b/core/syncstatus/objectsyncstatus/syncstatus.go index 0413ad76e..8f993067e 100644 --- a/core/syncstatus/objectsyncstatus/syncstatus.go +++ b/core/syncstatus/objectsyncstatus/syncstatus.go @@ -31,7 +31,8 @@ const ( var log = logger.NewNamed(syncstatus.CName) type UpdateReceiver interface { - UpdateTree(ctx context.Context, treeId string, status SyncStatus) + UpdateTree(ctx context.Context, treeId string, status SyncStatus) (err error) + UpdateNodeStatus() } type SyncStatus int @@ -51,6 +52,7 @@ type StatusUpdater interface { type StatusWatcher interface { Watch(treeId string) (err error) Unwatch(treeId string) + SetUpdateReceiver(updater UpdateReceiver) } type StatusService interface { @@ -77,9 +79,10 @@ type Updater interface { type syncStatusService struct { sync.Mutex - configuration nodeconf.NodeConf - periodicSync periodicsync.PeriodicSync - storage spacestorage.SpaceStorage + configuration nodeconf.NodeConf + periodicSync periodicsync.PeriodicSync + updateReceiver UpdateReceiver + storage spacestorage.SpaceStorage spaceId string treeHeads map[string]treeHeadsEntry @@ -127,6 +130,13 @@ func (s *syncStatusService) Name() (name string) { return syncstatus.CName } +func (s *syncStatusService) SetUpdateReceiver(updater UpdateReceiver) { + s.Lock() + defer s.Unlock() + + s.updateReceiver = updater +} + func (s *syncStatusService) Run(ctx context.Context) error { s.periodicSync.Run() return nil @@ -152,6 +162,10 @@ func (s *syncStatusService) update(ctx context.Context) (err error) { s.treeStatusBuf = s.treeStatusBuf[:0] s.Lock() + if s.updateReceiver == nil { + s.Unlock() + return + } for treeId := range s.watchers { // that means that we haven't yet got the status update treeHeads, exists := s.treeHeads[treeId] @@ -163,7 +177,12 @@ func (s *syncStatusService) update(ctx context.Context) (err error) { s.treeStatusBuf = append(s.treeStatusBuf, treeStatus{treeId, treeHeads.syncStatus}) } s.Unlock() + s.updateReceiver.UpdateNodeStatus() for _, entry := range s.treeStatusBuf { + err = s.updateReceiver.UpdateTree(ctx, entry.treeId, entry.status) + if err != nil { + return + } s.updateDetails(entry.treeId, mapStatus(entry.status)) } return diff --git a/core/syncstatus/service.go b/core/syncstatus/service.go index 9e9a3e2bd..d816097a2 100644 --- a/core/syncstatus/service.go +++ b/core/syncstatus/service.go @@ -6,9 +6,13 @@ import ( "github.com/anyproto/any-sync/app" "github.com/anyproto/any-sync/commonspace" + "github.com/anyproto/any-sync/nodeconf" + "github.com/anyproto/anytype-heart/core/anytype/config" "github.com/anyproto/anytype-heart/core/block/cache" + "github.com/anyproto/anytype-heart/core/event" "github.com/anyproto/anytype-heart/core/filestorage/filesync" + "github.com/anyproto/anytype-heart/core/syncstatus/nodestatus" "github.com/anyproto/anytype-heart/core/syncstatus/objectsyncstatus" "github.com/anyproto/anytype-heart/core/syncstatus/spacesyncstatus" "github.com/anyproto/anytype-heart/pkg/lib/localstore/objectstore" @@ -27,6 +31,7 @@ type Service interface { var _ Service = (*service)(nil) type service struct { + updateReceiver *updateReceiver fileSyncService filesync.FileSync objectWatchersLock sync.Mutex @@ -53,6 +58,12 @@ func (s *service) Init(a *app.App) (err error) { s.fileSyncService.OnUploadStarted(s.onFileUploadStarted) s.fileSyncService.OnLimited(s.onFileLimited) s.fileSyncService.OnDelete(s.OnFileDelete) + + nodeConfService := app.MustComponent[nodeconf.Service](a) + cfg := app.MustComponent[*config.Config](a) + eventSender := app.MustComponent[event.Sender](a) + nodeStatus := app.MustComponent[nodestatus.NodeStatus](a) + s.updateReceiver = newUpdateReceiver(nodeConfService, cfg, eventSender, s.objectStore, nodeStatus) return nil } @@ -68,7 +79,9 @@ func (s *service) RegisterSpace(space commonspace.Space, sw objectsyncstatus.Sta s.objectWatchersLock.Lock() defer s.objectWatchersLock.Unlock() + sw.SetUpdateReceiver(s.updateReceiver) s.objectWatchers[space.Id()] = sw + s.updateReceiver.spaceId = space.Id() } func (s *service) UnregisterSpace(space commonspace.Space) { diff --git a/core/syncstatus/updatereceiver.go b/core/syncstatus/updatereceiver.go new file mode 100644 index 000000000..0175c4b16 --- /dev/null +++ b/core/syncstatus/updatereceiver.go @@ -0,0 +1,125 @@ +package syncstatus + +import ( + "context" + "fmt" + "sync" + + "github.com/anyproto/any-sync/nodeconf" + + "github.com/anyproto/anytype-heart/core/anytype/config" + "github.com/anyproto/anytype-heart/core/event" + "github.com/anyproto/anytype-heart/core/syncstatus/filesyncstatus" + "github.com/anyproto/anytype-heart/core/syncstatus/nodestatus" + "github.com/anyproto/anytype-heart/core/syncstatus/objectsyncstatus" + "github.com/anyproto/anytype-heart/pb" + "github.com/anyproto/anytype-heart/pkg/lib/bundle" + "github.com/anyproto/anytype-heart/pkg/lib/localstore/objectstore" +) + +type updateReceiver struct { + eventSender event.Sender + + nodeConfService nodeconf.Service + sync.Mutex + nodeConnected bool + objectStore objectstore.ObjectStore + nodeStatus nodestatus.NodeStatus + spaceId string +} + +func newUpdateReceiver( + nodeConfService nodeconf.Service, + cfg *config.Config, + eventSender event.Sender, + objectStore objectstore.ObjectStore, + nodeStatus nodestatus.NodeStatus, +) *updateReceiver { + if cfg.DisableThreadsSyncEvents { + eventSender = nil + } + return &updateReceiver{ + nodeConfService: nodeConfService, + eventSender: eventSender, + objectStore: objectStore, + nodeStatus: nodeStatus, + } +} + +func (r *updateReceiver) UpdateTree(_ context.Context, objId string, status objectsyncstatus.SyncStatus) error { + objStatusEvent := r.getObjectSyncStatus(objId, status) + r.notify(objId, objStatusEvent) + return nil +} + +func (r *updateReceiver) getFileStatus(fileId string) (filesyncstatus.Status, error) { + details, err := r.objectStore.GetDetails(fileId) + if err != nil { + return filesyncstatus.Unknown, fmt.Errorf("get file details: %w", err) + } + if v, ok := details.GetDetails().GetFields()[bundle.RelationKeyFileBackupStatus.String()]; ok { + return filesyncstatus.Status(v.GetNumberValue()), nil + } + return filesyncstatus.Unknown, fmt.Errorf("no backup status") +} + +func (r *updateReceiver) getObjectSyncStatus(objectId string, status objectsyncstatus.SyncStatus) pb.EventStatusThreadSyncStatus { + fileStatus, err := r.getFileStatus(objectId) + if err == nil { + // Prefer file backup status + if fileStatus != filesyncstatus.Synced { + status = fileStatus.ToSyncStatus() + } + } + + if r.nodeConfService.NetworkCompatibilityStatus() == nodeconf.NetworkCompatibilityStatusIncompatible { + return pb.EventStatusThread_IncompatibleVersion + } + + if !r.isNodeConnected() { + return pb.EventStatusThread_Offline + } + + switch status { + case objectsyncstatus.StatusUnknown: + return pb.EventStatusThread_Unknown + case objectsyncstatus.StatusSynced: + return pb.EventStatusThread_Synced + } + return pb.EventStatusThread_Syncing +} + +func (r *updateReceiver) isNodeConnected() bool { + r.Lock() + defer r.Unlock() + return r.nodeConnected +} + +func (r *updateReceiver) UpdateNodeStatus() { + r.Lock() + defer r.Unlock() + r.nodeConnected = r.nodeStatus.GetNodeStatus(r.spaceId) == nodestatus.Online +} + +func (r *updateReceiver) notify( + objId string, + objStatus pb.EventStatusThreadSyncStatus, +) { + r.sendEvent(objId, &pb.EventMessageValueOfThreadStatus{ThreadStatus: &pb.EventStatusThread{ + Summary: &pb.EventStatusThreadSummary{Status: objStatus}, + Cafe: &pb.EventStatusThreadCafe{ + Status: objStatus, + Files: &pb.EventStatusThreadCafePinStatus{}, + }, + }}) +} + +func (r *updateReceiver) sendEvent(ctx string, event pb.IsEventMessageValue) { + if r.eventSender == nil { + return + } + r.eventSender.Broadcast(&pb.Event{ + Messages: []*pb.EventMessage{{Value: event}}, + ContextId: ctx, + }) +} diff --git a/core/syncstatus/updatereceiver_test.go b/core/syncstatus/updatereceiver_test.go new file mode 100644 index 000000000..a37701135 --- /dev/null +++ b/core/syncstatus/updatereceiver_test.go @@ -0,0 +1,270 @@ +package syncstatus + +import ( + "testing" + + "github.com/anyproto/any-sync/nodeconf" + "github.com/anyproto/any-sync/nodeconf/mock_nodeconf" + "github.com/stretchr/testify/assert" + "go.uber.org/mock/gomock" + + "github.com/anyproto/anytype-heart/core/anytype/config" + "github.com/anyproto/anytype-heart/core/event/mock_event" + "github.com/anyproto/anytype-heart/core/syncstatus/filesyncstatus" + "github.com/anyproto/anytype-heart/core/syncstatus/nodestatus" + "github.com/anyproto/anytype-heart/core/syncstatus/objectsyncstatus" + "github.com/anyproto/anytype-heart/pb" + "github.com/anyproto/anytype-heart/pkg/lib/bundle" + "github.com/anyproto/anytype-heart/pkg/lib/localstore/objectstore" + "github.com/anyproto/anytype-heart/util/pbtypes" +) + +func TestUpdateReceiver_UpdateTree(t *testing.T) { + t.Run("update to sync status", func(t *testing.T) { + // given + receiver := newFixture(t) + receiver.nodeConnected = true + receiver.nodeConf.EXPECT().NetworkCompatibilityStatus().Return(nodeconf.NetworkCompatibilityStatusOk) + receiver.sender.EXPECT().Broadcast(&pb.Event{ + Messages: []*pb.EventMessage{{Value: &pb.EventMessageValueOfThreadStatus{ThreadStatus: &pb.EventStatusThread{ + Summary: &pb.EventStatusThreadSummary{Status: pb.EventStatusThread_Synced}, + Cafe: &pb.EventStatusThreadCafe{ + Status: pb.EventStatusThread_Synced, + Files: &pb.EventStatusThreadCafePinStatus{}, + }, + }}}}, + ContextId: "id", + }).Return() + + // when + err := receiver.UpdateTree(nil, "id", objectsyncstatus.StatusSynced) + + // then + assert.Nil(t, err) + }) + t.Run("network incompatible", func(t *testing.T) { + // given + receiver := newFixture(t) + receiver.nodeConnected = true + receiver.nodeConf.EXPECT().NetworkCompatibilityStatus().Return(nodeconf.NetworkCompatibilityStatusIncompatible) + receiver.sender.EXPECT().Broadcast(&pb.Event{ + Messages: []*pb.EventMessage{{Value: &pb.EventMessageValueOfThreadStatus{ThreadStatus: &pb.EventStatusThread{ + Summary: &pb.EventStatusThreadSummary{Status: pb.EventStatusThread_IncompatibleVersion}, + Cafe: &pb.EventStatusThreadCafe{ + Status: pb.EventStatusThread_IncompatibleVersion, + Files: &pb.EventStatusThreadCafePinStatus{}, + }, + }}}}, + ContextId: "id", + }).Return() + + // when + err := receiver.UpdateTree(nil, "id", objectsyncstatus.StatusNotSynced) + + // then + assert.Nil(t, err) + }) + t.Run("file storage limited", func(t *testing.T) { + // given + receiver := newFixture(t) + receiver.nodeConnected = true + receiver.nodeConf.EXPECT().NetworkCompatibilityStatus().Return(nodeconf.NetworkCompatibilityStatusOk) + receiver.sender.EXPECT().Broadcast(&pb.Event{ + Messages: []*pb.EventMessage{{Value: &pb.EventMessageValueOfThreadStatus{ThreadStatus: &pb.EventStatusThread{ + Summary: &pb.EventStatusThreadSummary{Status: pb.EventStatusThread_Syncing}, + Cafe: &pb.EventStatusThreadCafe{ + Status: pb.EventStatusThread_Syncing, + Files: &pb.EventStatusThreadCafePinStatus{}, + }, + }}}}, + ContextId: "id", + }).Return() + + receiver.store.AddObjects(t, []objectstore.TestObject{ + { + bundle.RelationKeyId: pbtypes.String("id"), + bundle.RelationKeyFileBackupStatus: pbtypes.Int64(int64(filesyncstatus.Limited)), + }, + }) + + // when + err := receiver.UpdateTree(nil, "id", objectsyncstatus.StatusNotSynced) + + // then + assert.Nil(t, err) + }) + t.Run("object sync status - syncing", func(t *testing.T) { + // given + receiver := newFixture(t) + receiver.nodeConnected = true + receiver.nodeConf.EXPECT().NetworkCompatibilityStatus().Return(nodeconf.NetworkCompatibilityStatusOk) + receiver.sender.EXPECT().Broadcast(&pb.Event{ + Messages: []*pb.EventMessage{{Value: &pb.EventMessageValueOfThreadStatus{ThreadStatus: &pb.EventStatusThread{ + Summary: &pb.EventStatusThreadSummary{Status: pb.EventStatusThread_Syncing}, + Cafe: &pb.EventStatusThreadCafe{ + Status: pb.EventStatusThread_Syncing, + Files: &pb.EventStatusThreadCafePinStatus{}, + }, + }}}}, + ContextId: "id", + }).Return() + + // when + err := receiver.UpdateTree(nil, "id", objectsyncstatus.StatusNotSynced) + + // then + assert.Nil(t, err) + }) + t.Run("object sync status - unknown", func(t *testing.T) { + // given + receiver := newFixture(t) + receiver.nodeConnected = true + receiver.nodeConf.EXPECT().NetworkCompatibilityStatus().Return(nodeconf.NetworkCompatibilityStatusOk) + receiver.sender.EXPECT().Broadcast(&pb.Event{ + Messages: []*pb.EventMessage{{Value: &pb.EventMessageValueOfThreadStatus{ThreadStatus: &pb.EventStatusThread{ + Summary: &pb.EventStatusThreadSummary{Status: pb.EventStatusThread_Unknown}, + Cafe: &pb.EventStatusThreadCafe{ + Status: pb.EventStatusThread_Unknown, + Files: &pb.EventStatusThreadCafePinStatus{}, + }, + }}}}, + ContextId: "id", + }).Return() + + // when + err := receiver.UpdateTree(nil, "id", objectsyncstatus.StatusUnknown) + + // then + assert.Nil(t, err) + }) + t.Run("object sync status - connection error", func(t *testing.T) { + // given + receiver := newFixture(t) + receiver.nodeConnected = false + receiver.nodeConf.EXPECT().NetworkCompatibilityStatus().Return(nodeconf.NetworkCompatibilityStatusOk) + receiver.sender.EXPECT().Broadcast(&pb.Event{ + Messages: []*pb.EventMessage{{Value: &pb.EventMessageValueOfThreadStatus{ThreadStatus: &pb.EventStatusThread{ + Summary: &pb.EventStatusThreadSummary{Status: pb.EventStatusThread_Offline}, + Cafe: &pb.EventStatusThreadCafe{ + Status: pb.EventStatusThread_Offline, + Files: &pb.EventStatusThreadCafePinStatus{}, + }, + }}}}, + ContextId: "id", + }).Return() + + // when + err := receiver.UpdateTree(nil, "id", objectsyncstatus.StatusSynced) + + // then + assert.Nil(t, err) + }) + t.Run("file sync status - synced", func(t *testing.T) { + // given + receiver := newFixture(t) + receiver.nodeConnected = true + receiver.nodeConf.EXPECT().NetworkCompatibilityStatus().Return(nodeconf.NetworkCompatibilityStatusOk) + receiver.sender.EXPECT().Broadcast(&pb.Event{ + Messages: []*pb.EventMessage{{Value: &pb.EventMessageValueOfThreadStatus{ThreadStatus: &pb.EventStatusThread{ + Summary: &pb.EventStatusThreadSummary{Status: pb.EventStatusThread_Synced}, + Cafe: &pb.EventStatusThreadCafe{ + Status: pb.EventStatusThread_Synced, + Files: &pb.EventStatusThreadCafePinStatus{}, + }, + }}}}, + ContextId: "id", + }).Return() + + receiver.store.AddObjects(t, []objectstore.TestObject{ + { + bundle.RelationKeyId: pbtypes.String("id"), + bundle.RelationKeyFileBackupStatus: pbtypes.Int64(int64(filesyncstatus.Synced)), + }, + }) + + // when + err := receiver.UpdateTree(nil, "id", objectsyncstatus.StatusSynced) + + // then + assert.Nil(t, err) + }) + t.Run("file sync status - syncing", func(t *testing.T) { + // given + receiver := newFixture(t) + receiver.nodeConnected = true + receiver.nodeConf.EXPECT().NetworkCompatibilityStatus().Return(nodeconf.NetworkCompatibilityStatusOk) + receiver.sender.EXPECT().Broadcast(&pb.Event{ + Messages: []*pb.EventMessage{{Value: &pb.EventMessageValueOfThreadStatus{ThreadStatus: &pb.EventStatusThread{ + Summary: &pb.EventStatusThreadSummary{Status: pb.EventStatusThread_Syncing}, + Cafe: &pb.EventStatusThreadCafe{ + Status: pb.EventStatusThread_Syncing, + Files: &pb.EventStatusThreadCafePinStatus{}, + }, + }}}}, + ContextId: "id", + }).Return() + + receiver.store.AddObjects(t, []objectstore.TestObject{ + { + bundle.RelationKeyId: pbtypes.String("id"), + bundle.RelationKeyFileBackupStatus: pbtypes.Int64(int64(filesyncstatus.Syncing)), + }, + }) + + // when + err := receiver.UpdateTree(nil, "id", objectsyncstatus.StatusUnknown) + + // then + assert.Nil(t, err) + }) + t.Run("sync status not changed", func(t *testing.T) { + // given + receiver := newFixture(t) + receiver.nodeConnected = true + receiver.nodeConf.EXPECT().NetworkCompatibilityStatus().Return(nodeconf.NetworkCompatibilityStatusOk).Times(2) + receiver.sender.EXPECT().Broadcast(&pb.Event{ + Messages: []*pb.EventMessage{{Value: &pb.EventMessageValueOfThreadStatus{ThreadStatus: &pb.EventStatusThread{ + Summary: &pb.EventStatusThreadSummary{Status: pb.EventStatusThread_Synced}, + Cafe: &pb.EventStatusThreadCafe{ + Status: pb.EventStatusThread_Synced, + Files: &pb.EventStatusThreadCafePinStatus{}, + }, + }}}}, + ContextId: "id", + }).Return().Times(1) + + // when + err := receiver.UpdateTree(nil, "id", objectsyncstatus.StatusSynced) + assert.Nil(t, err) + + err = receiver.UpdateTree(nil, "id", objectsyncstatus.StatusSynced) + assert.Nil(t, err) + + // then + assert.Nil(t, err) + }) +} + +func newFixture(t *testing.T) *fixture { + ctrl := gomock.NewController(t) + nodeConf := mock_nodeconf.NewMockService(ctrl) + conf := &config.Config{} + sender := mock_event.NewMockSender(t) + storeFixture := objectstore.NewStoreFixture(t) + status := nodestatus.NewNodeStatus() + + receiver := newUpdateReceiver(nodeConf, conf, sender, storeFixture, status) + return &fixture{ + updateReceiver: receiver, + sender: sender, + nodeConf: nodeConf, + store: storeFixture, + } +} + +type fixture struct { + *updateReceiver + sender *mock_event.MockSender + nodeConf *mock_nodeconf.MockService + store *objectstore.StoreFixture +}