1
0
Fork 0
mirror of https://github.com/anyproto/anytype-heart.git synced 2025-06-10 18:10:49 +09:00

GO-4779: Chats: Add subscription id to events

This commit is contained in:
Sergey 2025-01-22 13:39:54 +01:00
parent 6fce57d615
commit 4ea6a6bb0e
No known key found for this signature in database
GPG key ID: 3B6BEF79160221C6
10 changed files with 1984 additions and 1637 deletions

View file

@ -27,8 +27,8 @@ type Service interface {
DeleteMessage(ctx context.Context, chatObjectId string, messageId string) error
GetMessages(ctx context.Context, chatObjectId string, req chatobject.GetMessagesRequest) ([]*model.ChatMessage, error)
GetMessagesByIds(ctx context.Context, chatObjectId string, messageIds []string) ([]*model.ChatMessage, error)
SubscribeLastMessages(ctx context.Context, chatObjectId string, limit int) ([]*model.ChatMessage, int, error)
Unsubscribe(chatObjectId string) error
SubscribeLastMessages(ctx context.Context, chatObjectId string, limit int, subId string) ([]*model.ChatMessage, int, error)
Unsubscribe(chatObjectId string, subId string) error
app.ComponentRunnable
}
@ -138,14 +138,14 @@ func (s *service) GetMessagesByIds(ctx context.Context, chatObjectId string, mes
return res, err
}
func (s *service) SubscribeLastMessages(ctx context.Context, chatObjectId string, limit int) ([]*model.ChatMessage, int, error) {
func (s *service) SubscribeLastMessages(ctx context.Context, chatObjectId string, limit int, subId string) ([]*model.ChatMessage, int, error) {
var (
msgs []*model.ChatMessage
numBefore int
)
err := cache.Do(s.objectGetter, chatObjectId, func(sb chatobject.StoreObject) error {
var err error
msgs, numBefore, err = sb.SubscribeLastMessages(ctx, limit)
msgs, numBefore, err = sb.SubscribeLastMessages(ctx, subId, limit)
if err != nil {
return err
}
@ -154,8 +154,8 @@ func (s *service) SubscribeLastMessages(ctx context.Context, chatObjectId string
return msgs, numBefore, err
}
func (s *service) Unsubscribe(chatObjectId string) error {
func (s *service) Unsubscribe(chatObjectId string, subId string) error {
return cache.Do(s.objectGetter, chatObjectId, func(sb chatobject.StoreObject) error {
return sb.Unsubscribe()
return sb.Unsubscribe(subId)
})
}

View file

@ -38,8 +38,8 @@ type StoreObject interface {
EditMessage(ctx context.Context, messageId string, newMessage *model.ChatMessage) error
ToggleMessageReaction(ctx context.Context, messageId string, emoji string) error
DeleteMessage(ctx context.Context, messageId string) error
SubscribeLastMessages(ctx context.Context, limit int) ([]*model.ChatMessage, int, error)
Unsubscribe() error
SubscribeLastMessages(ctx context.Context, subId string, limit int) ([]*model.ChatMessage, int, error)
Unsubscribe(subId string) error
}
type GetMessagesRequest struct {
@ -307,7 +307,7 @@ func (s *storeObject) hasMyReaction(ctx context.Context, arena *anyenc.Arena, me
return false, nil
}
func (s *storeObject) SubscribeLastMessages(ctx context.Context, limit int) ([]*model.ChatMessage, int, error) {
func (s *storeObject) SubscribeLastMessages(ctx context.Context, subId string, limit int) ([]*model.ChatMessage, int, error) {
coll, err := s.store.Collection(ctx, collectionName)
if err != nil {
return nil, 0, fmt.Errorf("get collection: %w", err)
@ -322,13 +322,13 @@ func (s *storeObject) SubscribeLastMessages(ctx context.Context, limit int) ([]*
return messages[i].OrderId < messages[j].OrderId
})
s.subscription.enable()
s.subscription.subscribe(subId)
return messages, 0, nil
}
func (s *storeObject) Unsubscribe() error {
s.subscription.close()
func (s *storeObject) Unsubscribe(subId string) error {
s.subscription.unsubscribe(subId)
return nil
}
@ -336,7 +336,7 @@ func (s *storeObject) TryClose(objectTTL time.Duration) (res bool, err error) {
if !s.locker.TryLock() {
return false, nil
}
isActive := s.subscription.enabled
isActive := s.subscription.isActive()
s.Unlock()
if isActive {

View file

@ -7,6 +7,7 @@ import (
"github.com/anyproto/anytype-heart/core/session"
"github.com/anyproto/anytype-heart/pb"
"github.com/anyproto/anytype-heart/pkg/lib/pb/model"
"github.com/anyproto/anytype-heart/util/slice"
)
type subscription struct {
@ -18,7 +19,7 @@ type subscription struct {
eventsBuffer []*pb.EventMessage
enabled bool
ids []string
}
func newSubscription(spaceId string, chatId string, eventSender event.Sender) *subscription {
@ -29,12 +30,18 @@ func newSubscription(spaceId string, chatId string, eventSender event.Sender) *s
}
}
func (s *subscription) enable() {
s.enabled = true
func (s *subscription) subscribe(subId string) {
if !slices.Contains(s.ids, subId) {
s.ids = append(s.ids, subId)
}
}
func (s *subscription) close() {
s.enabled = false
func (s *subscription) unsubscribe(subId string) {
s.ids = slice.Remove(s.ids, subId)
}
func (s *subscription) isActive() bool {
return len(s.ids) > 0
}
// setSessionContext sets the session context for the current operation
@ -59,7 +66,7 @@ func (s *subscription) flush() {
s.sessionContext.SetMessages(s.chatId, slices.Clone(s.eventsBuffer))
s.eventSender.BroadcastToOtherSessions(s.sessionContext.ID(), ev)
s.sessionContext = nil
} else if s.enabled {
} else if s.isActive() {
s.eventSender.Broadcast(ev)
}
}
@ -72,6 +79,7 @@ func (s *subscription) add(message *model.ChatMessage) {
Id: message.Id,
Message: message,
OrderId: message.OrderId,
SubIds: slices.Clone(s.ids),
}
s.eventsBuffer = append(s.eventsBuffer, event.NewMessage(s.spaceId, &pb.EventMessageValueOfChatAdd{
ChatAdd: ev,
@ -80,7 +88,8 @@ func (s *subscription) add(message *model.ChatMessage) {
func (s *subscription) delete(messageId string) {
ev := &pb.EventChatDelete{
Id: messageId,
Id: messageId,
SubIds: slices.Clone(s.ids),
}
s.eventsBuffer = append(s.eventsBuffer, event.NewMessage(s.spaceId, &pb.EventMessageValueOfChatDelete{
ChatDelete: ev,
@ -94,6 +103,7 @@ func (s *subscription) updateFull(message *model.ChatMessage) {
ev := &pb.EventChatUpdate{
Id: message.Id,
Message: message,
SubIds: slices.Clone(s.ids),
}
s.eventsBuffer = append(s.eventsBuffer, event.NewMessage(s.spaceId, &pb.EventMessageValueOfChatUpdate{
ChatUpdate: ev,
@ -107,6 +117,7 @@ func (s *subscription) updateReactions(message *model.ChatMessage) {
ev := &pb.EventChatUpdateReactions{
Id: message.Id,
Reactions: message.Reactions,
SubIds: slices.Clone(s.ids),
}
s.eventsBuffer = append(s.eventsBuffer, event.NewMessage(s.spaceId, &pb.EventMessageValueOfChatUpdateReactions{
ChatUpdateReactions: ev,
@ -117,7 +128,7 @@ func (s *subscription) canSend() bool {
if s.sessionContext != nil {
return true
}
if !s.enabled {
if !s.isActive() {
return false
}
return true

View file

@ -21,7 +21,7 @@ func TestSubscription(t *testing.T) {
assert.NotEmpty(t, messageId)
}
messages, _, err := fx.SubscribeLastMessages(ctx, 5)
messages, _, err := fx.SubscribeLastMessages(ctx, "subId", 5)
require.NoError(t, err)
wantTexts := []string{"text 6", "text 7", "text 8", "text 9", "text 10"}
for i, msg := range messages {

View file

@ -98,7 +98,7 @@ func (mw *Middleware) ChatGetMessagesByIds(cctx context.Context, req *pb.RpcChat
func (mw *Middleware) ChatSubscribeLastMessages(cctx context.Context, req *pb.RpcChatSubscribeLastMessagesRequest) *pb.RpcChatSubscribeLastMessagesResponse {
chatService := mustService[chats.Service](mw)
messages, numBefore, err := chatService.SubscribeLastMessages(cctx, req.ChatObjectId, int(req.Limit))
messages, numBefore, err := chatService.SubscribeLastMessages(cctx, req.ChatObjectId, int(req.Limit), req.SubId)
code := mapErrorCode[pb.RpcChatSubscribeLastMessagesResponseErrorCode](err)
return &pb.RpcChatSubscribeLastMessagesResponse{
Messages: messages,
@ -113,7 +113,7 @@ func (mw *Middleware) ChatSubscribeLastMessages(cctx context.Context, req *pb.Rp
func (mw *Middleware) ChatUnsubscribe(cctx context.Context, req *pb.RpcChatUnsubscribeRequest) *pb.RpcChatUnsubscribeResponse {
chatService := mustService[chats.Service](mw)
err := chatService.Unsubscribe(req.ChatObjectId)
err := chatService.Unsubscribe(req.ChatObjectId, req.SubId)
code := mapErrorCode[pb.RpcChatUnsubscribeResponseErrorCode](err)
return &pb.RpcChatUnsubscribeResponse{
Error: &pb.RpcChatUnsubscribeResponseError{

View file

@ -10287,6 +10287,7 @@ Get marks list in the selected range in text block.
| ----- | ---- | ----- | ----------- |
| chatObjectId | [string](#string) | | Identifier for the chat |
| limit | [int32](#int32) | | Number of max last messages to return and subscribe |
| subId | [string](#string) | | |
@ -10403,6 +10404,7 @@ Get marks list in the selected range in text block.
| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| chatObjectId | [string](#string) | | Identifier for the chat |
| subId | [string](#string) | | |
@ -27060,6 +27062,7 @@ Precondition: user A opened a block
| id | [string](#string) | | |
| orderId | [string](#string) | | |
| message | [model.ChatMessage](#anytype-model-ChatMessage) | | |
| subIds | [string](#string) | repeated | |
@ -27075,6 +27078,7 @@ Precondition: user A opened a block
| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| id | [string](#string) | | |
| subIds | [string](#string) | repeated | |
@ -27091,6 +27095,7 @@ Precondition: user A opened a block
| ----- | ---- | ----- | ----------- |
| id | [string](#string) | | |
| message | [model.ChatMessage](#anytype-model-ChatMessage) | | |
| subIds | [string](#string) | repeated | |
@ -27107,6 +27112,7 @@ Precondition: user A opened a block
| ----- | ---- | ----- | ----------- |
| id | [string](#string) | | |
| reactions | [model.ChatMessage.Reactions](#anytype-model-ChatMessage-Reactions) | | |
| subIds | [string](#string) | repeated | |

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

View file

@ -7807,6 +7807,7 @@ message Rpc {
message Request {
string chatObjectId = 1; // Identifier for the chat
int32 limit = 2; // Number of max last messages to return and subscribe
string subId = 3;
}
message Response {
@ -7831,6 +7832,7 @@ message Rpc {
message Unsubscribe {
message Request {
string chatObjectId = 1; // Identifier for the chat
string subId = 2;
}
message Response {
Error error = 1;

View file

@ -124,17 +124,21 @@ message Event {
string id = 1;
string orderId = 2;
model.ChatMessage message = 3;
repeated string subIds = 4;
}
message Delete {
string id = 1;
repeated string subIds = 2;
}
message Update {
string id = 1;
model.ChatMessage message = 2;
repeated string subIds = 3;
}
message UpdateReactions {
string id = 1;
model.ChatMessage.Reactions reactions = 2;
repeated string subIds = 3;
}
}