mirror of
https://github.com/anyproto/anytype-heart.git
synced 2025-06-10 01:51:07 +09:00
GO-4727: Fix db timestamp update
This commit is contained in:
parent
96b2cefa28
commit
69cb15d388
6 changed files with 109 additions and 44 deletions
|
@ -18,13 +18,15 @@ import (
|
|||
type ChatHandler struct {
|
||||
subscription *subscription
|
||||
currentIdentity string
|
||||
// forceNotRead forces handler to mark all messages as not read. It's useful for unit testing
|
||||
forceNotRead bool
|
||||
}
|
||||
|
||||
func (d ChatHandler) CollectionName() string {
|
||||
func (d *ChatHandler) CollectionName() string {
|
||||
return collectionName
|
||||
}
|
||||
|
||||
func (d ChatHandler) Init(ctx context.Context, s *storestate.StoreState) (err error) {
|
||||
func (d *ChatHandler) Init(ctx context.Context, s *storestate.StoreState) (err error) {
|
||||
coll, err := s.Collection(ctx, collectionName)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -38,15 +40,20 @@ func (d ChatHandler) Init(ctx context.Context, s *storestate.StoreState) (err er
|
|||
return
|
||||
}
|
||||
|
||||
func (d ChatHandler) BeforeCreate(ctx context.Context, ch storestate.ChangeOp) (err error) {
|
||||
func (d *ChatHandler) BeforeCreate(ctx context.Context, ch storestate.ChangeOp) (err error) {
|
||||
msg := newMessageWrapper(ch.Arena, ch.Value)
|
||||
msg.setCreatedAt(ch.Change.Timestamp)
|
||||
msg.setCreator(ch.Change.Creator)
|
||||
if ch.Change.Creator == d.currentIdentity {
|
||||
msg.setRead(true)
|
||||
} else {
|
||||
if d.forceNotRead {
|
||||
msg.setRead(false)
|
||||
} else {
|
||||
if ch.Change.Creator == d.currentIdentity {
|
||||
msg.setRead(true)
|
||||
} else {
|
||||
msg.setRead(false)
|
||||
}
|
||||
}
|
||||
|
||||
msg.setAddedAt(timeid.NewNano())
|
||||
model := msg.toModel()
|
||||
model.OrderId = ch.Change.Order
|
||||
|
@ -55,11 +62,11 @@ func (d ChatHandler) BeforeCreate(ctx context.Context, ch storestate.ChangeOp) (
|
|||
return
|
||||
}
|
||||
|
||||
func (d ChatHandler) BeforeModify(ctx context.Context, ch storestate.ChangeOp) (mode storestate.ModifyMode, err error) {
|
||||
func (d *ChatHandler) BeforeModify(ctx context.Context, ch storestate.ChangeOp) (mode storestate.ModifyMode, err error) {
|
||||
return storestate.ModifyModeUpsert, nil
|
||||
}
|
||||
|
||||
func (d ChatHandler) BeforeDelete(ctx context.Context, ch storestate.ChangeOp) (mode storestate.DeleteMode, err error) {
|
||||
func (d *ChatHandler) BeforeDelete(ctx context.Context, ch storestate.ChangeOp) (mode storestate.DeleteMode, err error) {
|
||||
coll, err := ch.State.Collection(ctx, collectionName)
|
||||
if err != nil {
|
||||
return storestate.DeleteModeDelete, fmt.Errorf("get collection: %w", err)
|
||||
|
@ -81,7 +88,7 @@ func (d ChatHandler) BeforeDelete(ctx context.Context, ch storestate.ChangeOp) (
|
|||
return storestate.DeleteModeDelete, nil
|
||||
}
|
||||
|
||||
func (d ChatHandler) UpgradeKeyModifier(ch storestate.ChangeOp, key *pb.KeyModify, mod query.Modifier) query.Modifier {
|
||||
func (d *ChatHandler) UpgradeKeyModifier(ch storestate.ChangeOp, key *pb.KeyModify, mod query.Modifier) query.Modifier {
|
||||
return query.ModifyFunc(func(a *anyenc.Arena, v *anyenc.Value) (result *anyenc.Value, modified bool, err error) {
|
||||
if len(key.KeyPath) == 0 {
|
||||
return nil, false, fmt.Errorf("no key path")
|
||||
|
|
|
@ -81,6 +81,7 @@ type storeObject struct {
|
|||
subscription *subscription
|
||||
crdtDb anystore.DB
|
||||
spaceIndex spaceindex.Store
|
||||
chatHandler *ChatHandler
|
||||
|
||||
arenaPool *anyenc.ArenaPool
|
||||
componentCtx context.Context
|
||||
|
@ -116,10 +117,12 @@ func (s *storeObject) Init(ctx *smartblock.InitContext) error {
|
|||
|
||||
s.subscription = newSubscription(s.SpaceID(), s.Id(), s.eventSender, s.spaceIndex)
|
||||
|
||||
stateStore, err := storestate.New(ctx.Ctx, s.Id(), s.crdtDb, ChatHandler{
|
||||
s.chatHandler = &ChatHandler{
|
||||
subscription: s.subscription,
|
||||
currentIdentity: s.accountService.AccountID(),
|
||||
})
|
||||
}
|
||||
|
||||
stateStore, err := storestate.New(ctx.Ctx, s.Id(), s.crdtDb, s.chatHandler)
|
||||
if err != nil {
|
||||
return fmt.Errorf("create state store: %w", err)
|
||||
}
|
||||
|
|
|
@ -117,7 +117,7 @@ func newFixture(t *testing.T) *fixture {
|
|||
onSeenHook(collectedHeads)
|
||||
|
||||
return nil
|
||||
})
|
||||
}).Maybe()
|
||||
|
||||
fx.source = source
|
||||
|
||||
|
@ -133,30 +133,61 @@ func newFixture(t *testing.T) *fixture {
|
|||
}
|
||||
|
||||
func TestAddMessage(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
sessionCtx := session.NewContext()
|
||||
t.Run("add own messages", func(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
sessionCtx := session.NewContext()
|
||||
|
||||
fx := newFixture(t)
|
||||
fx.eventSender.EXPECT().BroadcastToOtherSessions(mock.Anything, mock.Anything).Return()
|
||||
fx := newFixture(t)
|
||||
fx.eventSender.EXPECT().BroadcastToOtherSessions(mock.Anything, mock.Anything).Return()
|
||||
|
||||
inputMessage := givenComplexMessage()
|
||||
messageId, err := fx.AddMessage(ctx, sessionCtx, inputMessage)
|
||||
require.NoError(t, err)
|
||||
assert.NotEmpty(t, messageId)
|
||||
assert.NotEmpty(t, sessionCtx.GetMessages())
|
||||
inputMessage := givenComplexMessage()
|
||||
messageId, err := fx.AddMessage(ctx, sessionCtx, inputMessage)
|
||||
require.NoError(t, err)
|
||||
assert.NotEmpty(t, messageId)
|
||||
assert.NotEmpty(t, sessionCtx.GetMessages())
|
||||
|
||||
messagesResp, err := fx.GetMessages(ctx, GetMessagesRequest{})
|
||||
require.NoError(t, err)
|
||||
messagesResp, err := fx.GetMessages(ctx, GetMessagesRequest{})
|
||||
require.NoError(t, err)
|
||||
require.Len(t, messagesResp.Messages, 1)
|
||||
|
||||
require.Len(t, messagesResp.Messages, 1)
|
||||
want := givenComplexMessage()
|
||||
want.Id = messageId
|
||||
want.Creator = testCreator
|
||||
want.Read = true
|
||||
|
||||
want := givenComplexMessage()
|
||||
want.Id = messageId
|
||||
want.Creator = testCreator
|
||||
want.Read = true
|
||||
got := messagesResp.Messages[0]
|
||||
assertMessagesEqual(t, want, got)
|
||||
})
|
||||
|
||||
got := messagesResp.Messages[0]
|
||||
assertMessagesEqual(t, want, got)
|
||||
t.Run("imitate adding other's messages", func(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
sessionCtx := session.NewContext()
|
||||
|
||||
fx := newFixture(t)
|
||||
fx.eventSender.EXPECT().BroadcastToOtherSessions(mock.Anything, mock.Anything).Return()
|
||||
|
||||
// Force all messages as not read
|
||||
fx.chatHandler.forceNotRead = true
|
||||
|
||||
inputMessage := givenComplexMessage()
|
||||
messageId, err := fx.AddMessage(ctx, sessionCtx, inputMessage)
|
||||
require.NoError(t, err)
|
||||
assert.NotEmpty(t, messageId)
|
||||
assert.NotEmpty(t, sessionCtx.GetMessages())
|
||||
|
||||
messagesResp, err := fx.GetMessages(ctx, GetMessagesRequest{})
|
||||
require.NoError(t, err)
|
||||
require.Len(t, messagesResp.Messages, 1)
|
||||
assert.Equal(t, messagesResp.ChatState.DbTimestamp, messagesResp.Messages[0].AddedAt)
|
||||
|
||||
want := givenComplexMessage()
|
||||
want.Id = messageId
|
||||
want.Creator = testCreator
|
||||
want.Read = false
|
||||
|
||||
got := messagesResp.Messages[0]
|
||||
assertMessagesEqual(t, want, got)
|
||||
})
|
||||
}
|
||||
|
||||
func TestGetMessages(t *testing.T) {
|
||||
|
@ -173,6 +204,10 @@ func TestGetMessages(t *testing.T) {
|
|||
|
||||
messagesResp, err := fx.GetMessages(ctx, GetMessagesRequest{Limit: 5})
|
||||
require.NoError(t, err)
|
||||
|
||||
lastMessage := messagesResp.Messages[4]
|
||||
assert.Equal(t, messagesResp.ChatState.DbTimestamp, lastMessage.AddedAt)
|
||||
|
||||
wantTexts := []string{"text 6", "text 7", "text 8", "text 9", "text 10"}
|
||||
for i, msg := range messagesResp.Messages {
|
||||
assert.Equal(t, wantTexts[i], msg.Message.Text)
|
||||
|
@ -355,6 +390,25 @@ func TestToggleReaction(t *testing.T) {
|
|||
func TestReadMessages(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
fx := newFixture(t)
|
||||
fx.chatHandler.forceNotRead = true
|
||||
const n = 10
|
||||
for i := 0; i < n; i++ {
|
||||
_, err := fx.AddMessage(ctx, nil, givenSimpleMessage(fmt.Sprintf("message %d", i+1)))
|
||||
require.NoError(t, err)
|
||||
}
|
||||
// All messages forced as not read
|
||||
messagesResp := fx.assertReadStatus(t, ctx, "", "", false)
|
||||
|
||||
err := fx.MarkReadMessages(ctx, "", messagesResp.Messages[2].OrderId, messagesResp.ChatState.DbTimestamp)
|
||||
require.NoError(t, err)
|
||||
|
||||
fx.assertReadStatus(t, ctx, "", messagesResp.Messages[2].OrderId, true)
|
||||
fx.assertReadStatus(t, ctx, messagesResp.Messages[3].OrderId, "", false)
|
||||
}
|
||||
|
||||
func TestMarkMessagesAsNotRead(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
fx := newFixture(t)
|
||||
|
||||
const n = 10
|
||||
for i := 0; i < n; i++ {
|
||||
|
@ -364,19 +418,12 @@ func TestReadMessages(t *testing.T) {
|
|||
// All messages added by myself are read
|
||||
fx.assertReadStatus(t, ctx, "", "", true)
|
||||
|
||||
// For testing purposes we need to mark messages as not read
|
||||
fx.source.EXPECT().InitDiffManager(mock.Anything, mock.Anything).Return(nil)
|
||||
fx.source.EXPECT().StoreSeenHeads(mock.Anything).Return(nil)
|
||||
err := fx.MarkMessagesAsUnread(ctx, "")
|
||||
require.NoError(t, err)
|
||||
|
||||
messagesResp := fx.assertReadStatus(t, ctx, "", "", false)
|
||||
|
||||
err = fx.MarkReadMessages(ctx, "", messagesResp.Messages[2].OrderId, messagesResp.ChatState.DbTimestamp)
|
||||
require.NoError(t, err)
|
||||
|
||||
fx.assertReadStatus(t, ctx, "", messagesResp.Messages[2].OrderId, true)
|
||||
fx.assertReadStatus(t, ctx, messagesResp.Messages[3].OrderId, "", false)
|
||||
fx.assertReadStatus(t, ctx, "", "", false)
|
||||
}
|
||||
|
||||
func (fx *fixture) assertReadStatus(t *testing.T, ctx context.Context, afterOrderId string, beforeOrderId string, isRead bool) *GetMessagesResponse {
|
||||
|
|
|
@ -128,7 +128,7 @@ func marshalModel(arena *anyenc.Arena, msg *model.ChatMessage) *anyenc.Value {
|
|||
root.Set(modifiedAtKey, arena.NewNumberInt(int(msg.ModifiedAt)))
|
||||
root.Set("replyToMessageId", arena.NewString(msg.ReplyToMessageId))
|
||||
root.Set(contentKey, content)
|
||||
var read = arena.NewObject()
|
||||
var read *anyenc.Value
|
||||
if msg.Read {
|
||||
read = arena.NewTrue()
|
||||
} else {
|
||||
|
|
|
@ -124,14 +124,18 @@ func (s *subscription) getIdentityDetails(identity string) (*domain.Details, err
|
|||
}
|
||||
|
||||
func (s *subscription) add(prevOrderId string, message *model.ChatMessage) {
|
||||
if !message.Read {
|
||||
s.updateChatState(func(state *model.ChatState) {
|
||||
|
||||
s.updateChatState(func(state *model.ChatState) {
|
||||
if !message.Read {
|
||||
if message.OrderId < state.Messages.OldestOrderId {
|
||||
state.Messages.OldestOrderId = message.OrderId
|
||||
}
|
||||
state.Messages.Counter++
|
||||
})
|
||||
}
|
||||
}
|
||||
if message.AddedAt > state.DbTimestamp {
|
||||
state.DbTimestamp = message.AddedAt
|
||||
}
|
||||
})
|
||||
|
||||
if !s.canSend() {
|
||||
return
|
||||
|
|
|
@ -33,11 +33,15 @@ func TestSubscription(t *testing.T) {
|
|||
|
||||
messageId, err := fx.AddMessage(ctx, nil, givenComplexMessage())
|
||||
require.NoError(t, err)
|
||||
require.Len(t, fx.events, 1)
|
||||
require.Len(t, fx.events, 2)
|
||||
|
||||
ev := fx.events[0].GetChatAdd()
|
||||
require.NotNil(t, ev)
|
||||
assert.Equal(t, messageId, ev.Id)
|
||||
|
||||
evState := fx.events[1].GetChatStateUpdate()
|
||||
require.NotNil(t, evState)
|
||||
assert.True(t, evState.State.DbTimestamp > 0)
|
||||
})
|
||||
|
||||
t.Run("edit message", func(t *testing.T) {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue