mirror of
https://github.com/anyproto/anytype-heart.git
synced 2025-06-07 21:37:04 +09:00
Merge pull request #2468 from anyproto/go-5764-chats-fix-opening-chats-after-cold-recovery
GO-5764: Chats: fix resolving space id for subscription manager
This commit is contained in:
commit
af23bd6d2f
5 changed files with 50 additions and 32 deletions
|
@ -70,9 +70,9 @@ func (_c *MockService_Close_Call) RunAndReturn(run func(context.Context) error)
|
|||
return _c
|
||||
}
|
||||
|
||||
// GetManager provides a mock function with given fields: chatObjectId
|
||||
func (_m *MockService) GetManager(chatObjectId string) (chatsubscription.Manager, error) {
|
||||
ret := _m.Called(chatObjectId)
|
||||
// GetManager provides a mock function with given fields: spaceId, chatObjectId
|
||||
func (_m *MockService) GetManager(spaceId string, chatObjectId string) (chatsubscription.Manager, error) {
|
||||
ret := _m.Called(spaceId, chatObjectId)
|
||||
|
||||
if len(ret) == 0 {
|
||||
panic("no return value specified for GetManager")
|
||||
|
@ -80,19 +80,19 @@ func (_m *MockService) GetManager(chatObjectId string) (chatsubscription.Manager
|
|||
|
||||
var r0 chatsubscription.Manager
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(0).(func(string) (chatsubscription.Manager, error)); ok {
|
||||
return rf(chatObjectId)
|
||||
if rf, ok := ret.Get(0).(func(string, string) (chatsubscription.Manager, error)); ok {
|
||||
return rf(spaceId, chatObjectId)
|
||||
}
|
||||
if rf, ok := ret.Get(0).(func(string) chatsubscription.Manager); ok {
|
||||
r0 = rf(chatObjectId)
|
||||
if rf, ok := ret.Get(0).(func(string, string) chatsubscription.Manager); ok {
|
||||
r0 = rf(spaceId, chatObjectId)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).(chatsubscription.Manager)
|
||||
}
|
||||
}
|
||||
|
||||
if rf, ok := ret.Get(1).(func(string) error); ok {
|
||||
r1 = rf(chatObjectId)
|
||||
if rf, ok := ret.Get(1).(func(string, string) error); ok {
|
||||
r1 = rf(spaceId, chatObjectId)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
@ -106,14 +106,15 @@ type MockService_GetManager_Call struct {
|
|||
}
|
||||
|
||||
// GetManager is a helper method to define mock.On call
|
||||
// - spaceId string
|
||||
// - chatObjectId string
|
||||
func (_e *MockService_Expecter) GetManager(chatObjectId interface{}) *MockService_GetManager_Call {
|
||||
return &MockService_GetManager_Call{Call: _e.mock.On("GetManager", chatObjectId)}
|
||||
func (_e *MockService_Expecter) GetManager(spaceId interface{}, chatObjectId interface{}) *MockService_GetManager_Call {
|
||||
return &MockService_GetManager_Call{Call: _e.mock.On("GetManager", spaceId, chatObjectId)}
|
||||
}
|
||||
|
||||
func (_c *MockService_GetManager_Call) Run(run func(chatObjectId string)) *MockService_GetManager_Call {
|
||||
func (_c *MockService_GetManager_Call) Run(run func(spaceId string, chatObjectId string)) *MockService_GetManager_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
run(args[0].(string))
|
||||
run(args[0].(string), args[1].(string))
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
@ -123,7 +124,7 @@ func (_c *MockService_GetManager_Call) Return(_a0 chatsubscription.Manager, _a1
|
|||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockService_GetManager_Call) RunAndReturn(run func(string) (chatsubscription.Manager, error)) *MockService_GetManager_Call {
|
||||
func (_c *MockService_GetManager_Call) RunAndReturn(run func(string, string) (chatsubscription.Manager, error)) *MockService_GetManager_Call {
|
||||
_c.Call.Return(run)
|
||||
return _c
|
||||
}
|
||||
|
|
|
@ -44,7 +44,7 @@ type Manager interface {
|
|||
|
||||
type Service interface {
|
||||
app.ComponentRunnable
|
||||
GetManager(chatObjectId string) (Manager, error)
|
||||
GetManager(spaceId string, chatObjectId string) (Manager, error)
|
||||
|
||||
SubscribeLastMessages(ctx context.Context, req SubscribeLastMessagesRequest) (*SubscribeLastMessagesResponse, error)
|
||||
Unsubscribe(chatObjectId string, subId string) error
|
||||
|
@ -102,13 +102,13 @@ func (s *service) Close(ctx context.Context) (err error) {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *service) GetManager(chatObjectId string) (Manager, error) {
|
||||
return s.getManager(chatObjectId)
|
||||
func (s *service) GetManager(spaceId string, chatObjectId string) (Manager, error) {
|
||||
return s.getManager(spaceId, chatObjectId)
|
||||
}
|
||||
|
||||
// getManagerFuture returns a future that should be resolved by the first who called this method.
|
||||
// The idea behind using futures here is to initialize a manager once without blocking the whole service.
|
||||
func (s *service) getManagerFuture(chatObjectId string) *futures.Future[*subscriptionManager] {
|
||||
func (s *service) getManagerFuture(spaceId string, chatObjectId string) *futures.Future[*subscriptionManager] {
|
||||
s.lock.Lock()
|
||||
mngr, ok := s.managers[chatObjectId]
|
||||
if ok {
|
||||
|
@ -120,21 +120,16 @@ func (s *service) getManagerFuture(chatObjectId string) *futures.Future[*subscri
|
|||
s.managers[chatObjectId] = mngr
|
||||
s.lock.Unlock()
|
||||
|
||||
mngr.Resolve(s.initManager(chatObjectId))
|
||||
mngr.Resolve(s.initManager(spaceId, chatObjectId))
|
||||
|
||||
return mngr
|
||||
}
|
||||
|
||||
func (s *service) getManager(chatObjectId string) (*subscriptionManager, error) {
|
||||
return s.getManagerFuture(chatObjectId).Wait()
|
||||
func (s *service) getManager(spaceId string, chatObjectId string) (*subscriptionManager, error) {
|
||||
return s.getManagerFuture(spaceId, chatObjectId).Wait()
|
||||
}
|
||||
|
||||
func (s *service) initManager(chatObjectId string) (*subscriptionManager, error) {
|
||||
spaceId, err := s.spaceIdResolver.ResolveSpaceID(chatObjectId)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("resolve space id: %w", err)
|
||||
}
|
||||
|
||||
func (s *service) initManager(spaceId string, chatObjectId string) (*subscriptionManager, error) {
|
||||
currentIdentity := s.accountService.AccountID()
|
||||
currentParticipantId := domain.NewParticipantId(spaceId, currentIdentity)
|
||||
|
||||
|
@ -184,7 +179,12 @@ func (s *service) SubscribeLastMessages(ctx context.Context, req SubscribeLastMe
|
|||
return nil, fmt.Errorf("empty chat object id")
|
||||
}
|
||||
|
||||
mngr, err := s.getManager(req.ChatObjectId)
|
||||
spaceId, err := s.spaceIdResolver.ResolveSpaceID(req.ChatObjectId)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("resolve space id: %w", err)
|
||||
}
|
||||
|
||||
mngr, err := s.getManager(spaceId, req.ChatObjectId)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("get manager: %w", err)
|
||||
}
|
||||
|
@ -229,7 +229,12 @@ func (s *service) SubscribeLastMessages(ctx context.Context, req SubscribeLastMe
|
|||
}
|
||||
|
||||
func (s *service) Unsubscribe(chatObjectId string, subId string) error {
|
||||
mngr, err := s.getManager(chatObjectId)
|
||||
spaceId, err := s.spaceIdResolver.ResolveSpaceID(chatObjectId)
|
||||
if err != nil {
|
||||
return fmt.Errorf("resolve space id: %w", err)
|
||||
}
|
||||
|
||||
mngr, err := s.getManager(spaceId, chatObjectId)
|
||||
if err != nil {
|
||||
return fmt.Errorf("get manager: %w", err)
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@ import (
|
|||
"github.com/anyproto/anytype-heart/core/block/chats/chatrepository"
|
||||
"github.com/anyproto/anytype-heart/core/block/chats/chatsubscription"
|
||||
"github.com/anyproto/anytype-heart/core/block/editor/chatobject"
|
||||
"github.com/anyproto/anytype-heart/core/block/object/idresolver"
|
||||
"github.com/anyproto/anytype-heart/core/domain"
|
||||
"github.com/anyproto/anytype-heart/core/session"
|
||||
subscriptionservice "github.com/anyproto/anytype-heart/core/subscription"
|
||||
|
@ -66,6 +67,7 @@ type accountService interface {
|
|||
}
|
||||
|
||||
type service struct {
|
||||
spaceIdResolver idresolver.Resolver
|
||||
objectGetter cache.ObjectWaitGetter
|
||||
crossSpaceSubService crossspacesub.Service
|
||||
pushService pushService
|
||||
|
@ -106,6 +108,7 @@ func (s *service) Init(a *app.App) error {
|
|||
s.objectStore = app.MustComponent[objectstore.ObjectStore](a)
|
||||
s.objectGetter = app.MustComponent[cache.ObjectWaitGetter](a)
|
||||
s.chatSubscriptionService = app.MustComponent[chatsubscription.Service](a)
|
||||
s.spaceIdResolver = app.MustComponent[idresolver.Resolver](a)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -300,7 +303,12 @@ func (s *service) onChatAddedAsync(chatObjectId string, subId string) error {
|
|||
return fmt.Errorf("subscribe: %w", err)
|
||||
}
|
||||
|
||||
mngr, err := s.chatSubscriptionService.GetManager(chatObjectId)
|
||||
spaceId, err := s.spaceIdResolver.ResolveSpaceID(chatObjectId)
|
||||
if err != nil {
|
||||
return fmt.Errorf("resolve space id: %w", err)
|
||||
}
|
||||
|
||||
mngr, err := s.chatSubscriptionService.GetManager(spaceId, chatObjectId)
|
||||
if err != nil {
|
||||
return fmt.Errorf("get manager: %w", err)
|
||||
}
|
||||
|
|
|
@ -16,6 +16,7 @@ import (
|
|||
"github.com/anyproto/anytype-heart/core/block/chats/chatmodel"
|
||||
"github.com/anyproto/anytype-heart/core/block/chats/chatsubscription"
|
||||
"github.com/anyproto/anytype-heart/core/block/chats/chatsubscription/mock_chatsubscription"
|
||||
"github.com/anyproto/anytype-heart/core/block/object/idresolver/mock_idresolver"
|
||||
"github.com/anyproto/anytype-heart/core/domain"
|
||||
"github.com/anyproto/anytype-heart/core/subscription"
|
||||
"github.com/anyproto/anytype-heart/core/subscription/crossspacesub/mock_crossspacesub"
|
||||
|
@ -111,6 +112,8 @@ func newFixture(t *testing.T) *fixture {
|
|||
objectGetter := mock_cache.NewMockObjectWaitGetterComponent(t)
|
||||
crossSpaceSubService := mock_crossspacesub.NewMockService(t)
|
||||
subscriptionService := mock_chatsubscription.NewMockService(t)
|
||||
idResolver := mock_idresolver.NewMockResolver(t)
|
||||
idResolver.EXPECT().ResolveSpaceID(mock.Anything).Return("", nil).Maybe()
|
||||
|
||||
fx := &fixture{
|
||||
service: New().(*service),
|
||||
|
@ -126,6 +129,7 @@ func newFixture(t *testing.T) *fixture {
|
|||
a.Register(testutil.PrepareMock(ctx, a, objectGetter))
|
||||
a.Register(testutil.PrepareMock(ctx, a, crossSpaceSubService))
|
||||
a.Register(testutil.PrepareMock(ctx, a, subscriptionService))
|
||||
a.Register(testutil.PrepareMock(ctx, a, idResolver))
|
||||
a.Register(&pushServiceDummy{})
|
||||
a.Register(&accountServiceDummy{})
|
||||
a.Register(fx)
|
||||
|
@ -203,7 +207,7 @@ func (fx *fixture) assertSendEvents(t *testing.T, chatIds []string) {
|
|||
manager.EXPECT().Unlock().Return()
|
||||
|
||||
for _, chatId := range chatIds {
|
||||
fx.subscriptionService.EXPECT().GetManager(chatId).Return(manager, nil)
|
||||
fx.subscriptionService.EXPECT().GetManager(mock.Anything, chatId).Return(manager, nil)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -132,7 +132,7 @@ func (s *storeObject) Init(ctx *smartblock.InitContext) error {
|
|||
}
|
||||
s.storeSource = storeSource
|
||||
|
||||
s.subscription, err = s.chatSubscriptionService.GetManager(storeSource.Id())
|
||||
s.subscription, err = s.chatSubscriptionService.GetManager(storeSource.SpaceID(), storeSource.Id())
|
||||
if err != nil {
|
||||
return fmt.Errorf("get subscription manager: %w", err)
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue