diff --git a/.mockery.yaml b/.mockery.yaml index 601de9ed2..17091d998 100644 --- a/.mockery.yaml +++ b/.mockery.yaml @@ -253,3 +253,4 @@ packages: github.com/anyproto/anytype-heart/core/block/chats/chatsubscription: interfaces: Service: + Manager: diff --git a/core/block/chats/chatsubscription/manager.go b/core/block/chats/chatsubscription/manager.go index 326f1f729..f3216c8fe 100644 --- a/core/block/chats/chatsubscription/manager.go +++ b/core/block/chats/chatsubscription/manager.go @@ -123,6 +123,10 @@ func (s *subscriptionManager) UpdateChatState(updater func(*model.ChatState) *mo s.chatStateUpdated = true } +func (s *subscriptionManager) ForceSendingChatState() { + s.chatStateUpdated = true +} + // Flush is called after committing changes func (s *subscriptionManager) Flush() { if !s.canSend() { diff --git a/core/block/chats/chatsubscription/mock_chatsubscription/mock_Manager.go b/core/block/chats/chatsubscription/mock_chatsubscription/mock_Manager.go new file mode 100644 index 000000000..08e7a6d19 --- /dev/null +++ b/core/block/chats/chatsubscription/mock_chatsubscription/mock_Manager.go @@ -0,0 +1,530 @@ +// Code generated by mockery. DO NOT EDIT. + +package mock_chatsubscription + +import ( + chatmodel "github.com/anyproto/anytype-heart/core/block/chats/chatmodel" + + mock "github.com/stretchr/testify/mock" + + model "github.com/anyproto/anytype-heart/pkg/lib/pb/model" + + session "github.com/anyproto/anytype-heart/core/session" +) + +// MockManager is an autogenerated mock type for the Manager type +type MockManager struct { + mock.Mock +} + +type MockManager_Expecter struct { + mock *mock.Mock +} + +func (_m *MockManager) EXPECT() *MockManager_Expecter { + return &MockManager_Expecter{mock: &_m.Mock} +} + +// Add provides a mock function with given fields: prevOrderId, message +func (_m *MockManager) Add(prevOrderId string, message *chatmodel.Message) { + _m.Called(prevOrderId, message) +} + +// MockManager_Add_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Add' +type MockManager_Add_Call struct { + *mock.Call +} + +// Add is a helper method to define mock.On call +// - prevOrderId string +// - message *chatmodel.Message +func (_e *MockManager_Expecter) Add(prevOrderId interface{}, message interface{}) *MockManager_Add_Call { + return &MockManager_Add_Call{Call: _e.mock.On("Add", prevOrderId, message)} +} + +func (_c *MockManager_Add_Call) Run(run func(prevOrderId string, message *chatmodel.Message)) *MockManager_Add_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string), args[1].(*chatmodel.Message)) + }) + return _c +} + +func (_c *MockManager_Add_Call) Return() *MockManager_Add_Call { + _c.Call.Return() + return _c +} + +func (_c *MockManager_Add_Call) RunAndReturn(run func(string, *chatmodel.Message)) *MockManager_Add_Call { + _c.Call.Return(run) + return _c +} + +// Delete provides a mock function with given fields: messageId +func (_m *MockManager) Delete(messageId string) { + _m.Called(messageId) +} + +// MockManager_Delete_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Delete' +type MockManager_Delete_Call struct { + *mock.Call +} + +// Delete is a helper method to define mock.On call +// - messageId string +func (_e *MockManager_Expecter) Delete(messageId interface{}) *MockManager_Delete_Call { + return &MockManager_Delete_Call{Call: _e.mock.On("Delete", messageId)} +} + +func (_c *MockManager_Delete_Call) Run(run func(messageId string)) *MockManager_Delete_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string)) + }) + return _c +} + +func (_c *MockManager_Delete_Call) Return() *MockManager_Delete_Call { + _c.Call.Return() + return _c +} + +func (_c *MockManager_Delete_Call) RunAndReturn(run func(string)) *MockManager_Delete_Call { + _c.Call.Return(run) + return _c +} + +// Flush provides a mock function with given fields: +func (_m *MockManager) Flush() { + _m.Called() +} + +// MockManager_Flush_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Flush' +type MockManager_Flush_Call struct { + *mock.Call +} + +// Flush is a helper method to define mock.On call +func (_e *MockManager_Expecter) Flush() *MockManager_Flush_Call { + return &MockManager_Flush_Call{Call: _e.mock.On("Flush")} +} + +func (_c *MockManager_Flush_Call) Run(run func()) *MockManager_Flush_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockManager_Flush_Call) Return() *MockManager_Flush_Call { + _c.Call.Return() + return _c +} + +func (_c *MockManager_Flush_Call) RunAndReturn(run func()) *MockManager_Flush_Call { + _c.Call.Return(run) + return _c +} + +// ForceSendingChatState provides a mock function with given fields: +func (_m *MockManager) ForceSendingChatState() { + _m.Called() +} + +// MockManager_ForceSendingChatState_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ForceSendingChatState' +type MockManager_ForceSendingChatState_Call struct { + *mock.Call +} + +// ForceSendingChatState is a helper method to define mock.On call +func (_e *MockManager_Expecter) ForceSendingChatState() *MockManager_ForceSendingChatState_Call { + return &MockManager_ForceSendingChatState_Call{Call: _e.mock.On("ForceSendingChatState")} +} + +func (_c *MockManager_ForceSendingChatState_Call) Run(run func()) *MockManager_ForceSendingChatState_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockManager_ForceSendingChatState_Call) Return() *MockManager_ForceSendingChatState_Call { + _c.Call.Return() + return _c +} + +func (_c *MockManager_ForceSendingChatState_Call) RunAndReturn(run func()) *MockManager_ForceSendingChatState_Call { + _c.Call.Return(run) + return _c +} + +// GetChatState provides a mock function with given fields: +func (_m *MockManager) GetChatState() *model.ChatState { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for GetChatState") + } + + var r0 *model.ChatState + if rf, ok := ret.Get(0).(func() *model.ChatState); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*model.ChatState) + } + } + + return r0 +} + +// MockManager_GetChatState_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetChatState' +type MockManager_GetChatState_Call struct { + *mock.Call +} + +// GetChatState is a helper method to define mock.On call +func (_e *MockManager_Expecter) GetChatState() *MockManager_GetChatState_Call { + return &MockManager_GetChatState_Call{Call: _e.mock.On("GetChatState")} +} + +func (_c *MockManager_GetChatState_Call) Run(run func()) *MockManager_GetChatState_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockManager_GetChatState_Call) Return(_a0 *model.ChatState) *MockManager_GetChatState_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockManager_GetChatState_Call) RunAndReturn(run func() *model.ChatState) *MockManager_GetChatState_Call { + _c.Call.Return(run) + return _c +} + +// IsActive provides a mock function with given fields: +func (_m *MockManager) IsActive() bool { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for IsActive") + } + + var r0 bool + if rf, ok := ret.Get(0).(func() bool); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// MockManager_IsActive_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IsActive' +type MockManager_IsActive_Call struct { + *mock.Call +} + +// IsActive is a helper method to define mock.On call +func (_e *MockManager_Expecter) IsActive() *MockManager_IsActive_Call { + return &MockManager_IsActive_Call{Call: _e.mock.On("IsActive")} +} + +func (_c *MockManager_IsActive_Call) Run(run func()) *MockManager_IsActive_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockManager_IsActive_Call) Return(_a0 bool) *MockManager_IsActive_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockManager_IsActive_Call) RunAndReturn(run func() bool) *MockManager_IsActive_Call { + _c.Call.Return(run) + return _c +} + +// Lock provides a mock function with given fields: +func (_m *MockManager) Lock() { + _m.Called() +} + +// MockManager_Lock_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Lock' +type MockManager_Lock_Call struct { + *mock.Call +} + +// Lock is a helper method to define mock.On call +func (_e *MockManager_Expecter) Lock() *MockManager_Lock_Call { + return &MockManager_Lock_Call{Call: _e.mock.On("Lock")} +} + +func (_c *MockManager_Lock_Call) Run(run func()) *MockManager_Lock_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockManager_Lock_Call) Return() *MockManager_Lock_Call { + _c.Call.Return() + return _c +} + +func (_c *MockManager_Lock_Call) RunAndReturn(run func()) *MockManager_Lock_Call { + _c.Call.Return(run) + return _c +} + +// ReadMessages provides a mock function with given fields: newOldestOrderId, idsModified, counterType +func (_m *MockManager) ReadMessages(newOldestOrderId string, idsModified []string, counterType chatmodel.CounterType) { + _m.Called(newOldestOrderId, idsModified, counterType) +} + +// MockManager_ReadMessages_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ReadMessages' +type MockManager_ReadMessages_Call struct { + *mock.Call +} + +// ReadMessages is a helper method to define mock.On call +// - newOldestOrderId string +// - idsModified []string +// - counterType chatmodel.CounterType +func (_e *MockManager_Expecter) ReadMessages(newOldestOrderId interface{}, idsModified interface{}, counterType interface{}) *MockManager_ReadMessages_Call { + return &MockManager_ReadMessages_Call{Call: _e.mock.On("ReadMessages", newOldestOrderId, idsModified, counterType)} +} + +func (_c *MockManager_ReadMessages_Call) Run(run func(newOldestOrderId string, idsModified []string, counterType chatmodel.CounterType)) *MockManager_ReadMessages_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string), args[1].([]string), args[2].(chatmodel.CounterType)) + }) + return _c +} + +func (_c *MockManager_ReadMessages_Call) Return() *MockManager_ReadMessages_Call { + _c.Call.Return() + return _c +} + +func (_c *MockManager_ReadMessages_Call) RunAndReturn(run func(string, []string, chatmodel.CounterType)) *MockManager_ReadMessages_Call { + _c.Call.Return(run) + return _c +} + +// SetSessionContext provides a mock function with given fields: ctx +func (_m *MockManager) SetSessionContext(ctx session.Context) { + _m.Called(ctx) +} + +// MockManager_SetSessionContext_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetSessionContext' +type MockManager_SetSessionContext_Call struct { + *mock.Call +} + +// SetSessionContext is a helper method to define mock.On call +// - ctx session.Context +func (_e *MockManager_Expecter) SetSessionContext(ctx interface{}) *MockManager_SetSessionContext_Call { + return &MockManager_SetSessionContext_Call{Call: _e.mock.On("SetSessionContext", ctx)} +} + +func (_c *MockManager_SetSessionContext_Call) Run(run func(ctx session.Context)) *MockManager_SetSessionContext_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(session.Context)) + }) + return _c +} + +func (_c *MockManager_SetSessionContext_Call) Return() *MockManager_SetSessionContext_Call { + _c.Call.Return() + return _c +} + +func (_c *MockManager_SetSessionContext_Call) RunAndReturn(run func(session.Context)) *MockManager_SetSessionContext_Call { + _c.Call.Return(run) + return _c +} + +// Unlock provides a mock function with given fields: +func (_m *MockManager) Unlock() { + _m.Called() +} + +// MockManager_Unlock_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Unlock' +type MockManager_Unlock_Call struct { + *mock.Call +} + +// Unlock is a helper method to define mock.On call +func (_e *MockManager_Expecter) Unlock() *MockManager_Unlock_Call { + return &MockManager_Unlock_Call{Call: _e.mock.On("Unlock")} +} + +func (_c *MockManager_Unlock_Call) Run(run func()) *MockManager_Unlock_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockManager_Unlock_Call) Return() *MockManager_Unlock_Call { + _c.Call.Return() + return _c +} + +func (_c *MockManager_Unlock_Call) RunAndReturn(run func()) *MockManager_Unlock_Call { + _c.Call.Return(run) + return _c +} + +// UnreadMessages provides a mock function with given fields: newOldestOrderId, lastStateId, msgIds, counterType +func (_m *MockManager) UnreadMessages(newOldestOrderId string, lastStateId string, msgIds []string, counterType chatmodel.CounterType) { + _m.Called(newOldestOrderId, lastStateId, msgIds, counterType) +} + +// MockManager_UnreadMessages_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UnreadMessages' +type MockManager_UnreadMessages_Call struct { + *mock.Call +} + +// UnreadMessages is a helper method to define mock.On call +// - newOldestOrderId string +// - lastStateId string +// - msgIds []string +// - counterType chatmodel.CounterType +func (_e *MockManager_Expecter) UnreadMessages(newOldestOrderId interface{}, lastStateId interface{}, msgIds interface{}, counterType interface{}) *MockManager_UnreadMessages_Call { + return &MockManager_UnreadMessages_Call{Call: _e.mock.On("UnreadMessages", newOldestOrderId, lastStateId, msgIds, counterType)} +} + +func (_c *MockManager_UnreadMessages_Call) Run(run func(newOldestOrderId string, lastStateId string, msgIds []string, counterType chatmodel.CounterType)) *MockManager_UnreadMessages_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string), args[1].(string), args[2].([]string), args[3].(chatmodel.CounterType)) + }) + return _c +} + +func (_c *MockManager_UnreadMessages_Call) Return() *MockManager_UnreadMessages_Call { + _c.Call.Return() + return _c +} + +func (_c *MockManager_UnreadMessages_Call) RunAndReturn(run func(string, string, []string, chatmodel.CounterType)) *MockManager_UnreadMessages_Call { + _c.Call.Return(run) + return _c +} + +// UpdateChatState provides a mock function with given fields: updater +func (_m *MockManager) UpdateChatState(updater func(*model.ChatState) *model.ChatState) { + _m.Called(updater) +} + +// MockManager_UpdateChatState_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateChatState' +type MockManager_UpdateChatState_Call struct { + *mock.Call +} + +// UpdateChatState is a helper method to define mock.On call +// - updater func(*model.ChatState) *model.ChatState +func (_e *MockManager_Expecter) UpdateChatState(updater interface{}) *MockManager_UpdateChatState_Call { + return &MockManager_UpdateChatState_Call{Call: _e.mock.On("UpdateChatState", updater)} +} + +func (_c *MockManager_UpdateChatState_Call) Run(run func(updater func(*model.ChatState) *model.ChatState)) *MockManager_UpdateChatState_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(func(*model.ChatState) *model.ChatState)) + }) + return _c +} + +func (_c *MockManager_UpdateChatState_Call) Return() *MockManager_UpdateChatState_Call { + _c.Call.Return() + return _c +} + +func (_c *MockManager_UpdateChatState_Call) RunAndReturn(run func(func(*model.ChatState) *model.ChatState)) *MockManager_UpdateChatState_Call { + _c.Call.Return(run) + return _c +} + +// UpdateFull provides a mock function with given fields: message +func (_m *MockManager) UpdateFull(message *chatmodel.Message) { + _m.Called(message) +} + +// MockManager_UpdateFull_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateFull' +type MockManager_UpdateFull_Call struct { + *mock.Call +} + +// UpdateFull is a helper method to define mock.On call +// - message *chatmodel.Message +func (_e *MockManager_Expecter) UpdateFull(message interface{}) *MockManager_UpdateFull_Call { + return &MockManager_UpdateFull_Call{Call: _e.mock.On("UpdateFull", message)} +} + +func (_c *MockManager_UpdateFull_Call) Run(run func(message *chatmodel.Message)) *MockManager_UpdateFull_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(*chatmodel.Message)) + }) + return _c +} + +func (_c *MockManager_UpdateFull_Call) Return() *MockManager_UpdateFull_Call { + _c.Call.Return() + return _c +} + +func (_c *MockManager_UpdateFull_Call) RunAndReturn(run func(*chatmodel.Message)) *MockManager_UpdateFull_Call { + _c.Call.Return(run) + return _c +} + +// UpdateReactions provides a mock function with given fields: message +func (_m *MockManager) UpdateReactions(message *chatmodel.Message) { + _m.Called(message) +} + +// MockManager_UpdateReactions_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateReactions' +type MockManager_UpdateReactions_Call struct { + *mock.Call +} + +// UpdateReactions is a helper method to define mock.On call +// - message *chatmodel.Message +func (_e *MockManager_Expecter) UpdateReactions(message interface{}) *MockManager_UpdateReactions_Call { + return &MockManager_UpdateReactions_Call{Call: _e.mock.On("UpdateReactions", message)} +} + +func (_c *MockManager_UpdateReactions_Call) Run(run func(message *chatmodel.Message)) *MockManager_UpdateReactions_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(*chatmodel.Message)) + }) + return _c +} + +func (_c *MockManager_UpdateReactions_Call) Return() *MockManager_UpdateReactions_Call { + _c.Call.Return() + return _c +} + +func (_c *MockManager_UpdateReactions_Call) RunAndReturn(run func(*chatmodel.Message)) *MockManager_UpdateReactions_Call { + _c.Call.Return(run) + return _c +} + +// NewMockManager creates a new instance of MockManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockManager(t interface { + mock.TestingT + Cleanup(func()) +}) *MockManager { + mock := &MockManager{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/core/block/chats/chatsubscription/service.go b/core/block/chats/chatsubscription/service.go index bbf753b1a..fa41152c1 100644 --- a/core/block/chats/chatsubscription/service.go +++ b/core/block/chats/chatsubscription/service.go @@ -36,6 +36,7 @@ type Manager interface { UpdateChatState(updater func(*model.ChatState) *model.ChatState) Add(prevOrderId string, message *chatmodel.Message) Delete(messageId string) + ForceSendingChatState() Flush() ReadMessages(newOldestOrderId string, idsModified []string, counterType chatmodel.CounterType) UnreadMessages(newOldestOrderId string, lastStateId string, msgIds []string, counterType chatmodel.CounterType) @@ -163,18 +164,17 @@ func (s *service) initManager(chatObjectId string) (*subscriptionManager, error) } type SubscribeLastMessagesRequest struct { - ChatObjectId string - SubId string - Limit int - // If AsyncInit is true, initial messages will be broadcast via events - AsyncInit bool + ChatObjectId string + SubId string + Limit int WithDependencies bool OnlyLastMessage bool } type SubscribeLastMessagesResponse struct { - Messages []*chatmodel.Message - ChatState *model.ChatState + PreviousOrderId string + Messages []*chatmodel.Message + ChatState *model.ChatState // Dependencies per message id Dependencies map[string][]*domain.Details } @@ -205,37 +205,27 @@ func (s *service) SubscribeLastMessages(ctx context.Context, req SubscribeLastMe mngr.subscribe(req.SubId, req.WithDependencies, req.OnlyLastMessage) - if req.AsyncInit { - var previousOrderId string - if len(messages) > 0 { - previousOrderId, err = mngr.repository.GetPrevOrderId(txn.Context(), messages[0].OrderId) - if err != nil { - return nil, fmt.Errorf("get previous order id: %w", err) - } - } + depsPerMessage := map[string][]*domain.Details{} + if req.WithDependencies { for _, message := range messages { - mngr.Add(previousOrderId, message) - previousOrderId = message.OrderId + deps := mngr.collectMessageDependencies(message.ChatMessage) + depsPerMessage[message.Id] = deps } - - // Force chatState to be sent - mngr.chatStateUpdated = true - mngr.Flush() - return nil, nil - } else { - depsPerMessage := map[string][]*domain.Details{} - if req.WithDependencies { - for _, message := range messages { - deps := mngr.collectMessageDependencies(message.ChatMessage) - depsPerMessage[message.Id] = deps - } - } - return &SubscribeLastMessagesResponse{ - Messages: messages, - ChatState: mngr.GetChatState(), - Dependencies: depsPerMessage, - }, nil } + + var previousOrderId string + if len(messages) > 0 { + previousOrderId, err = mngr.repository.GetPrevOrderId(txn.Context(), messages[0].OrderId) + if err != nil { + return nil, fmt.Errorf("get previous order id: %w", err) + } + } + return &SubscribeLastMessagesResponse{ + Messages: messages, + ChatState: mngr.GetChatState(), + Dependencies: depsPerMessage, + PreviousOrderId: previousOrderId, + }, nil } func (s *service) Unsubscribe(chatObjectId string, subId string) error { diff --git a/core/block/chats/service.go b/core/block/chats/service.go index 7cba4a227..ef5197fb6 100644 --- a/core/block/chats/service.go +++ b/core/block/chats/service.go @@ -149,7 +149,7 @@ func (s *service) SubscribeToMessagePreviews(ctx context.Context, subId string) go func() { defer wg.Done() - chatAddResp, err := s.onChatAdded(chatObjectId, subId, false) + chatAddResp, err := s.onChatAdded(chatObjectId, subId) if err != nil { log.Error("init lastMessage subscription", zap.Error(err)) return @@ -242,7 +242,7 @@ func (s *service) monitorMessagePreviews() { } for subId := range s.subscriptionIds { - _, err := s.onChatAdded(add.Id, subId, true) + err := s.onChatAddedAsync(add.Id, subId) if err != nil { log.Error("init last message subscription", zap.Error(err)) } @@ -278,17 +278,44 @@ func (s *service) monitorMessagePreviews() { } } -func (s *service) onChatAdded(chatObjectId string, subId string, asyncInit bool) (*chatsubscription.SubscribeLastMessagesResponse, error) { +func (s *service) onChatAdded(chatObjectId string, subId string) (*chatsubscription.SubscribeLastMessagesResponse, error) { return s.chatSubscriptionService.SubscribeLastMessages(s.componentCtx, chatsubscription.SubscribeLastMessagesRequest{ ChatObjectId: chatObjectId, SubId: subId, Limit: 1, - AsyncInit: asyncInit, WithDependencies: true, OnlyLastMessage: true, }) } +func (s *service) onChatAddedAsync(chatObjectId string, subId string) error { + resp, err := s.chatSubscriptionService.SubscribeLastMessages(s.componentCtx, chatsubscription.SubscribeLastMessagesRequest{ + ChatObjectId: chatObjectId, + SubId: subId, + Limit: 1, + WithDependencies: true, + OnlyLastMessage: true, + }) + if err != nil { + return fmt.Errorf("subscribe: %w", err) + } + + mngr, err := s.chatSubscriptionService.GetManager(chatObjectId) + if err != nil { + return fmt.Errorf("get manager: %w", err) + } + mngr.Lock() + defer mngr.Unlock() + + if len(resp.Messages) > 0 { + mngr.Add(resp.PreviousOrderId, resp.Messages[0]) + } + mngr.ForceSendingChatState() + mngr.Flush() + + return nil +} + func (s *service) onChatRemoved(chatObjectId string, subId string) error { err := s.Unsubscribe(chatObjectId, subId) if err != nil && !errors.Is(err, domain.ErrObjectNotFound) { @@ -421,7 +448,6 @@ func (s *service) SubscribeLastMessages(ctx context.Context, chatObjectId string ChatObjectId: chatObjectId, SubId: subId, Limit: limit, - AsyncInit: false, WithDependencies: false, }) } diff --git a/core/block/chats/service_test.go b/core/block/chats/service_test.go index 6cb558401..a6cfe6f12 100644 --- a/core/block/chats/service_test.go +++ b/core/block/chats/service_test.go @@ -194,6 +194,19 @@ func (fx *fixture) expectSubscribe(t *testing.T) { }).Maybe() } +func (fx *fixture) assertSendEvents(t *testing.T, chatIds []string) { + manager := mock_chatsubscription.NewMockManager(t) + manager.EXPECT().Lock().Return() + manager.EXPECT().Add(mock.Anything, mock.Anything).Return().Maybe() + manager.EXPECT().ForceSendingChatState().Return() + manager.EXPECT().Flush().Return() + manager.EXPECT().Unlock().Return() + + for _, chatId := range chatIds { + fx.subscriptionService.EXPECT().GetManager(chatId).Return(manager, nil) + } +} + func TestSubscribeToMessagePreviews(t *testing.T) { t.Run("subscribe to all existing chats", func(t *testing.T) { fx := newFixture(t) @@ -259,6 +272,8 @@ func TestSubscribeToMessagePreviews(t *testing.T) { Records: []*domain.Details{}, }, nil).Maybe() + fx.assertSendEvents(t, []string{"chat1", "chat2"}) + fx.start(t) fx.chatObjectsSubQueue.Add(ctx, &pb.EventMessage{ diff --git a/core/block/editor/chatobject/mock_chatobject/mock_StoreObject.go b/core/block/editor/chatobject/mock_chatobject/mock_StoreObject.go index 933c1bbb8..d7e87f50e 100644 --- a/core/block/editor/chatobject/mock_chatobject/mock_StoreObject.go +++ b/core/block/editor/chatobject/mock_chatobject/mock_StoreObject.go @@ -1619,17 +1619,17 @@ func (_c *MockStoreObject_MarkMessagesAsUnread_Call) RunAndReturn(run func(conte return _c } -// MarkReadMessages provides a mock function with given fields: ctx, afterOrderId, beforeOrderId, lastStateId, counterType -func (_m *MockStoreObject) MarkReadMessages(ctx context.Context, afterOrderId string, beforeOrderId string, lastStateId string, counterType chatmodel.CounterType) error { - ret := _m.Called(ctx, afterOrderId, beforeOrderId, lastStateId, counterType) +// MarkReadMessages provides a mock function with given fields: ctx, req +func (_m *MockStoreObject) MarkReadMessages(ctx context.Context, req chatobject.ReadMessagesRequest) error { + ret := _m.Called(ctx, req) if len(ret) == 0 { panic("no return value specified for MarkReadMessages") } var r0 error - if rf, ok := ret.Get(0).(func(context.Context, string, string, string, chatmodel.CounterType) error); ok { - r0 = rf(ctx, afterOrderId, beforeOrderId, lastStateId, counterType) + if rf, ok := ret.Get(0).(func(context.Context, chatobject.ReadMessagesRequest) error); ok { + r0 = rf(ctx, req) } else { r0 = ret.Error(0) } @@ -1644,17 +1644,14 @@ type MockStoreObject_MarkReadMessages_Call struct { // MarkReadMessages is a helper method to define mock.On call // - ctx context.Context -// - afterOrderId string -// - beforeOrderId string -// - lastStateId string -// - counterType chatmodel.CounterType -func (_e *MockStoreObject_Expecter) MarkReadMessages(ctx interface{}, afterOrderId interface{}, beforeOrderId interface{}, lastStateId interface{}, counterType interface{}) *MockStoreObject_MarkReadMessages_Call { - return &MockStoreObject_MarkReadMessages_Call{Call: _e.mock.On("MarkReadMessages", ctx, afterOrderId, beforeOrderId, lastStateId, counterType)} +// - req chatobject.ReadMessagesRequest +func (_e *MockStoreObject_Expecter) MarkReadMessages(ctx interface{}, req interface{}) *MockStoreObject_MarkReadMessages_Call { + return &MockStoreObject_MarkReadMessages_Call{Call: _e.mock.On("MarkReadMessages", ctx, req)} } -func (_c *MockStoreObject_MarkReadMessages_Call) Run(run func(ctx context.Context, afterOrderId string, beforeOrderId string, lastStateId string, counterType chatmodel.CounterType)) *MockStoreObject_MarkReadMessages_Call { +func (_c *MockStoreObject_MarkReadMessages_Call) Run(run func(ctx context.Context, req chatobject.ReadMessagesRequest)) *MockStoreObject_MarkReadMessages_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(string), args[2].(string), args[3].(string), args[4].(chatmodel.CounterType)) + run(args[0].(context.Context), args[1].(chatobject.ReadMessagesRequest)) }) return _c } @@ -1664,7 +1661,7 @@ func (_c *MockStoreObject_MarkReadMessages_Call) Return(_a0 error) *MockStoreObj return _c } -func (_c *MockStoreObject_MarkReadMessages_Call) RunAndReturn(run func(context.Context, string, string, string, chatmodel.CounterType) error) *MockStoreObject_MarkReadMessages_Call { +func (_c *MockStoreObject_MarkReadMessages_Call) RunAndReturn(run func(context.Context, chatobject.ReadMessagesRequest) error) *MockStoreObject_MarkReadMessages_Call { _c.Call.Return(run) return _c } diff --git a/core/block/editor/chatobject/subscription_test.go b/core/block/editor/chatobject/subscription_test.go index 4f688cc72..f12afa62a 100644 --- a/core/block/editor/chatobject/subscription_test.go +++ b/core/block/editor/chatobject/subscription_test.go @@ -30,7 +30,7 @@ func TestSubscription(t *testing.T) { } resp, err := fx.chatSubscriptionService.SubscribeLastMessages(ctx, chatsubscription.SubscribeLastMessagesRequest{ - ChatObjectId: fx.Id(), SubId: "subId", Limit: 5, AsyncInit: false, + ChatObjectId: fx.Id(), SubId: "subId", Limit: 5, }) require.NoError(t, err) wantTexts := []string{"text 6", "text 7", "text 8", "text 9", "text 10"} @@ -186,7 +186,7 @@ func TestSubscriptionMessageCounters(t *testing.T) { fx.chatHandler.forceNotRead = true subscribeResp, err := fx.chatSubscriptionService.SubscribeLastMessages(ctx, chatsubscription.SubscribeLastMessagesRequest{ - ChatObjectId: fx.Id(), SubId: "subId", Limit: 10, AsyncInit: false, + ChatObjectId: fx.Id(), SubId: "subId", Limit: 10, }) require.NoError(t, err) @@ -330,7 +330,6 @@ func TestSubscriptionMentionCounters(t *testing.T) { ChatObjectId: fx.Id(), SubId: "subId", Limit: 10, - AsyncInit: false, }) require.NoError(t, err) @@ -482,7 +481,6 @@ func TestSubscriptionWithDeps(t *testing.T) { ChatObjectId: fx.Id(), SubId: "subId", Limit: 10, - AsyncInit: false, WithDependencies: true, }) require.NoError(t, err)