mirror of
https://github.com/anyproto/anytype-heart.git
synced 2025-06-10 18:10:49 +09:00
GO-3171 Merge branch 'main' of github.com:anyproto/anytype-heart into go-3171-check-p2p-status-and-send-event
# Conflicts: # core/syncstatus/service.go
This commit is contained in:
commit
9a2f10c77d
8 changed files with 536 additions and 5 deletions
|
@ -196,6 +196,11 @@ packages:
|
|||
github.com/anyproto/anytype-heart/core/syncstatus/objectsyncstatus:
|
||||
interfaces:
|
||||
Updater:
|
||||
UpdateReceiver:
|
||||
config:
|
||||
dir: "{{.InterfaceDir}}"
|
||||
outpkg: "{{.PackageName}}"
|
||||
inpackage: true
|
||||
github.com/anyproto/anytype-heart/space/spacecore/peermanager:
|
||||
interfaces:
|
||||
Updater:
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package syncstatus
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
|
@ -51,7 +52,10 @@ func (s *service) indexFileSyncStatus(fileObjectId string, status filesyncstatus
|
|||
if err != nil {
|
||||
return fmt.Errorf("get object: %w", err)
|
||||
}
|
||||
|
||||
err = s.updateReceiver.UpdateTree(context.Background(), fileObjectId, status.ToSyncStatus())
|
||||
if err != nil {
|
||||
return fmt.Errorf("update tree: %w", err)
|
||||
}
|
||||
s.sendSpaceStatusUpdate(status, spaceId, bytesLeftPercentage)
|
||||
return nil
|
||||
}
|
||||
|
|
116
core/syncstatus/objectsyncstatus/mock_UpdateReceiver.go
Normal file
116
core/syncstatus/objectsyncstatus/mock_UpdateReceiver.go
Normal file
|
@ -0,0 +1,116 @@
|
|||
// Code generated by mockery. DO NOT EDIT.
|
||||
|
||||
package objectsyncstatus
|
||||
|
||||
import (
|
||||
context "context"
|
||||
|
||||
mock "github.com/stretchr/testify/mock"
|
||||
)
|
||||
|
||||
// MockUpdateReceiver is an autogenerated mock type for the UpdateReceiver type
|
||||
type MockUpdateReceiver struct {
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
type MockUpdateReceiver_Expecter struct {
|
||||
mock *mock.Mock
|
||||
}
|
||||
|
||||
func (_m *MockUpdateReceiver) EXPECT() *MockUpdateReceiver_Expecter {
|
||||
return &MockUpdateReceiver_Expecter{mock: &_m.Mock}
|
||||
}
|
||||
|
||||
// UpdateNodeStatus provides a mock function with given fields:
|
||||
func (_m *MockUpdateReceiver) UpdateNodeStatus() {
|
||||
_m.Called()
|
||||
}
|
||||
|
||||
// MockUpdateReceiver_UpdateNodeStatus_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateNodeStatus'
|
||||
type MockUpdateReceiver_UpdateNodeStatus_Call struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
// UpdateNodeStatus is a helper method to define mock.On call
|
||||
func (_e *MockUpdateReceiver_Expecter) UpdateNodeStatus() *MockUpdateReceiver_UpdateNodeStatus_Call {
|
||||
return &MockUpdateReceiver_UpdateNodeStatus_Call{Call: _e.mock.On("UpdateNodeStatus")}
|
||||
}
|
||||
|
||||
func (_c *MockUpdateReceiver_UpdateNodeStatus_Call) Run(run func()) *MockUpdateReceiver_UpdateNodeStatus_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
run()
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockUpdateReceiver_UpdateNodeStatus_Call) Return() *MockUpdateReceiver_UpdateNodeStatus_Call {
|
||||
_c.Call.Return()
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockUpdateReceiver_UpdateNodeStatus_Call) RunAndReturn(run func()) *MockUpdateReceiver_UpdateNodeStatus_Call {
|
||||
_c.Call.Return(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
// UpdateTree provides a mock function with given fields: ctx, treeId, status
|
||||
func (_m *MockUpdateReceiver) UpdateTree(ctx context.Context, treeId string, status SyncStatus) error {
|
||||
ret := _m.Called(ctx, treeId, status)
|
||||
|
||||
if len(ret) == 0 {
|
||||
panic("no return value specified for UpdateTree")
|
||||
}
|
||||
|
||||
var r0 error
|
||||
if rf, ok := ret.Get(0).(func(context.Context, string, SyncStatus) error); ok {
|
||||
r0 = rf(ctx, treeId, status)
|
||||
} else {
|
||||
r0 = ret.Error(0)
|
||||
}
|
||||
|
||||
return r0
|
||||
}
|
||||
|
||||
// MockUpdateReceiver_UpdateTree_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateTree'
|
||||
type MockUpdateReceiver_UpdateTree_Call struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
// UpdateTree is a helper method to define mock.On call
|
||||
// - ctx context.Context
|
||||
// - treeId string
|
||||
// - status SyncStatus
|
||||
func (_e *MockUpdateReceiver_Expecter) UpdateTree(ctx interface{}, treeId interface{}, status interface{}) *MockUpdateReceiver_UpdateTree_Call {
|
||||
return &MockUpdateReceiver_UpdateTree_Call{Call: _e.mock.On("UpdateTree", ctx, treeId, status)}
|
||||
}
|
||||
|
||||
func (_c *MockUpdateReceiver_UpdateTree_Call) Run(run func(ctx context.Context, treeId string, status SyncStatus)) *MockUpdateReceiver_UpdateTree_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
run(args[0].(context.Context), args[1].(string), args[2].(SyncStatus))
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockUpdateReceiver_UpdateTree_Call) Return(err error) *MockUpdateReceiver_UpdateTree_Call {
|
||||
_c.Call.Return(err)
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockUpdateReceiver_UpdateTree_Call) RunAndReturn(run func(context.Context, string, SyncStatus) error) *MockUpdateReceiver_UpdateTree_Call {
|
||||
_c.Call.Return(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
// NewMockUpdateReceiver creates a new instance of MockUpdateReceiver. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
|
||||
// The first argument is typically a *testing.T value.
|
||||
func NewMockUpdateReceiver(t interface {
|
||||
mock.TestingT
|
||||
Cleanup(func())
|
||||
}) *MockUpdateReceiver {
|
||||
mock := &MockUpdateReceiver{}
|
||||
mock.Mock.Test(t)
|
||||
|
||||
t.Cleanup(func() { mock.AssertExpectations(t) })
|
||||
|
||||
return mock
|
||||
}
|
|
@ -31,7 +31,8 @@ const (
|
|||
var log = logger.NewNamed(syncstatus.CName)
|
||||
|
||||
type UpdateReceiver interface {
|
||||
UpdateTree(ctx context.Context, treeId string, status SyncStatus)
|
||||
UpdateTree(ctx context.Context, treeId string, status SyncStatus) (err error)
|
||||
UpdateNodeStatus()
|
||||
}
|
||||
|
||||
type SyncStatus int
|
||||
|
@ -51,6 +52,7 @@ type StatusUpdater interface {
|
|||
type StatusWatcher interface {
|
||||
Watch(treeId string) (err error)
|
||||
Unwatch(treeId string)
|
||||
SetUpdateReceiver(updater UpdateReceiver)
|
||||
}
|
||||
|
||||
type StatusService interface {
|
||||
|
@ -77,9 +79,10 @@ type Updater interface {
|
|||
|
||||
type syncStatusService struct {
|
||||
sync.Mutex
|
||||
configuration nodeconf.NodeConf
|
||||
periodicSync periodicsync.PeriodicSync
|
||||
storage spacestorage.SpaceStorage
|
||||
configuration nodeconf.NodeConf
|
||||
periodicSync periodicsync.PeriodicSync
|
||||
updateReceiver UpdateReceiver
|
||||
storage spacestorage.SpaceStorage
|
||||
|
||||
spaceId string
|
||||
treeHeads map[string]treeHeadsEntry
|
||||
|
@ -127,6 +130,13 @@ func (s *syncStatusService) Name() (name string) {
|
|||
return syncstatus.CName
|
||||
}
|
||||
|
||||
func (s *syncStatusService) SetUpdateReceiver(updater UpdateReceiver) {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
s.updateReceiver = updater
|
||||
}
|
||||
|
||||
func (s *syncStatusService) Run(ctx context.Context) error {
|
||||
s.periodicSync.Run()
|
||||
return nil
|
||||
|
@ -152,6 +162,10 @@ func (s *syncStatusService) update(ctx context.Context) (err error) {
|
|||
s.treeStatusBuf = s.treeStatusBuf[:0]
|
||||
|
||||
s.Lock()
|
||||
if s.updateReceiver == nil {
|
||||
s.Unlock()
|
||||
return
|
||||
}
|
||||
for treeId := range s.watchers {
|
||||
// that means that we haven't yet got the status update
|
||||
treeHeads, exists := s.treeHeads[treeId]
|
||||
|
@ -163,7 +177,12 @@ func (s *syncStatusService) update(ctx context.Context) (err error) {
|
|||
s.treeStatusBuf = append(s.treeStatusBuf, treeStatus{treeId, treeHeads.syncStatus})
|
||||
}
|
||||
s.Unlock()
|
||||
s.updateReceiver.UpdateNodeStatus()
|
||||
for _, entry := range s.treeStatusBuf {
|
||||
err = s.updateReceiver.UpdateTree(ctx, entry.treeId, entry.status)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
s.updateDetails(entry.treeId, mapStatus(entry.status))
|
||||
}
|
||||
return
|
||||
|
|
|
@ -228,6 +228,10 @@ func TestSyncStatusService_update(t *testing.T) {
|
|||
t.Run("update: got updates on objects", func(t *testing.T) {
|
||||
// given
|
||||
s := newFixture(t)
|
||||
updateReceiver := NewMockUpdateReceiver(t)
|
||||
updateReceiver.EXPECT().UpdateNodeStatus().Return()
|
||||
updateReceiver.EXPECT().UpdateTree(context.Background(), "id", StatusNotSynced).Return(nil)
|
||||
s.SetUpdateReceiver(updateReceiver)
|
||||
|
||||
// when
|
||||
s.detailsUpdater.EXPECT().UpdateDetails([]string{"id"}, domain.ObjectSyncing, domain.Null, "spaceId")
|
||||
|
@ -239,6 +243,7 @@ func TestSyncStatusService_update(t *testing.T) {
|
|||
|
||||
// then
|
||||
assert.Nil(t, err)
|
||||
updateReceiver.AssertCalled(t, "UpdateTree", context.Background(), "id", StatusNotSynced)
|
||||
})
|
||||
}
|
||||
|
||||
|
|
|
@ -6,10 +6,14 @@ import (
|
|||
|
||||
"github.com/anyproto/any-sync/app"
|
||||
"github.com/anyproto/any-sync/commonspace"
|
||||
"github.com/anyproto/any-sync/nodeconf"
|
||||
|
||||
"github.com/anyproto/anytype-heart/core/anytype/config"
|
||||
"github.com/anyproto/anytype-heart/core/block/cache"
|
||||
"github.com/anyproto/anytype-heart/core/domain"
|
||||
"github.com/anyproto/anytype-heart/core/event"
|
||||
"github.com/anyproto/anytype-heart/core/filestorage/filesync"
|
||||
"github.com/anyproto/anytype-heart/core/syncstatus/nodestatus"
|
||||
"github.com/anyproto/anytype-heart/core/syncstatus/objectsyncstatus"
|
||||
"github.com/anyproto/anytype-heart/core/syncstatus/spacesyncstatus"
|
||||
"github.com/anyproto/anytype-heart/pkg/lib/localstore/objectstore"
|
||||
|
@ -32,6 +36,7 @@ type Updater interface {
|
|||
var _ Service = (*service)(nil)
|
||||
|
||||
type service struct {
|
||||
updateReceiver *updateReceiver
|
||||
fileSyncService filesync.FileSync
|
||||
|
||||
objectWatchersLock sync.Mutex
|
||||
|
@ -58,6 +63,12 @@ func (s *service) Init(a *app.App) (err error) {
|
|||
s.fileSyncService.OnUploadStarted(s.onFileUploadStarted)
|
||||
s.fileSyncService.OnLimited(s.onFileLimited)
|
||||
s.fileSyncService.OnDelete(s.OnFileDelete)
|
||||
|
||||
nodeConfService := app.MustComponent[nodeconf.Service](a)
|
||||
cfg := app.MustComponent[*config.Config](a)
|
||||
eventSender := app.MustComponent[event.Sender](a)
|
||||
nodeStatus := app.MustComponent[nodestatus.NodeStatus](a)
|
||||
s.updateReceiver = newUpdateReceiver(nodeConfService, cfg, eventSender, s.objectStore, nodeStatus)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -73,7 +84,9 @@ func (s *service) RegisterSpace(space commonspace.Space, sw objectsyncstatus.Sta
|
|||
s.objectWatchersLock.Lock()
|
||||
defer s.objectWatchersLock.Unlock()
|
||||
|
||||
sw.SetUpdateReceiver(s.updateReceiver)
|
||||
s.objectWatchers[space.Id()] = sw
|
||||
s.updateReceiver.spaceId = space.Id()
|
||||
}
|
||||
|
||||
func (s *service) UnregisterSpace(space commonspace.Space) {
|
||||
|
|
125
core/syncstatus/updatereceiver.go
Normal file
125
core/syncstatus/updatereceiver.go
Normal file
|
@ -0,0 +1,125 @@
|
|||
package syncstatus
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/anyproto/any-sync/nodeconf"
|
||||
|
||||
"github.com/anyproto/anytype-heart/core/anytype/config"
|
||||
"github.com/anyproto/anytype-heart/core/event"
|
||||
"github.com/anyproto/anytype-heart/core/syncstatus/filesyncstatus"
|
||||
"github.com/anyproto/anytype-heart/core/syncstatus/nodestatus"
|
||||
"github.com/anyproto/anytype-heart/core/syncstatus/objectsyncstatus"
|
||||
"github.com/anyproto/anytype-heart/pb"
|
||||
"github.com/anyproto/anytype-heart/pkg/lib/bundle"
|
||||
"github.com/anyproto/anytype-heart/pkg/lib/localstore/objectstore"
|
||||
)
|
||||
|
||||
type updateReceiver struct {
|
||||
eventSender event.Sender
|
||||
|
||||
nodeConfService nodeconf.Service
|
||||
sync.Mutex
|
||||
nodeConnected bool
|
||||
objectStore objectstore.ObjectStore
|
||||
nodeStatus nodestatus.NodeStatus
|
||||
spaceId string
|
||||
}
|
||||
|
||||
func newUpdateReceiver(
|
||||
nodeConfService nodeconf.Service,
|
||||
cfg *config.Config,
|
||||
eventSender event.Sender,
|
||||
objectStore objectstore.ObjectStore,
|
||||
nodeStatus nodestatus.NodeStatus,
|
||||
) *updateReceiver {
|
||||
if cfg.DisableThreadsSyncEvents {
|
||||
eventSender = nil
|
||||
}
|
||||
return &updateReceiver{
|
||||
nodeConfService: nodeConfService,
|
||||
eventSender: eventSender,
|
||||
objectStore: objectStore,
|
||||
nodeStatus: nodeStatus,
|
||||
}
|
||||
}
|
||||
|
||||
func (r *updateReceiver) UpdateTree(_ context.Context, objId string, status objectsyncstatus.SyncStatus) error {
|
||||
objStatusEvent := r.getObjectSyncStatus(objId, status)
|
||||
r.notify(objId, objStatusEvent)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *updateReceiver) getFileStatus(fileId string) (filesyncstatus.Status, error) {
|
||||
details, err := r.objectStore.GetDetails(fileId)
|
||||
if err != nil {
|
||||
return filesyncstatus.Unknown, fmt.Errorf("get file details: %w", err)
|
||||
}
|
||||
if v, ok := details.GetDetails().GetFields()[bundle.RelationKeyFileBackupStatus.String()]; ok {
|
||||
return filesyncstatus.Status(v.GetNumberValue()), nil
|
||||
}
|
||||
return filesyncstatus.Unknown, fmt.Errorf("no backup status")
|
||||
}
|
||||
|
||||
func (r *updateReceiver) getObjectSyncStatus(objectId string, status objectsyncstatus.SyncStatus) pb.EventStatusThreadSyncStatus {
|
||||
fileStatus, err := r.getFileStatus(objectId)
|
||||
if err == nil {
|
||||
// Prefer file backup status
|
||||
if fileStatus != filesyncstatus.Synced {
|
||||
status = fileStatus.ToSyncStatus()
|
||||
}
|
||||
}
|
||||
|
||||
if r.nodeConfService.NetworkCompatibilityStatus() == nodeconf.NetworkCompatibilityStatusIncompatible {
|
||||
return pb.EventStatusThread_IncompatibleVersion
|
||||
}
|
||||
|
||||
if !r.isNodeConnected() {
|
||||
return pb.EventStatusThread_Offline
|
||||
}
|
||||
|
||||
switch status {
|
||||
case objectsyncstatus.StatusUnknown:
|
||||
return pb.EventStatusThread_Unknown
|
||||
case objectsyncstatus.StatusSynced:
|
||||
return pb.EventStatusThread_Synced
|
||||
}
|
||||
return pb.EventStatusThread_Syncing
|
||||
}
|
||||
|
||||
func (r *updateReceiver) isNodeConnected() bool {
|
||||
r.Lock()
|
||||
defer r.Unlock()
|
||||
return r.nodeConnected
|
||||
}
|
||||
|
||||
func (r *updateReceiver) UpdateNodeStatus() {
|
||||
r.Lock()
|
||||
defer r.Unlock()
|
||||
r.nodeConnected = r.nodeStatus.GetNodeStatus(r.spaceId) == nodestatus.Online
|
||||
}
|
||||
|
||||
func (r *updateReceiver) notify(
|
||||
objId string,
|
||||
objStatus pb.EventStatusThreadSyncStatus,
|
||||
) {
|
||||
r.sendEvent(objId, &pb.EventMessageValueOfThreadStatus{ThreadStatus: &pb.EventStatusThread{
|
||||
Summary: &pb.EventStatusThreadSummary{Status: objStatus},
|
||||
Cafe: &pb.EventStatusThreadCafe{
|
||||
Status: objStatus,
|
||||
Files: &pb.EventStatusThreadCafePinStatus{},
|
||||
},
|
||||
}})
|
||||
}
|
||||
|
||||
func (r *updateReceiver) sendEvent(ctx string, event pb.IsEventMessageValue) {
|
||||
if r.eventSender == nil {
|
||||
return
|
||||
}
|
||||
r.eventSender.Broadcast(&pb.Event{
|
||||
Messages: []*pb.EventMessage{{Value: event}},
|
||||
ContextId: ctx,
|
||||
})
|
||||
}
|
244
core/syncstatus/updatereceiver_test.go
Normal file
244
core/syncstatus/updatereceiver_test.go
Normal file
|
@ -0,0 +1,244 @@
|
|||
package syncstatus
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/anyproto/any-sync/nodeconf"
|
||||
"github.com/anyproto/any-sync/nodeconf/mock_nodeconf"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"go.uber.org/mock/gomock"
|
||||
|
||||
"github.com/anyproto/anytype-heart/core/anytype/config"
|
||||
"github.com/anyproto/anytype-heart/core/event/mock_event"
|
||||
"github.com/anyproto/anytype-heart/core/syncstatus/filesyncstatus"
|
||||
"github.com/anyproto/anytype-heart/core/syncstatus/nodestatus"
|
||||
"github.com/anyproto/anytype-heart/core/syncstatus/objectsyncstatus"
|
||||
"github.com/anyproto/anytype-heart/pb"
|
||||
"github.com/anyproto/anytype-heart/pkg/lib/bundle"
|
||||
"github.com/anyproto/anytype-heart/pkg/lib/localstore/objectstore"
|
||||
"github.com/anyproto/anytype-heart/util/pbtypes"
|
||||
)
|
||||
|
||||
func TestUpdateReceiver_UpdateTree(t *testing.T) {
|
||||
t.Run("update to sync status", func(t *testing.T) {
|
||||
// given
|
||||
receiver := newFixture(t)
|
||||
receiver.nodeConnected = true
|
||||
receiver.nodeConf.EXPECT().NetworkCompatibilityStatus().Return(nodeconf.NetworkCompatibilityStatusOk)
|
||||
receiver.sender.EXPECT().Broadcast(&pb.Event{
|
||||
Messages: []*pb.EventMessage{{Value: &pb.EventMessageValueOfThreadStatus{ThreadStatus: &pb.EventStatusThread{
|
||||
Summary: &pb.EventStatusThreadSummary{Status: pb.EventStatusThread_Synced},
|
||||
Cafe: &pb.EventStatusThreadCafe{
|
||||
Status: pb.EventStatusThread_Synced,
|
||||
Files: &pb.EventStatusThreadCafePinStatus{},
|
||||
},
|
||||
}}}},
|
||||
ContextId: "id",
|
||||
}).Return()
|
||||
|
||||
// when
|
||||
err := receiver.UpdateTree(nil, "id", objectsyncstatus.StatusSynced)
|
||||
|
||||
// then
|
||||
assert.Nil(t, err)
|
||||
})
|
||||
t.Run("network incompatible", func(t *testing.T) {
|
||||
// given
|
||||
receiver := newFixture(t)
|
||||
receiver.nodeConnected = true
|
||||
receiver.nodeConf.EXPECT().NetworkCompatibilityStatus().Return(nodeconf.NetworkCompatibilityStatusIncompatible)
|
||||
receiver.sender.EXPECT().Broadcast(&pb.Event{
|
||||
Messages: []*pb.EventMessage{{Value: &pb.EventMessageValueOfThreadStatus{ThreadStatus: &pb.EventStatusThread{
|
||||
Summary: &pb.EventStatusThreadSummary{Status: pb.EventStatusThread_IncompatibleVersion},
|
||||
Cafe: &pb.EventStatusThreadCafe{
|
||||
Status: pb.EventStatusThread_IncompatibleVersion,
|
||||
Files: &pb.EventStatusThreadCafePinStatus{},
|
||||
},
|
||||
}}}},
|
||||
ContextId: "id",
|
||||
}).Return()
|
||||
|
||||
// when
|
||||
err := receiver.UpdateTree(nil, "id", objectsyncstatus.StatusNotSynced)
|
||||
|
||||
// then
|
||||
assert.Nil(t, err)
|
||||
})
|
||||
t.Run("file storage limited", func(t *testing.T) {
|
||||
// given
|
||||
receiver := newFixture(t)
|
||||
receiver.nodeConnected = true
|
||||
receiver.nodeConf.EXPECT().NetworkCompatibilityStatus().Return(nodeconf.NetworkCompatibilityStatusOk)
|
||||
receiver.sender.EXPECT().Broadcast(&pb.Event{
|
||||
Messages: []*pb.EventMessage{{Value: &pb.EventMessageValueOfThreadStatus{ThreadStatus: &pb.EventStatusThread{
|
||||
Summary: &pb.EventStatusThreadSummary{Status: pb.EventStatusThread_Syncing},
|
||||
Cafe: &pb.EventStatusThreadCafe{
|
||||
Status: pb.EventStatusThread_Syncing,
|
||||
Files: &pb.EventStatusThreadCafePinStatus{},
|
||||
},
|
||||
}}}},
|
||||
ContextId: "id",
|
||||
}).Return()
|
||||
|
||||
receiver.store.AddObjects(t, []objectstore.TestObject{
|
||||
{
|
||||
bundle.RelationKeyId: pbtypes.String("id"),
|
||||
bundle.RelationKeyFileBackupStatus: pbtypes.Int64(int64(filesyncstatus.Limited)),
|
||||
},
|
||||
})
|
||||
|
||||
// when
|
||||
err := receiver.UpdateTree(nil, "id", objectsyncstatus.StatusNotSynced)
|
||||
|
||||
// then
|
||||
assert.Nil(t, err)
|
||||
})
|
||||
t.Run("object sync status - syncing", func(t *testing.T) {
|
||||
// given
|
||||
receiver := newFixture(t)
|
||||
receiver.nodeConnected = true
|
||||
receiver.nodeConf.EXPECT().NetworkCompatibilityStatus().Return(nodeconf.NetworkCompatibilityStatusOk)
|
||||
receiver.sender.EXPECT().Broadcast(&pb.Event{
|
||||
Messages: []*pb.EventMessage{{Value: &pb.EventMessageValueOfThreadStatus{ThreadStatus: &pb.EventStatusThread{
|
||||
Summary: &pb.EventStatusThreadSummary{Status: pb.EventStatusThread_Syncing},
|
||||
Cafe: &pb.EventStatusThreadCafe{
|
||||
Status: pb.EventStatusThread_Syncing,
|
||||
Files: &pb.EventStatusThreadCafePinStatus{},
|
||||
},
|
||||
}}}},
|
||||
ContextId: "id",
|
||||
}).Return()
|
||||
|
||||
// when
|
||||
err := receiver.UpdateTree(nil, "id", objectsyncstatus.StatusNotSynced)
|
||||
|
||||
// then
|
||||
assert.Nil(t, err)
|
||||
})
|
||||
t.Run("object sync status - unknown", func(t *testing.T) {
|
||||
// given
|
||||
receiver := newFixture(t)
|
||||
receiver.nodeConnected = true
|
||||
receiver.nodeConf.EXPECT().NetworkCompatibilityStatus().Return(nodeconf.NetworkCompatibilityStatusOk)
|
||||
receiver.sender.EXPECT().Broadcast(&pb.Event{
|
||||
Messages: []*pb.EventMessage{{Value: &pb.EventMessageValueOfThreadStatus{ThreadStatus: &pb.EventStatusThread{
|
||||
Summary: &pb.EventStatusThreadSummary{Status: pb.EventStatusThread_Unknown},
|
||||
Cafe: &pb.EventStatusThreadCafe{
|
||||
Status: pb.EventStatusThread_Unknown,
|
||||
Files: &pb.EventStatusThreadCafePinStatus{},
|
||||
},
|
||||
}}}},
|
||||
ContextId: "id",
|
||||
}).Return()
|
||||
|
||||
// when
|
||||
err := receiver.UpdateTree(nil, "id", objectsyncstatus.StatusUnknown)
|
||||
|
||||
// then
|
||||
assert.Nil(t, err)
|
||||
})
|
||||
t.Run("object sync status - connection error", func(t *testing.T) {
|
||||
// given
|
||||
receiver := newFixture(t)
|
||||
receiver.nodeConnected = false
|
||||
receiver.nodeConf.EXPECT().NetworkCompatibilityStatus().Return(nodeconf.NetworkCompatibilityStatusOk)
|
||||
receiver.sender.EXPECT().Broadcast(&pb.Event{
|
||||
Messages: []*pb.EventMessage{{Value: &pb.EventMessageValueOfThreadStatus{ThreadStatus: &pb.EventStatusThread{
|
||||
Summary: &pb.EventStatusThreadSummary{Status: pb.EventStatusThread_Offline},
|
||||
Cafe: &pb.EventStatusThreadCafe{
|
||||
Status: pb.EventStatusThread_Offline,
|
||||
Files: &pb.EventStatusThreadCafePinStatus{},
|
||||
},
|
||||
}}}},
|
||||
ContextId: "id",
|
||||
}).Return()
|
||||
|
||||
// when
|
||||
err := receiver.UpdateTree(nil, "id", objectsyncstatus.StatusSynced)
|
||||
|
||||
// then
|
||||
assert.Nil(t, err)
|
||||
})
|
||||
t.Run("file sync status - synced", func(t *testing.T) {
|
||||
// given
|
||||
receiver := newFixture(t)
|
||||
receiver.nodeConnected = true
|
||||
receiver.nodeConf.EXPECT().NetworkCompatibilityStatus().Return(nodeconf.NetworkCompatibilityStatusOk)
|
||||
receiver.sender.EXPECT().Broadcast(&pb.Event{
|
||||
Messages: []*pb.EventMessage{{Value: &pb.EventMessageValueOfThreadStatus{ThreadStatus: &pb.EventStatusThread{
|
||||
Summary: &pb.EventStatusThreadSummary{Status: pb.EventStatusThread_Synced},
|
||||
Cafe: &pb.EventStatusThreadCafe{
|
||||
Status: pb.EventStatusThread_Synced,
|
||||
Files: &pb.EventStatusThreadCafePinStatus{},
|
||||
},
|
||||
}}}},
|
||||
ContextId: "id",
|
||||
}).Return()
|
||||
|
||||
receiver.store.AddObjects(t, []objectstore.TestObject{
|
||||
{
|
||||
bundle.RelationKeyId: pbtypes.String("id"),
|
||||
bundle.RelationKeyFileBackupStatus: pbtypes.Int64(int64(filesyncstatus.Synced)),
|
||||
},
|
||||
})
|
||||
|
||||
// when
|
||||
err := receiver.UpdateTree(nil, "id", objectsyncstatus.StatusSynced)
|
||||
|
||||
// then
|
||||
assert.Nil(t, err)
|
||||
})
|
||||
t.Run("file sync status - syncing", func(t *testing.T) {
|
||||
// given
|
||||
receiver := newFixture(t)
|
||||
receiver.nodeConnected = true
|
||||
receiver.nodeConf.EXPECT().NetworkCompatibilityStatus().Return(nodeconf.NetworkCompatibilityStatusOk)
|
||||
receiver.sender.EXPECT().Broadcast(&pb.Event{
|
||||
Messages: []*pb.EventMessage{{Value: &pb.EventMessageValueOfThreadStatus{ThreadStatus: &pb.EventStatusThread{
|
||||
Summary: &pb.EventStatusThreadSummary{Status: pb.EventStatusThread_Syncing},
|
||||
Cafe: &pb.EventStatusThreadCafe{
|
||||
Status: pb.EventStatusThread_Syncing,
|
||||
Files: &pb.EventStatusThreadCafePinStatus{},
|
||||
},
|
||||
}}}},
|
||||
ContextId: "id",
|
||||
}).Return()
|
||||
|
||||
receiver.store.AddObjects(t, []objectstore.TestObject{
|
||||
{
|
||||
bundle.RelationKeyId: pbtypes.String("id"),
|
||||
bundle.RelationKeyFileBackupStatus: pbtypes.Int64(int64(filesyncstatus.Syncing)),
|
||||
},
|
||||
})
|
||||
|
||||
// when
|
||||
err := receiver.UpdateTree(nil, "id", objectsyncstatus.StatusUnknown)
|
||||
|
||||
// then
|
||||
assert.Nil(t, err)
|
||||
})
|
||||
}
|
||||
|
||||
func newFixture(t *testing.T) *fixture {
|
||||
ctrl := gomock.NewController(t)
|
||||
nodeConf := mock_nodeconf.NewMockService(ctrl)
|
||||
conf := &config.Config{}
|
||||
sender := mock_event.NewMockSender(t)
|
||||
storeFixture := objectstore.NewStoreFixture(t)
|
||||
status := nodestatus.NewNodeStatus()
|
||||
|
||||
receiver := newUpdateReceiver(nodeConf, conf, sender, storeFixture, status)
|
||||
return &fixture{
|
||||
updateReceiver: receiver,
|
||||
sender: sender,
|
||||
nodeConf: nodeConf,
|
||||
store: storeFixture,
|
||||
}
|
||||
}
|
||||
|
||||
type fixture struct {
|
||||
*updateReceiver
|
||||
sender *mock_event.MockSender
|
||||
nodeConf *mock_nodeconf.MockService
|
||||
store *objectstore.StoreFixture
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue