1
0
Fork 0
mirror of https://github.com/anyproto/anytype-heart.git synced 2025-06-12 10:40:55 +09:00

GO-4727: Refactor subscription

This commit is contained in:
Sergey 2025-03-11 14:40:12 +01:00
parent d84c55efad
commit c56d9780a8
No known key found for this signature in database
GPG key ID: 3B6BEF79160221C6
4 changed files with 62 additions and 58 deletions

View file

@ -105,6 +105,7 @@ func (s *storeObject) Init(ctx *smartblock.InitContext) error {
if err != nil {
return err
}
s.storeSource = storeSource
s.subscription = newSubscription(s.SpaceID(), s.Id(), s.eventSender, s.spaceIndex)
@ -112,18 +113,20 @@ func (s *storeObject) Init(ctx *smartblock.InitContext) error {
subscription: s.subscription,
currentIdentity: s.accountService.AccountID(),
})
if err != nil {
return fmt.Errorf("create state store: %w", err)
}
s.store = stateStore
s.collection, err = s.store.Collection(s.componentCtx, collectionName)
if err != nil {
return fmt.Errorf("get s.collection.ction: %w", err)
}
s.storeSource = storeSource
s.subscription.chatState, err = s.initialChatState()
if err != nil {
return fmt.Errorf("init chat state: %w", err)
}
err = storeSource.ReadStoreDoc(ctx.Ctx, stateStore, s.onUpdate)
if err != nil {
return fmt.Errorf("read store doc: %w", err)
@ -135,7 +138,7 @@ func (s *storeObject) Init(ctx *smartblock.InitContext) error {
}
func (s *storeObject) onUpdate() {
_ = s.subscription.flush()
s.subscription.flush()
}
// initialChatState returns the initial chat state for the chat object from the DB
@ -245,7 +248,7 @@ func (s *storeObject) markReadMessages(ids []string) {
log.With(zap.Error(err)).Error("markReadMessages: commit")
return
}
log.Debug(fmt.Sprintf("markReadMessages: %d/%d messages marked as read", len(idsModified), len(ids)))
if len(idsModified) > 0 {
// it doesn't work within the same transaction
// query the new oldest unread message's orderId
@ -268,8 +271,11 @@ func (s *storeObject) markReadMessages(ids []string) {
newOldestOrderId = val.Value().GetObject(orderKey).Get("id").GetString()
}
}
log.Debug(fmt.Sprintf("markReadMessages: new oldest unread message: %s", newOldestOrderId))
s.subscription.chatState.Messages.OldestOrderId = newOldestOrderId
s.subscription.updateChatState(func(state *model.ChatState) {
state.Messages.OldestOrderId = newOldestOrderId
})
s.subscription.updateReadStatus(idsModified, true)
s.onUpdate()
}
@ -368,8 +374,7 @@ func (s *storeObject) GetMessages(ctx context.Context, req GetMessagesRequest) (
} else {
qry = s.collection.Find(nil).Sort(descOrder).Limit(uint(req.Limit))
}
// make sure we flush all the pending message updates first
chatState := s.subscription.flush()
msgs, err := s.queryMessages(ctx, qry)
if err != nil {
return nil, fmt.Errorf("query messages: %w", err)
@ -380,7 +385,7 @@ func (s *storeObject) GetMessages(ctx context.Context, req GetMessagesRequest) (
return &GetMessagesResponse{
Messages: msgs,
ChatState: chatState,
ChatState: s.subscription.getChatState(),
}, nil
}
@ -546,13 +551,8 @@ func (s *storeObject) SubscribeLastMessages(ctx context.Context, subId string, l
return messages[i].OrderId < messages[j].OrderId
})
isNewSubscription := s.subscription.subscribe(subId)
if isNewSubscription {
s.subscription.chatState, err = s.initialChatState()
if err != nil {
return nil, 0, s.subscription.chatState, fmt.Errorf("failed to fetch initial chat state: %w", err)
}
}
s.subscription.subscribe(subId)
if asyncInit {
var previousOrderId string
if len(messages) > 0 {
@ -565,11 +565,13 @@ func (s *storeObject) SubscribeLastMessages(ctx context.Context, subId string, l
s.subscription.add(previousOrderId, message)
previousOrderId = message.OrderId
}
// Force chatState to be sent
s.subscription.chatStateUpdated = true
s.subscription.flush()
// TODO Do not return chat state?
return nil, 0, s.subscription.chatState, nil
return nil, 0, nil, nil
} else {
return messages, 0, s.subscription.chatState, nil
return messages, 0, s.subscription.getChatState(), nil
}
}

View file

@ -175,6 +175,7 @@ func TestGetMessagesByIds(t *testing.T) {
want := givenMessage()
want.Id = messageId
want.Creator = testCreator
want.Read = true
got := messages[0]
assertMessagesEqual(t, want, got)
}
@ -205,6 +206,7 @@ func TestEditMessage(t *testing.T) {
want := editedMessage
want.Id = messageId
want.Creator = testCreator
want.Read = true
got := messagesResp.Messages[0]
assert.True(t, got.ModifiedAt > 0)
@ -338,7 +340,7 @@ func givenMessage() *model.ChatMessage {
Id: "",
OrderId: "",
Creator: "",
Read: true,
Read: false,
ReplyToMessageId: "replyToMessageId1",
Message: &model.ChatMessageMessageContent{
Text: "text!",

View file

@ -29,7 +29,9 @@ type subscription struct {
eventsBuffer []*pb.EventMessage
identityCache *expirable.LRU[string, *domain.Details]
ids []string
chatState *model.ChatState
chatState *model.ChatState
chatStateUpdated bool
}
func newSubscription(spaceId string, chatId string, eventSender event.Sender, spaceIndex spaceindex.Store) *subscription {
@ -68,21 +70,30 @@ func (s *subscription) setSessionContext(ctx session.Context) {
s.sessionContext = ctx
}
func (s *subscription) flush() *model.ChatState {
func (s *subscription) getChatState() *model.ChatState {
return copyChatState(s.chatState)
}
func (s *subscription) updateChatState(updater func(*model.ChatState)) {
updater(s.chatState)
}
func (s *subscription) flush() {
events := slices.Clone(s.eventsBuffer)
s.eventsBuffer = s.eventsBuffer[:0]
chatState := copyChatState(s.chatState)
events = append(events, event.NewMessage(s.spaceId, &pb.EventMessageValueOfChatStateUpdate{ChatStateUpdate: &pb.EventChatUpdateState{
State: chatState,
}}))
if s.chatStateUpdated {
events = append(events, event.NewMessage(s.spaceId, &pb.EventMessageValueOfChatStateUpdate{ChatStateUpdate: &pb.EventChatUpdateState{
State: s.getChatState(),
}}))
s.chatStateUpdated = false
}
ev := &pb.Event{
ContextId: s.chatId,
Messages: events,
}
// ????
if s.sessionContext != nil {
s.sessionContext.SetMessages(s.chatId, events)
s.eventSender.BroadcastToOtherSessions(s.sessionContext.ID(), ev)
@ -90,7 +101,6 @@ func (s *subscription) flush() *model.ChatState {
} else if s.isActive() {
s.eventSender.Broadcast(ev)
}
return chatState
}
func (s *subscription) getIdentityDetails(identity string) (*domain.Details, error) {
@ -138,10 +148,13 @@ func (s *subscription) add(prevOrderId string, message *model.ChatMessage) {
}
if !message.Read {
if message.OrderId < s.chatState.Messages.OldestOrderId {
s.chatState.Messages.OldestOrderId = message.OrderId
}
s.chatState.Messages.Counter++
s.updateChatState(func(state *model.ChatState) {
if message.OrderId < state.Messages.OldestOrderId {
state.Messages.OldestOrderId = message.OrderId
}
state.Messages.Counter++
})
}
s.eventsBuffer = append(s.eventsBuffer, event.NewMessage(s.spaceId, &pb.EventMessageValueOfChatAdd{
ChatAdd: ev,
@ -192,11 +205,14 @@ func (s *subscription) updateReadStatus(ids []string, read bool) {
if !s.canSend() {
return
}
if read {
s.chatState.Messages.Counter -= int32(len(ids))
} else {
s.chatState.Messages.Counter += int32(len(ids))
}
s.updateChatState(func(state *model.ChatState) {
if read {
state.Messages.Counter -= int32(len(ids))
} else {
state.Messages.Counter += int32(len(ids))
}
})
s.eventsBuffer = append(s.eventsBuffer, event.NewMessage(s.spaceId, &pb.EventMessageValueOfChatUpdateReadStatus{
ChatUpdateReadStatus: &pb.EventChatUpdateReadStatus{

View file

@ -33,15 +33,11 @@ func TestSubscription(t *testing.T) {
messageId, err := fx.AddMessage(ctx, nil, givenMessage())
require.NoError(t, err)
require.Len(t, fx.events, 2)
require.Len(t, fx.events, 1)
ev := fx.events[0].GetChatAdd()
require.NotNil(t, ev)
assert.Equal(t, messageId, ev.Id)
evState := fx.events[1].GetChatStateUpdate()
require.NotNil(t, evState)
assert.NotZero(t, evState.State.DbTimestamp)
})
t.Run("edit message", func(t *testing.T) {
@ -52,16 +48,12 @@ func TestSubscription(t *testing.T) {
err = fx.EditMessage(ctx, messages[0].Id, edited)
require.NoError(t, err)
require.Len(t, fx.events, 2)
require.Len(t, fx.events, 1)
ev := fx.events[0].GetChatUpdate()
require.NotNil(t, ev)
assert.Equal(t, messages[0].Id, ev.Id)
assert.Equal(t, edited.Message.Text, ev.Message.Message.Text)
evState := fx.events[1].GetChatStateUpdate()
require.NotNil(t, evState)
assert.NotZero(t, evState.State.DbTimestamp)
})
t.Run("toggle message reaction", func(t *testing.T) {
@ -69,17 +61,13 @@ func TestSubscription(t *testing.T) {
err = fx.ToggleMessageReaction(ctx, messages[0].Id, "👍")
require.NoError(t, err)
require.Len(t, fx.events, 2)
require.Len(t, fx.events, 1)
ev := fx.events[0].GetChatUpdateReactions()
require.NotNil(t, ev)
assert.Equal(t, messages[0].Id, ev.Id)
_, ok := ev.Reactions.Reactions["👍"]
assert.True(t, ok)
evState := fx.events[1].GetChatStateUpdate()
require.NotNil(t, evState)
assert.NotZero(t, evState.State.DbTimestamp)
})
t.Run("delete message", func(t *testing.T) {
@ -87,14 +75,10 @@ func TestSubscription(t *testing.T) {
err = fx.DeleteMessage(ctx, messages[0].Id)
require.NoError(t, err)
require.Len(t, fx.events, 2)
require.Len(t, fx.events, 1)
ev := fx.events[0].GetChatDelete()
require.NotNil(t, ev)
assert.Equal(t, messages[0].Id, ev.Id)
evState := fx.events[1].GetChatStateUpdate()
require.NotNil(t, evState)
assert.NotZero(t, evState.State.DbTimestamp)
})
}