1
0
Fork 0
mirror of https://github.com/anyproto/anytype-heart.git synced 2025-06-10 18:10:49 +09:00

GO-4727: Refactor

This commit is contained in:
Sergey 2025-03-11 13:56:44 +01:00
parent 7fd5cf8716
commit d84c55efad
No known key found for this signature in database
GPG key ID: 3B6BEF79160221C6

View file

@ -66,6 +66,7 @@ type storeObject struct {
smartblock.SmartBlock
locker smartblock.Locker
collection anystore.Collection
accountService AccountService
storeSource source.Store
store *storestate.StoreState
@ -99,7 +100,6 @@ func (s *storeObject) Init(ctx *smartblock.InitContext) error {
if !ok {
return fmt.Errorf("source is not a store")
}
storeSource.SetDiffManagerOnRemoveHook(s.markReadMessages)
err := s.SmartBlock.Init(ctx)
if err != nil {
@ -118,6 +118,11 @@ func (s *storeObject) Init(ctx *smartblock.InitContext) error {
}
s.store = stateStore
s.collection, err = s.store.Collection(s.componentCtx, collectionName)
if err != nil {
return fmt.Errorf("get s.collection.ction: %w", err)
}
s.storeSource = storeSource
err = storeSource.ReadStoreDoc(ctx.Ctx, stateStore, s.onUpdate)
if err != nil {
@ -135,55 +140,27 @@ func (s *storeObject) onUpdate() {
// initialChatState returns the initial chat state for the chat object from the DB
func (s *storeObject) initialChatState() (*model.ChatState, error) {
coll, err := s.store.Collection(s.componentCtx, collectionName)
if err != nil {
return nil, fmt.Errorf("get collection: %w", err)
}
txn, err := coll.ReadTx(s.componentCtx)
txn, err := s.collection.ReadTx(s.componentCtx)
if err != nil {
return nil, fmt.Errorf("start read tx: %w", err)
}
defer func() {
errCommit := txn.Commit()
if errCommit != nil {
log.With(zap.Error(errCommit)).Error("read tx commit error")
}
}()
defer txn.Commit()
ctx := txn.Context()
unreadQuery := coll.Find(query.Key{Path: []string{readKey}, Filter: query.NewComp(query.CompOpEq, false)}).Sort(ascOrder)
iter, err := unreadQuery.Limit(1).Iter(ctx)
var oldestOrderId string
defer iter.Close()
for iter.Next() {
doc, err := iter.Doc()
if err != nil {
return nil, fmt.Errorf("get doc: %w", err)
}
oldestOrderId = doc.Value().GetObject(orderKey).Get("id").GetString()
oldestOrderId, err := s.getOldestOrderId(txn)
if err != nil {
return nil, fmt.Errorf("get oldest order id: %w", err)
}
count, err := unreadQuery.Count(ctx)
count, err := s.countUnreadMessages(txn)
if err != nil {
return nil, fmt.Errorf("update messages: %w", err)
}
lastAddedDate := coll.Find(query.All{}).Sort(descAdded).Limit(1)
iter, err = lastAddedDate.Iter(ctx)
lastAdded, err := s.getLastAddedDate(txn)
if err != nil {
return nil, fmt.Errorf("find last added date: %w", err)
}
defer iter.Close()
var lastAdded int
for iter.Next() {
doc, err := iter.Doc()
if err != nil {
return nil, fmt.Errorf("get doc: %w", err)
}
lastAdded = doc.Value().GetInt(addedKey)
return nil, fmt.Errorf("get last added date: %w", err)
}
return &model.ChatState{
Messages: &model.ChatStateUnreadState{
OldestOrderId: oldestOrderId,
@ -193,16 +170,56 @@ func (s *storeObject) initialChatState() (*model.ChatState, error) {
DbTimestamp: int64(lastAdded),
}, nil
}
func (s *storeObject) getOldestOrderId(txn anystore.ReadTx) (string, error) {
unreadQuery := s.collection.Find(query.Key{Path: []string{readKey}, Filter: query.NewComp(query.CompOpEq, false)}).Sort(ascOrder)
iter, err := unreadQuery.Limit(1).Iter(txn.Context())
if err != nil {
return "", fmt.Errorf("init iter: %w", err)
}
defer iter.Close()
for iter.Next() {
doc, err := iter.Doc()
if err != nil {
return "", fmt.Errorf("get doc: %w", err)
}
return doc.Value().GetObject(orderKey).Get("id").GetString(), nil
}
return "", nil
}
func (s *storeObject) countUnreadMessages(txn anystore.ReadTx) (int, error) {
unreadQuery := s.collection.Find(query.Key{Path: []string{readKey}, Filter: query.NewComp(query.CompOpEq, false)}).Sort(ascOrder)
return unreadQuery.Limit(1).Count(txn.Context())
}
func (s *storeObject) getLastAddedDate(txn anystore.ReadTx) (int, error) {
lastAddedDate := s.collection.Find(query.All{}).Sort(descAdded).Limit(1)
iter, err := lastAddedDate.Iter(txn.Context())
if err != nil {
return 0, fmt.Errorf("find last added date: %w", err)
}
defer iter.Close()
for iter.Next() {
doc, err := iter.Doc()
if err != nil {
return 0, fmt.Errorf("get doc: %w", err)
}
return doc.Value().GetInt(addedKey), nil
}
return 0, nil
}
func (s *storeObject) markReadMessages(ids []string) {
if len(ids) == 0 {
return
}
coll, err := s.store.Collection(s.componentCtx, collectionName)
if err != nil {
log.With(zap.Error(err)).Error("markReadMessages: get collection")
return
}
txn, err := coll.WriteTx(s.componentCtx)
txn, err := s.collection.WriteTx(s.componentCtx)
if err != nil {
log.With(zap.Error(err)).Error("markReadMessages: start write tx")
return
@ -214,7 +231,7 @@ func (s *storeObject) markReadMessages(ids []string) {
// skip tree root
continue
}
res, err := coll.UpdateId(ctx, id, query.MustParseModifier(`{"$set":{"`+readKey+`":true}}`))
res, err := s.collection.UpdateId(ctx, id, query.MustParseModifier(`{"$set":{"`+readKey+`":true}}`))
if err != nil {
log.With(zap.Error(err)).With(zap.String("id", id)).With(zap.String("chatId", s.Id())).Error("markReadMessages: update message")
continue
@ -232,7 +249,7 @@ func (s *storeObject) markReadMessages(ids []string) {
if len(idsModified) > 0 {
// it doesn't work within the same transaction
// query the new oldest unread message's orderId
iter, err := coll.Find(
iter, err := s.collection.Find(
query.Key{Path: []string{readKey}, Filter: query.NewComp(query.CompOpEq, false)},
).Sort(ascOrder).
Limit(1).
@ -275,17 +292,12 @@ func (s *storeObject) MarkReadMessages(ctx context.Context, afterOrderId, before
}
func (s *storeObject) GetLastAddedMessageInOrderRange(ctx context.Context, afterOrderId, beforeOrderId string, lastAddedMessageTimestamp int64) (*model.ChatMessage, error) {
coll, err := s.store.Collection(ctx, collectionName)
if err != nil {
return nil, fmt.Errorf("get collection: %w", err)
}
if lastAddedMessageTimestamp < 0 {
// todo: remove this
// for testing purposes
lastAddedMessageTimestamp = math.MaxInt64
}
iter, err := coll.Find(
iter, err := s.collection.Find(
query.And{
query.Key{Path: []string{orderKey, "id"}, Filter: query.NewComp(query.CompOpGte, afterOrderId)},
query.Key{Path: []string{orderKey, "id"}, Filter: query.NewComp(query.CompOpLte, beforeOrderId)},
@ -315,17 +327,13 @@ func (s *storeObject) GetLastAddedMessageInOrderRange(ctx context.Context, after
}
func (s *storeObject) GetMessagesByIds(ctx context.Context, messageIds []string) ([]*model.ChatMessage, error) {
coll, err := s.store.Collection(ctx, collectionName)
if err != nil {
return nil, fmt.Errorf("get collection: %w", err)
}
txn, err := coll.ReadTx(ctx)
txn, err := s.collection.ReadTx(ctx)
if err != nil {
return nil, fmt.Errorf("start read tx: %w", err)
}
messages := make([]*model.ChatMessage, 0, len(messageIds))
for _, messageId := range messageIds {
obj, err := coll.FindId(txn.Context(), messageId)
obj, err := s.collection.FindId(txn.Context(), messageId)
if errors.Is(err, anystore.ErrDocNotFound) {
continue
}
@ -344,29 +352,24 @@ type GetMessagesResponse struct {
}
func (s *storeObject) GetMessages(ctx context.Context, req GetMessagesRequest) (*GetMessagesResponse, error) {
coll, err := s.store.Collection(ctx, collectionName)
if err != nil {
return nil, fmt.Errorf("get collection: %w", err)
}
var qry anystore.Query
if req.AfterOrderId != "" {
operator := query.CompOpGt
if req.IncludeBoundary {
operator = query.CompOpGte
}
qry = coll.Find(query.Key{Path: []string{orderKey, "id"}, Filter: query.NewComp(operator, req.AfterOrderId)}).Sort(ascOrder).Limit(uint(req.Limit))
qry = s.collection.Find(query.Key{Path: []string{orderKey, "id"}, Filter: query.NewComp(operator, req.AfterOrderId)}).Sort(ascOrder).Limit(uint(req.Limit))
} else if req.BeforeOrderId != "" {
operator := query.CompOpLt
if req.IncludeBoundary {
operator = query.CompOpLte
}
qry = coll.Find(query.Key{Path: []string{orderKey, "id"}, Filter: query.NewComp(operator, req.BeforeOrderId)}).Sort(descOrder).Limit(uint(req.Limit))
qry = s.collection.Find(query.Key{Path: []string{orderKey, "id"}, Filter: query.NewComp(operator, req.BeforeOrderId)}).Sort(descOrder).Limit(uint(req.Limit))
} else {
qry = coll.Find(nil).Sort(descOrder).Limit(uint(req.Limit))
qry = s.collection.Find(nil).Sort(descOrder).Limit(uint(req.Limit))
}
// make sure we flush all the pending message updates first
chatState := s.subscription.flush()
// todo here is possible race if new messages are added between the flush and the query
msgs, err := s.queryMessages(ctx, qry)
if err != nil {
return nil, fmt.Errorf("query messages: %w", err)
@ -510,11 +513,7 @@ func (s *storeObject) ToggleMessageReaction(ctx context.Context, messageId strin
}
func (s *storeObject) hasMyReaction(ctx context.Context, arena *anyenc.Arena, messageId string, emoji string) (bool, error) {
coll, err := s.store.Collection(ctx, collectionName)
if err != nil {
return false, fmt.Errorf("get collection: %w", err)
}
doc, err := coll.FindId(ctx, messageId)
doc, err := s.collection.FindId(ctx, messageId)
if err != nil {
return false, fmt.Errorf("find message: %w", err)
}
@ -531,18 +530,13 @@ func (s *storeObject) hasMyReaction(ctx context.Context, arena *anyenc.Arena, me
}
func (s *storeObject) SubscribeLastMessages(ctx context.Context, subId string, limit int, asyncInit bool) ([]*model.ChatMessage, int, *model.ChatState, error) {
coll, err := s.store.Collection(ctx, collectionName)
if err != nil {
return nil, 0, nil, fmt.Errorf("get collection: %w", err)
}
txn, err := s.store.NewTx(ctx)
if err != nil {
return nil, 0, nil, fmt.Errorf("init read transaction: %w", err)
}
defer txn.Commit()
query := coll.Find(nil).Sort(descOrder).Limit(uint(limit))
query := s.collection.Find(nil).Sort(descOrder).Limit(uint(limit))
messages, err := s.queryMessages(txn.Context(), query)
if err != nil {
return nil, 0, nil, fmt.Errorf("query messages: %w", err)