diff --git a/core/acl/aclservice_test.go b/core/acl/aclservice_test.go index 627997f98..9ebdb5e41 100644 --- a/core/acl/aclservice_test.go +++ b/core/acl/aclservice_test.go @@ -63,6 +63,24 @@ func (c *mockConfig) GetNodeConf() (conf nodeconf.Configuration) { return c.Config } +type pushServiceDummy struct { +} + +func (p *pushServiceDummy) Init(a *app.App) (err error) { + return nil +} + +func (p *pushServiceDummy) Name() (name string) { + return "pushServiceDummy" +} + +func (p *pushServiceDummy) CreateSpace(ctx context.Context, spaceId string) (err error) { + return nil +} + +func (p *pushServiceDummy) SubscribeToTopics(ctx context.Context, spaceId string, topic []string) { +} + type fixture struct { *aclService a *app.App @@ -106,7 +124,8 @@ func newFixture(t *testing.T) *fixture { Register(testutil.PrepareMock(ctx, fx.a, fx.mockInviteService)). Register(testutil.PrepareMock(ctx, fx.a, fx.mockCoordinatorClient)). Register(fx.mockConfig). - Register(fx.aclService) + Register(fx.aclService). + Register(&pushServiceDummy{}) require.NoError(t, fx.a.Start(ctx)) return fx } diff --git a/core/block/object/treesyncer/treesyncer_test.go b/core/block/object/treesyncer/treesyncer_test.go index 9ae3d31b9..553b70632 100644 --- a/core/block/object/treesyncer/treesyncer_test.go +++ b/core/block/object/treesyncer/treesyncer_test.go @@ -50,6 +50,11 @@ func newFixture(t *testing.T, spaceId string) *fixture { spaceStorage.EXPECT().StateStorage().AnyTimes().Return(stateStorage) stateStorage.EXPECT().SettingsId().AnyTimes().Return("settingsId") + missingMock.EXPECT().Lock().AnyTimes() + missingMock.EXPECT().Unlock().AnyTimes() + existingMock.EXPECT().Lock().AnyTimes() + existingMock.EXPECT().Unlock().AnyTimes() + a := new(app.App) a.Register(testutil.PrepareMock(context.Background(), a, treeManager)). Register(testutil.PrepareMock(context.Background(), a, spaceStorage)). diff --git a/space/spacecore/spacekey/spacekey.go b/core/pushnotification/keys.go similarity index 82% rename from space/spacecore/spacekey/spacekey.go rename to core/pushnotification/keys.go index 49ecd25d5..308f3b5f9 100644 --- a/space/spacecore/spacekey/spacekey.go +++ b/core/pushnotification/keys.go @@ -1,4 +1,4 @@ -package spacekey +package pushnotification import ( "github.com/anyproto/any-sync/util/crypto" @@ -13,7 +13,7 @@ const ( spacePath = "m/SLIP-0021/anytype/space/key" ) -func DeriveSpaceKey(firstMetadataKey crypto.PrivKey) (string, crypto.PrivKey, error) { +func deriveSpaceKey(firstMetadataKey crypto.PrivKey) (string, crypto.PrivKey, error) { key, err := privkey.DeriveFromPrivKey(spaceKeyPath, firstMetadataKey) if err != nil { return "", nil, err @@ -29,7 +29,7 @@ func DeriveSpaceKey(firstMetadataKey crypto.PrivKey) (string, crypto.PrivKey, er return encodedKey, key, nil } -func DeriveSymmetricKey(readKey crypto.SymKey) (crypto.SymKey, error) { +func deriveSymmetricKey(readKey crypto.SymKey) (crypto.SymKey, error) { raw, err := readKey.Raw() if err != nil { return nil, err diff --git a/core/pushnotification/service.go b/core/pushnotification/service.go index 41241e3fa..623950b96 100644 --- a/core/pushnotification/service.go +++ b/core/pushnotification/service.go @@ -2,6 +2,7 @@ package pushnotification import ( "context" + "encoding/base64" "fmt" "sync" @@ -12,12 +13,12 @@ import ( "github.com/cheggaaa/mb/v3" "github.com/anyproto/anytype-heart/core/anytype/config" + "github.com/anyproto/anytype-heart/core/event" "github.com/anyproto/anytype-heart/core/pushnotification/pushclient" "github.com/anyproto/anytype-heart/core/wallet" "github.com/anyproto/anytype-heart/pb" "github.com/anyproto/anytype-heart/pkg/lib/logging" "github.com/anyproto/anytype-heart/space" - "github.com/anyproto/anytype-heart/space/spacecore/spacekey" ) const CName = "core.pushnotification.service" @@ -42,16 +43,18 @@ func New() Service { } type service struct { - pushClient pushclient.Client - wallet wallet.Wallet - cfg *config.Config + pushClient pushclient.Client + wallet wallet.Wallet + cfg *config.Config + spaceService space.Service + eventSender event.Sender + started bool - activeSubscriptions map[string]TopicSet activeSubscriptionsLock sync.Mutex + activeSubscriptions map[string]TopicSet ctx context.Context cancel context.CancelFunc batcher *mb.MB[newSubscription] - spaceService space.Service } func (s *service) SubscribeToTopics(ctx context.Context, spaceId string, topics []string) { @@ -88,6 +91,7 @@ func (s *service) Init(a *app.App) (err error) { s.wallet = app.MustComponent[wallet.Wallet](a) s.batcher = mb.New[newSubscription](0) s.spaceService = app.MustComponent[space.Service](a) + s.eventSender = app.MustComponent[event.Sender](a) return } @@ -145,7 +149,7 @@ func (s *service) createTopicsForSpace(spaceId string, topicNames []string) ([]* if err != nil { return nil, err } - _, signKey, err := spacekey.DeriveSpaceKey(firstMetadataKey) + _, signKey, err := deriveSpaceKey(firstMetadataKey) if err != nil { return nil, fmt.Errorf("failed to get key for space %s: %w", spaceId, err) } @@ -169,7 +173,7 @@ func (s *service) CreateSpace(ctx context.Context, spaceId string) (err error) { if err != nil { return err } - _, signKey, err := spacekey.DeriveSpaceKey(firstMetadataKey) + _, signKey, err := deriveSpaceKey(firstMetadataKey) if err != nil { return err } @@ -202,7 +206,7 @@ func (s *service) Notify(ctx context.Context, spaceId string, topic []string, pa if err != nil { return err } - spaceKeyId, signKey, err := spacekey.DeriveSpaceKey(firstMetadataKey) + spaceKeyId, signKey, err := deriveSpaceKey(firstMetadataKey) if err != nil { return err } @@ -235,7 +239,7 @@ func (s *service) prepareEncryptedPayload(state *list.AclState, payload []byte) if err != nil { return nil, err } - encryptionKey, err := spacekey.DeriveSymmetricKey(symKey) + encryptionKey, err := deriveSymmetricKey(symKey) if err != nil { return nil, err } @@ -349,3 +353,40 @@ func (s *service) addNewSubscription(sub newSubscription) (shouldUpdate bool, er s.activeSubscriptions[sub.SpaceId] = activeTopics return shouldUpdate, nil } + +func (a *service) BroadcastKeyUpdate(spaceId string, aclState *list.AclState) error { + firstMetadataKey, err := aclState.FirstMetadataKey() + if err != nil { + return err + } + readKey, err := aclState.CurrentReadKey() + if err != nil { + return err + } + spaceKeyId, _, err := deriveSpaceKey(firstMetadataKey) + if err != nil { + return err + } + encryptionKey, err := deriveSymmetricKey(readKey) + if err != nil { + return err + } + raw, err := encryptionKey.Raw() + if err != nil { + return err + } + encodedKey := base64.StdEncoding.EncodeToString(raw) + a.eventSender.Broadcast(&pb.Event{ + Messages: []*pb.EventMessage{ + { + SpaceId: spaceId, + Value: &pb.EventMessageValueOfKeyUpdate{KeyUpdate: &pb.EventKeyUpdate{ + SpaceKeyId: spaceKeyId, + EncryptionKeyId: aclState.CurrentReadKeyId(), + EncryptionKey: encodedKey, + }}, + }, + }, + }) + return nil +} diff --git a/space/internal/components/aclobjectmanager/aclobjectmanager.go b/space/internal/components/aclobjectmanager/aclobjectmanager.go index 67afaa09d..8e6ff9f26 100644 --- a/space/internal/components/aclobjectmanager/aclobjectmanager.go +++ b/space/internal/components/aclobjectmanager/aclobjectmanager.go @@ -2,7 +2,6 @@ package aclobjectmanager import ( "context" - "encoding/base64" "fmt" "slices" "sync" @@ -11,7 +10,6 @@ import ( "github.com/anyproto/any-sync/app" "github.com/anyproto/any-sync/app/debugstat" "github.com/anyproto/any-sync/app/logger" - "github.com/anyproto/any-sync/commonspace" "github.com/anyproto/any-sync/commonspace/object/acl/list" "github.com/anyproto/any-sync/commonspace/object/acl/syncacl" "github.com/anyproto/any-sync/util/crypto" @@ -19,15 +17,12 @@ import ( "go.uber.org/zap" "github.com/anyproto/anytype-heart/core/block/chats/chatpush" - "github.com/anyproto/anytype-heart/core/event" - "github.com/anyproto/anytype-heart/pb" "github.com/anyproto/anytype-heart/space/clientspace" "github.com/anyproto/anytype-heart/space/internal/components/aclnotifications" "github.com/anyproto/anytype-heart/space/internal/components/invitemigrator" "github.com/anyproto/anytype-heart/space/internal/components/participantwatcher" "github.com/anyproto/anytype-heart/space/internal/components/spaceloader" "github.com/anyproto/anytype-heart/space/internal/components/spacestatus" - "github.com/anyproto/anytype-heart/space/spacecore/spacekey" "github.com/anyproto/anytype-heart/space/spaceinfo" ) @@ -48,6 +43,7 @@ func New(ownerMetadata []byte, guestKey crypto.PrivKey) AclObjectManager { type pushNotificationService interface { SubscribeToTopics(ctx context.Context, spaceId string, topics []string) + BroadcastKeyUpdate(spaceId string, aclState *list.AclState) error } type aclObjectManager struct { @@ -72,7 +68,6 @@ type aclObjectManager struct { lastIndexed string guestKey crypto.PrivKey mx sync.Mutex - eventSender event.Sender } type SpaceLoaderListener interface { @@ -123,7 +118,6 @@ func (a *aclObjectManager) Init(ap *app.App) (err error) { a.waitLoad = make(chan struct{}) a.wait = make(chan struct{}) a.pushNotificationService = app.MustComponent[pushNotificationService](ap) - a.eventSender = app.MustComponent[event.Sender](ap) return nil } @@ -271,43 +265,11 @@ func (a *aclObjectManager) processAcl() (err error) { a.mx.Lock() defer a.mx.Unlock() a.lastIndexed = acl.Head().Id - return a.broadcastKeyUpdate(aclState, common) -} -func (a *aclObjectManager) broadcastKeyUpdate(aclState *list.AclState, common commonspace.Space) error { - firstMetadataKey, err := aclState.FirstMetadataKey() + err = a.pushNotificationService.BroadcastKeyUpdate(common.Id(), aclState) if err != nil { - return err + return fmt.Errorf("broadcast key update: %w", err) } - readKey, err := aclState.CurrentReadKey() - if err != nil { - return err - } - spaceKeyId, _, err := spacekey.DeriveSpaceKey(firstMetadataKey) - if err != nil { - return err - } - encryptionKey, err := spacekey.DeriveSymmetricKey(readKey) - if err != nil { - return err - } - raw, err := encryptionKey.Raw() - if err != nil { - return err - } - encodedKey := base64.StdEncoding.EncodeToString(raw) - a.eventSender.Broadcast(&pb.Event{ - Messages: []*pb.EventMessage{ - { - SpaceId: common.Id(), - Value: &pb.EventMessageValueOfKeyUpdate{KeyUpdate: &pb.EventKeyUpdate{ - SpaceKeyId: spaceKeyId, - EncryptionKeyId: aclState.CurrentReadKeyId(), - EncryptionKey: encodedKey, - }}, - }, - }, - }) return nil } diff --git a/space/internal/components/aclobjectmanager/aclobjectmananger_test.go b/space/internal/components/aclobjectmanager/aclobjectmananger_test.go index 99182843f..b3815a158 100644 --- a/space/internal/components/aclobjectmanager/aclobjectmananger_test.go +++ b/space/internal/components/aclobjectmanager/aclobjectmananger_test.go @@ -33,6 +33,25 @@ import ( "github.com/anyproto/anytype-heart/tests/testutil" ) +type pushNotificationServiceDummy struct { +} + +func (s *pushNotificationServiceDummy) Init(a *app.App) (err error) { + return nil +} + +func (s *pushNotificationServiceDummy) Name() (name string) { + return "pushNotificationServiceDummy" +} + +func (s *pushNotificationServiceDummy) SubscribeToTopics(ctx context.Context, spaceId string, topics []string) { + +} + +func (s pushNotificationServiceDummy) BroadcastKeyUpdate(spaceId string, aclState *list.AclState) error { + return nil +} + func TestAclObjectManager(t *testing.T) { t.Run("owner", func(t *testing.T) { a := list.NewAclExecutor("spaceId") @@ -205,7 +224,8 @@ func newFixture(t *testing.T) *fixture { Register(testutil.PrepareMock(ctx, fx.a, fx.mockAclNotification)). Register(testutil.PrepareMock(ctx, fx.a, fx.mockAccountService)). Register(fx.spaceLoaderListener). - Register(fx) + Register(fx). + Register(&pushNotificationServiceDummy{}) return fx } diff --git a/space/techspace/mock_techspace/mock_TechSpace.go b/space/techspace/mock_techspace/mock_TechSpace.go index 796783df5..65d0cd163 100644 --- a/space/techspace/mock_techspace/mock_TechSpace.go +++ b/space/techspace/mock_techspace/mock_TechSpace.go @@ -10,8 +10,6 @@ import ( domain "github.com/anyproto/anytype-heart/core/domain" - keyvalueobserver "github.com/anyproto/anytype-heart/space/spacecore/keyvalueobserver" - mock "github.com/stretchr/testify/mock" objectcache "github.com/anyproto/anytype-heart/core/block/object/objectcache" @@ -380,17 +378,17 @@ func (_c *MockTechSpace_Name_Call) RunAndReturn(run func() string) *MockTechSpac return _c } -// Run provides a mock function with given fields: techCoreSpace, objectCache, kvObserver, create -func (_m *MockTechSpace) Run(techCoreSpace commonspace.Space, objectCache objectcache.Cache, kvObserver keyvalueobserver.Observer, create bool) error { - ret := _m.Called(techCoreSpace, objectCache, kvObserver, create) +// Run provides a mock function with given fields: techCoreSpace, objectCache, create +func (_m *MockTechSpace) Run(techCoreSpace commonspace.Space, objectCache objectcache.Cache, create bool) error { + ret := _m.Called(techCoreSpace, objectCache, create) if len(ret) == 0 { panic("no return value specified for Run") } var r0 error - if rf, ok := ret.Get(0).(func(commonspace.Space, objectcache.Cache, keyvalueobserver.Observer, bool) error); ok { - r0 = rf(techCoreSpace, objectCache, kvObserver, create) + if rf, ok := ret.Get(0).(func(commonspace.Space, objectcache.Cache, bool) error); ok { + r0 = rf(techCoreSpace, objectCache, create) } else { r0 = ret.Error(0) } @@ -406,15 +404,14 @@ type MockTechSpace_Run_Call struct { // Run is a helper method to define mock.On call // - techCoreSpace commonspace.Space // - objectCache objectcache.Cache -// - kvObserver keyvalueobserver.Observer // - create bool -func (_e *MockTechSpace_Expecter) Run(techCoreSpace interface{}, objectCache interface{}, kvObserver interface{}, create interface{}) *MockTechSpace_Run_Call { - return &MockTechSpace_Run_Call{Call: _e.mock.On("Run", techCoreSpace, objectCache, kvObserver, create)} +func (_e *MockTechSpace_Expecter) Run(techCoreSpace interface{}, objectCache interface{}, create interface{}) *MockTechSpace_Run_Call { + return &MockTechSpace_Run_Call{Call: _e.mock.On("Run", techCoreSpace, objectCache, create)} } -func (_c *MockTechSpace_Run_Call) Run(run func(techCoreSpace commonspace.Space, objectCache objectcache.Cache, kvObserver keyvalueobserver.Observer, create bool)) *MockTechSpace_Run_Call { +func (_c *MockTechSpace_Run_Call) Run(run func(techCoreSpace commonspace.Space, objectCache objectcache.Cache, create bool)) *MockTechSpace_Run_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(commonspace.Space), args[1].(objectcache.Cache), args[2].(keyvalueobserver.Observer), args[3].(bool)) + run(args[0].(commonspace.Space), args[1].(objectcache.Cache), args[2].(bool)) }) return _c } @@ -424,7 +421,7 @@ func (_c *MockTechSpace_Run_Call) Return(err error) *MockTechSpace_Run_Call { return _c } -func (_c *MockTechSpace_Run_Call) RunAndReturn(run func(commonspace.Space, objectcache.Cache, keyvalueobserver.Observer, bool) error) *MockTechSpace_Run_Call { +func (_c *MockTechSpace_Run_Call) RunAndReturn(run func(commonspace.Space, objectcache.Cache, bool) error) *MockTechSpace_Run_Call { _c.Call.Return(run) return _c } diff --git a/space/techspace/techspace_test.go b/space/techspace/techspace_test.go index d012df896..f5ca4e865 100644 --- a/space/techspace/techspace_test.go +++ b/space/techspace/techspace_test.go @@ -358,7 +358,7 @@ func newFixture(t *testing.T, storeIDs []string) *fixture { return nil, nil }).Times(1) require.NoError(t, fx.a.Start(ctx)) - err := fx.TechSpace.Run(fx.techCore, fx.objectCache, nil, false) + err := fx.TechSpace.Run(fx.techCore, fx.objectCache, false) require.NoError(t, err) // do not cancel wakeUpIds func