1
0
Fork 0
mirror of https://github.com/anyproto/anytype-heart.git synced 2025-06-10 01:51:07 +09:00

revert object sync status events

Signed-off-by: AnastasiaShemyakinskaya <shem98a@mail.ru>
This commit is contained in:
AnastasiaShemyakinskaya 2024-07-02 13:23:05 +02:00
parent 21573dc9ad
commit 0d50db243b
No known key found for this signature in database
GPG key ID: CCD60ED83B103281
5 changed files with 436 additions and 5 deletions

View file

@ -1,6 +1,7 @@
package syncstatus
import (
"context"
"fmt"
"time"
@ -51,7 +52,10 @@ func (s *service) indexFileSyncStatus(fileObjectId string, status filesyncstatus
if err != nil {
return fmt.Errorf("get object: %w", err)
}
err = s.updateReceiver.UpdateTree(context.Background(), fileObjectId, status.ToSyncStatus())
if err != nil {
return fmt.Errorf("update tree: %w", err)
}
s.sendSpaceStatusUpdate(status, spaceId, bytesLeftPercentage)
return nil
}

View file

@ -31,7 +31,8 @@ const (
var log = logger.NewNamed(syncstatus.CName)
type UpdateReceiver interface {
UpdateTree(ctx context.Context, treeId string, status SyncStatus)
UpdateTree(ctx context.Context, treeId string, status SyncStatus) (err error)
UpdateNodeStatus()
}
type SyncStatus int
@ -51,6 +52,7 @@ type StatusUpdater interface {
type StatusWatcher interface {
Watch(treeId string) (err error)
Unwatch(treeId string)
SetUpdateReceiver(updater UpdateReceiver)
}
type StatusService interface {
@ -77,9 +79,10 @@ type Updater interface {
type syncStatusService struct {
sync.Mutex
configuration nodeconf.NodeConf
periodicSync periodicsync.PeriodicSync
storage spacestorage.SpaceStorage
configuration nodeconf.NodeConf
periodicSync periodicsync.PeriodicSync
updateReceiver UpdateReceiver
storage spacestorage.SpaceStorage
spaceId string
treeHeads map[string]treeHeadsEntry
@ -127,6 +130,13 @@ func (s *syncStatusService) Name() (name string) {
return syncstatus.CName
}
func (s *syncStatusService) SetUpdateReceiver(updater UpdateReceiver) {
s.Lock()
defer s.Unlock()
s.updateReceiver = updater
}
func (s *syncStatusService) Run(ctx context.Context) error {
s.periodicSync.Run()
return nil
@ -152,6 +162,10 @@ func (s *syncStatusService) update(ctx context.Context) (err error) {
s.treeStatusBuf = s.treeStatusBuf[:0]
s.Lock()
if s.updateReceiver == nil {
s.Unlock()
return
}
for treeId := range s.watchers {
// that means that we haven't yet got the status update
treeHeads, exists := s.treeHeads[treeId]
@ -163,7 +177,12 @@ func (s *syncStatusService) update(ctx context.Context) (err error) {
s.treeStatusBuf = append(s.treeStatusBuf, treeStatus{treeId, treeHeads.syncStatus})
}
s.Unlock()
s.updateReceiver.UpdateNodeStatus()
for _, entry := range s.treeStatusBuf {
err = s.updateReceiver.UpdateTree(ctx, entry.treeId, entry.status)
if err != nil {
return
}
s.updateDetails(entry.treeId, mapStatus(entry.status))
}
return

View file

@ -6,9 +6,13 @@ import (
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/commonspace"
"github.com/anyproto/any-sync/nodeconf"
"github.com/anyproto/anytype-heart/core/anytype/config"
"github.com/anyproto/anytype-heart/core/block/cache"
"github.com/anyproto/anytype-heart/core/event"
"github.com/anyproto/anytype-heart/core/filestorage/filesync"
"github.com/anyproto/anytype-heart/core/syncstatus/nodestatus"
"github.com/anyproto/anytype-heart/core/syncstatus/objectsyncstatus"
"github.com/anyproto/anytype-heart/core/syncstatus/spacesyncstatus"
"github.com/anyproto/anytype-heart/pkg/lib/localstore/objectstore"
@ -27,6 +31,7 @@ type Service interface {
var _ Service = (*service)(nil)
type service struct {
updateReceiver *updateReceiver
fileSyncService filesync.FileSync
objectWatchersLock sync.Mutex
@ -53,6 +58,12 @@ func (s *service) Init(a *app.App) (err error) {
s.fileSyncService.OnUploadStarted(s.onFileUploadStarted)
s.fileSyncService.OnLimited(s.onFileLimited)
s.fileSyncService.OnDelete(s.OnFileDelete)
nodeConfService := app.MustComponent[nodeconf.Service](a)
cfg := app.MustComponent[*config.Config](a)
eventSender := app.MustComponent[event.Sender](a)
nodeStatus := app.MustComponent[nodestatus.NodeStatus](a)
s.updateReceiver = newUpdateReceiver(nodeConfService, cfg, eventSender, s.objectStore, nodeStatus)
return nil
}
@ -68,7 +79,9 @@ func (s *service) RegisterSpace(space commonspace.Space, sw objectsyncstatus.Sta
s.objectWatchersLock.Lock()
defer s.objectWatchersLock.Unlock()
sw.SetUpdateReceiver(s.updateReceiver)
s.objectWatchers[space.Id()] = sw
s.updateReceiver.spaceId = space.Id()
}
func (s *service) UnregisterSpace(space commonspace.Space) {

View file

@ -0,0 +1,125 @@
package syncstatus
import (
"context"
"fmt"
"sync"
"github.com/anyproto/any-sync/nodeconf"
"github.com/anyproto/anytype-heart/core/anytype/config"
"github.com/anyproto/anytype-heart/core/event"
"github.com/anyproto/anytype-heart/core/syncstatus/filesyncstatus"
"github.com/anyproto/anytype-heart/core/syncstatus/nodestatus"
"github.com/anyproto/anytype-heart/core/syncstatus/objectsyncstatus"
"github.com/anyproto/anytype-heart/pb"
"github.com/anyproto/anytype-heart/pkg/lib/bundle"
"github.com/anyproto/anytype-heart/pkg/lib/localstore/objectstore"
)
type updateReceiver struct {
eventSender event.Sender
nodeConfService nodeconf.Service
sync.Mutex
nodeConnected bool
objectStore objectstore.ObjectStore
nodeStatus nodestatus.NodeStatus
spaceId string
}
func newUpdateReceiver(
nodeConfService nodeconf.Service,
cfg *config.Config,
eventSender event.Sender,
objectStore objectstore.ObjectStore,
nodeStatus nodestatus.NodeStatus,
) *updateReceiver {
if cfg.DisableThreadsSyncEvents {
eventSender = nil
}
return &updateReceiver{
nodeConfService: nodeConfService,
eventSender: eventSender,
objectStore: objectStore,
nodeStatus: nodeStatus,
}
}
func (r *updateReceiver) UpdateTree(_ context.Context, objId string, status objectsyncstatus.SyncStatus) error {
objStatusEvent := r.getObjectSyncStatus(objId, status)
r.notify(objId, objStatusEvent)
return nil
}
func (r *updateReceiver) getFileStatus(fileId string) (filesyncstatus.Status, error) {
details, err := r.objectStore.GetDetails(fileId)
if err != nil {
return filesyncstatus.Unknown, fmt.Errorf("get file details: %w", err)
}
if v, ok := details.GetDetails().GetFields()[bundle.RelationKeyFileBackupStatus.String()]; ok {
return filesyncstatus.Status(v.GetNumberValue()), nil
}
return filesyncstatus.Unknown, fmt.Errorf("no backup status")
}
func (r *updateReceiver) getObjectSyncStatus(objectId string, status objectsyncstatus.SyncStatus) pb.EventStatusThreadSyncStatus {
fileStatus, err := r.getFileStatus(objectId)
if err == nil {
// Prefer file backup status
if fileStatus != filesyncstatus.Synced {
status = fileStatus.ToSyncStatus()
}
}
if r.nodeConfService.NetworkCompatibilityStatus() == nodeconf.NetworkCompatibilityStatusIncompatible {
return pb.EventStatusThread_IncompatibleVersion
}
if !r.isNodeConnected() {
return pb.EventStatusThread_Offline
}
switch status {
case objectsyncstatus.StatusUnknown:
return pb.EventStatusThread_Unknown
case objectsyncstatus.StatusSynced:
return pb.EventStatusThread_Synced
}
return pb.EventStatusThread_Syncing
}
func (r *updateReceiver) isNodeConnected() bool {
r.Lock()
defer r.Unlock()
return r.nodeConnected
}
func (r *updateReceiver) UpdateNodeStatus() {
r.Lock()
defer r.Unlock()
r.nodeConnected = r.nodeStatus.GetNodeStatus(r.spaceId) == nodestatus.Online
}
func (r *updateReceiver) notify(
objId string,
objStatus pb.EventStatusThreadSyncStatus,
) {
r.sendEvent(objId, &pb.EventMessageValueOfThreadStatus{ThreadStatus: &pb.EventStatusThread{
Summary: &pb.EventStatusThreadSummary{Status: objStatus},
Cafe: &pb.EventStatusThreadCafe{
Status: objStatus,
Files: &pb.EventStatusThreadCafePinStatus{},
},
}})
}
func (r *updateReceiver) sendEvent(ctx string, event pb.IsEventMessageValue) {
if r.eventSender == nil {
return
}
r.eventSender.Broadcast(&pb.Event{
Messages: []*pb.EventMessage{{Value: event}},
ContextId: ctx,
})
}

View file

@ -0,0 +1,270 @@
package syncstatus
import (
"testing"
"github.com/anyproto/any-sync/nodeconf"
"github.com/anyproto/any-sync/nodeconf/mock_nodeconf"
"github.com/stretchr/testify/assert"
"go.uber.org/mock/gomock"
"github.com/anyproto/anytype-heart/core/anytype/config"
"github.com/anyproto/anytype-heart/core/event/mock_event"
"github.com/anyproto/anytype-heart/core/syncstatus/filesyncstatus"
"github.com/anyproto/anytype-heart/core/syncstatus/nodestatus"
"github.com/anyproto/anytype-heart/core/syncstatus/objectsyncstatus"
"github.com/anyproto/anytype-heart/pb"
"github.com/anyproto/anytype-heart/pkg/lib/bundle"
"github.com/anyproto/anytype-heart/pkg/lib/localstore/objectstore"
"github.com/anyproto/anytype-heart/util/pbtypes"
)
func TestUpdateReceiver_UpdateTree(t *testing.T) {
t.Run("update to sync status", func(t *testing.T) {
// given
receiver := newFixture(t)
receiver.nodeConnected = true
receiver.nodeConf.EXPECT().NetworkCompatibilityStatus().Return(nodeconf.NetworkCompatibilityStatusOk)
receiver.sender.EXPECT().Broadcast(&pb.Event{
Messages: []*pb.EventMessage{{Value: &pb.EventMessageValueOfThreadStatus{ThreadStatus: &pb.EventStatusThread{
Summary: &pb.EventStatusThreadSummary{Status: pb.EventStatusThread_Synced},
Cafe: &pb.EventStatusThreadCafe{
Status: pb.EventStatusThread_Synced,
Files: &pb.EventStatusThreadCafePinStatus{},
},
}}}},
ContextId: "id",
}).Return()
// when
err := receiver.UpdateTree(nil, "id", objectsyncstatus.StatusSynced)
// then
assert.Nil(t, err)
})
t.Run("network incompatible", func(t *testing.T) {
// given
receiver := newFixture(t)
receiver.nodeConnected = true
receiver.nodeConf.EXPECT().NetworkCompatibilityStatus().Return(nodeconf.NetworkCompatibilityStatusIncompatible)
receiver.sender.EXPECT().Broadcast(&pb.Event{
Messages: []*pb.EventMessage{{Value: &pb.EventMessageValueOfThreadStatus{ThreadStatus: &pb.EventStatusThread{
Summary: &pb.EventStatusThreadSummary{Status: pb.EventStatusThread_IncompatibleVersion},
Cafe: &pb.EventStatusThreadCafe{
Status: pb.EventStatusThread_IncompatibleVersion,
Files: &pb.EventStatusThreadCafePinStatus{},
},
}}}},
ContextId: "id",
}).Return()
// when
err := receiver.UpdateTree(nil, "id", objectsyncstatus.StatusNotSynced)
// then
assert.Nil(t, err)
})
t.Run("file storage limited", func(t *testing.T) {
// given
receiver := newFixture(t)
receiver.nodeConnected = true
receiver.nodeConf.EXPECT().NetworkCompatibilityStatus().Return(nodeconf.NetworkCompatibilityStatusOk)
receiver.sender.EXPECT().Broadcast(&pb.Event{
Messages: []*pb.EventMessage{{Value: &pb.EventMessageValueOfThreadStatus{ThreadStatus: &pb.EventStatusThread{
Summary: &pb.EventStatusThreadSummary{Status: pb.EventStatusThread_Syncing},
Cafe: &pb.EventStatusThreadCafe{
Status: pb.EventStatusThread_Syncing,
Files: &pb.EventStatusThreadCafePinStatus{},
},
}}}},
ContextId: "id",
}).Return()
receiver.store.AddObjects(t, []objectstore.TestObject{
{
bundle.RelationKeyId: pbtypes.String("id"),
bundle.RelationKeyFileBackupStatus: pbtypes.Int64(int64(filesyncstatus.Limited)),
},
})
// when
err := receiver.UpdateTree(nil, "id", objectsyncstatus.StatusNotSynced)
// then
assert.Nil(t, err)
})
t.Run("object sync status - syncing", func(t *testing.T) {
// given
receiver := newFixture(t)
receiver.nodeConnected = true
receiver.nodeConf.EXPECT().NetworkCompatibilityStatus().Return(nodeconf.NetworkCompatibilityStatusOk)
receiver.sender.EXPECT().Broadcast(&pb.Event{
Messages: []*pb.EventMessage{{Value: &pb.EventMessageValueOfThreadStatus{ThreadStatus: &pb.EventStatusThread{
Summary: &pb.EventStatusThreadSummary{Status: pb.EventStatusThread_Syncing},
Cafe: &pb.EventStatusThreadCafe{
Status: pb.EventStatusThread_Syncing,
Files: &pb.EventStatusThreadCafePinStatus{},
},
}}}},
ContextId: "id",
}).Return()
// when
err := receiver.UpdateTree(nil, "id", objectsyncstatus.StatusNotSynced)
// then
assert.Nil(t, err)
})
t.Run("object sync status - unknown", func(t *testing.T) {
// given
receiver := newFixture(t)
receiver.nodeConnected = true
receiver.nodeConf.EXPECT().NetworkCompatibilityStatus().Return(nodeconf.NetworkCompatibilityStatusOk)
receiver.sender.EXPECT().Broadcast(&pb.Event{
Messages: []*pb.EventMessage{{Value: &pb.EventMessageValueOfThreadStatus{ThreadStatus: &pb.EventStatusThread{
Summary: &pb.EventStatusThreadSummary{Status: pb.EventStatusThread_Unknown},
Cafe: &pb.EventStatusThreadCafe{
Status: pb.EventStatusThread_Unknown,
Files: &pb.EventStatusThreadCafePinStatus{},
},
}}}},
ContextId: "id",
}).Return()
// when
err := receiver.UpdateTree(nil, "id", objectsyncstatus.StatusUnknown)
// then
assert.Nil(t, err)
})
t.Run("object sync status - connection error", func(t *testing.T) {
// given
receiver := newFixture(t)
receiver.nodeConnected = false
receiver.nodeConf.EXPECT().NetworkCompatibilityStatus().Return(nodeconf.NetworkCompatibilityStatusOk)
receiver.sender.EXPECT().Broadcast(&pb.Event{
Messages: []*pb.EventMessage{{Value: &pb.EventMessageValueOfThreadStatus{ThreadStatus: &pb.EventStatusThread{
Summary: &pb.EventStatusThreadSummary{Status: pb.EventStatusThread_Offline},
Cafe: &pb.EventStatusThreadCafe{
Status: pb.EventStatusThread_Offline,
Files: &pb.EventStatusThreadCafePinStatus{},
},
}}}},
ContextId: "id",
}).Return()
// when
err := receiver.UpdateTree(nil, "id", objectsyncstatus.StatusSynced)
// then
assert.Nil(t, err)
})
t.Run("file sync status - synced", func(t *testing.T) {
// given
receiver := newFixture(t)
receiver.nodeConnected = true
receiver.nodeConf.EXPECT().NetworkCompatibilityStatus().Return(nodeconf.NetworkCompatibilityStatusOk)
receiver.sender.EXPECT().Broadcast(&pb.Event{
Messages: []*pb.EventMessage{{Value: &pb.EventMessageValueOfThreadStatus{ThreadStatus: &pb.EventStatusThread{
Summary: &pb.EventStatusThreadSummary{Status: pb.EventStatusThread_Synced},
Cafe: &pb.EventStatusThreadCafe{
Status: pb.EventStatusThread_Synced,
Files: &pb.EventStatusThreadCafePinStatus{},
},
}}}},
ContextId: "id",
}).Return()
receiver.store.AddObjects(t, []objectstore.TestObject{
{
bundle.RelationKeyId: pbtypes.String("id"),
bundle.RelationKeyFileBackupStatus: pbtypes.Int64(int64(filesyncstatus.Synced)),
},
})
// when
err := receiver.UpdateTree(nil, "id", objectsyncstatus.StatusSynced)
// then
assert.Nil(t, err)
})
t.Run("file sync status - syncing", func(t *testing.T) {
// given
receiver := newFixture(t)
receiver.nodeConnected = true
receiver.nodeConf.EXPECT().NetworkCompatibilityStatus().Return(nodeconf.NetworkCompatibilityStatusOk)
receiver.sender.EXPECT().Broadcast(&pb.Event{
Messages: []*pb.EventMessage{{Value: &pb.EventMessageValueOfThreadStatus{ThreadStatus: &pb.EventStatusThread{
Summary: &pb.EventStatusThreadSummary{Status: pb.EventStatusThread_Syncing},
Cafe: &pb.EventStatusThreadCafe{
Status: pb.EventStatusThread_Syncing,
Files: &pb.EventStatusThreadCafePinStatus{},
},
}}}},
ContextId: "id",
}).Return()
receiver.store.AddObjects(t, []objectstore.TestObject{
{
bundle.RelationKeyId: pbtypes.String("id"),
bundle.RelationKeyFileBackupStatus: pbtypes.Int64(int64(filesyncstatus.Syncing)),
},
})
// when
err := receiver.UpdateTree(nil, "id", objectsyncstatus.StatusUnknown)
// then
assert.Nil(t, err)
})
t.Run("sync status not changed", func(t *testing.T) {
// given
receiver := newFixture(t)
receiver.nodeConnected = true
receiver.nodeConf.EXPECT().NetworkCompatibilityStatus().Return(nodeconf.NetworkCompatibilityStatusOk).Times(2)
receiver.sender.EXPECT().Broadcast(&pb.Event{
Messages: []*pb.EventMessage{{Value: &pb.EventMessageValueOfThreadStatus{ThreadStatus: &pb.EventStatusThread{
Summary: &pb.EventStatusThreadSummary{Status: pb.EventStatusThread_Synced},
Cafe: &pb.EventStatusThreadCafe{
Status: pb.EventStatusThread_Synced,
Files: &pb.EventStatusThreadCafePinStatus{},
},
}}}},
ContextId: "id",
}).Return().Times(1)
// when
err := receiver.UpdateTree(nil, "id", objectsyncstatus.StatusSynced)
assert.Nil(t, err)
err = receiver.UpdateTree(nil, "id", objectsyncstatus.StatusSynced)
assert.Nil(t, err)
// then
assert.Nil(t, err)
})
}
func newFixture(t *testing.T) *fixture {
ctrl := gomock.NewController(t)
nodeConf := mock_nodeconf.NewMockService(ctrl)
conf := &config.Config{}
sender := mock_event.NewMockSender(t)
storeFixture := objectstore.NewStoreFixture(t)
status := nodestatus.NewNodeStatus()
receiver := newUpdateReceiver(nodeConf, conf, sender, storeFixture, status)
return &fixture{
updateReceiver: receiver,
sender: sender,
nodeConf: nodeConf,
store: storeFixture,
}
}
type fixture struct {
*updateReceiver
sender *mock_event.MockSender
nodeConf *mock_nodeconf.MockService
store *objectstore.StoreFixture
}