1
0
Fork 0
mirror of https://github.com/anyproto/anytype-heart.git synced 2025-06-08 05:47:07 +09:00

GO-5297 Merge branch 'main' of github.com:anyproto/anytype-heart into go-5297-push-service-1st-iteration

This commit is contained in:
Sergey 2025-04-24 17:10:43 +02:00
commit e7c41d4982
No known key found for this signature in database
GPG key ID: 3B6BEF79160221C6
87 changed files with 2749 additions and 2233 deletions

View file

@ -494,7 +494,8 @@ func removeAccountRelatedDetails(s *pb.ChangeSnapshot) {
bundle.RelationKeySyncDate.String(),
bundle.RelationKeySyncError.String(),
bundle.RelationKeySyncStatus.String(),
bundle.RelationKeyChatId.String():
bundle.RelationKeyChatId.String(),
bundle.RelationKeyType.String():
delete(s.Data.Details.Fields, key)
}

View file

@ -19,7 +19,7 @@ import (
"go.uber.org/zap"
"github.com/anyproto/anytype-heart/core/anytype/account"
"github.com/anyproto/anytype-heart/core/block/chats/push"
"github.com/anyproto/anytype-heart/core/block/chats/chatpush"
"github.com/anyproto/anytype-heart/core/domain"
"github.com/anyproto/anytype-heart/core/inviteservice"
"github.com/anyproto/anytype-heart/pkg/lib/bundle"
@ -128,7 +128,7 @@ func (a *aclService) subscribeToPushNotifications(err error, spaceId string) {
if err != nil {
log.Error("create space for push message", zap.Error(err))
} else {
a.pushService.SubscribeToTopics(context.Background(), spaceId, []string{push.ChatsTopicName})
a.pushService.SubscribeToTopics(context.Background(), spaceId, []string{chatpush.ChatsTopicName})
}
}

View file

@ -55,7 +55,7 @@ import (
"github.com/anyproto/anytype-heart/core/block/object/treemanager"
"github.com/anyproto/anytype-heart/core/block/process"
"github.com/anyproto/anytype-heart/core/block/restriction"
"github.com/anyproto/anytype-heart/core/block/source"
"github.com/anyproto/anytype-heart/core/block/source/sourceimpl"
"github.com/anyproto/anytype-heart/core/block/template/templateimpl"
"github.com/anyproto/anytype-heart/core/configfetcher"
"github.com/anyproto/anytype-heart/core/debug"
@ -265,7 +265,7 @@ func Bootstrap(a *app.App, components ...app.Component) {
Register(fileoffloader.New()).
Register(fileacl.New()).
Register(chats.New()).
Register(source.New()).
Register(sourceimpl.New()).
Register(spacefactory.New()).
Register(space.New()).
Register(idderiverimpl.New()).

View file

@ -5,8 +5,11 @@ import (
"errors"
"os"
"path/filepath"
"time"
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/nodeconf"
"github.com/anyproto/any-sync/util/debug"
"gopkg.in/yaml.v3"
"github.com/anyproto/anytype-heart/core/anytype/config"
@ -32,6 +35,18 @@ func (s *Service) cancelStartIfInProcess() {
func (s *Service) AccountStop(req *pb.RpcAccountStopRequest) error {
s.cancelStartIfInProcess()
stopped := make(chan struct{})
defer close(stopped)
go func() {
select {
case <-stopped:
case <-time.After(app.StopDeadline + time.Second*5):
// this is extra protection in case we stuck at s.lock
_, _ = os.Stderr.Write([]byte("AccountStop timeout\n"))
_, _ = os.Stderr.Write(debug.Stack(true))
panic("app.Close AccountStop timeout")
}
}()
s.lock.Lock()
defer s.lock.Unlock()

View file

@ -0,0 +1,33 @@
package chatpush
const ChatsTopicName = "chats"
type Type int
const ChatMessage Type = 1
type Payload struct {
SpaceId string `json:"spaceId,omitempty"`
SenderId string `json:"senderId"`
Type Type `json:"type"`
NewMessagePayload *NewMessagePayload `json:"newMessage,omitempty"`
}
func MakePushPayload(spaceId, accountId, chatId string, messageId string, messageText string) *Payload {
return &Payload{
SpaceId: spaceId,
SenderId: accountId,
Type: ChatMessage,
NewMessagePayload: &NewMessagePayload{
ChatId: chatId,
MsgId: messageId,
Text: messageText,
},
}
}
type NewMessagePayload struct {
ChatId string `json:"chatId"`
MsgId string `json:"msgId"`
Text string `json:"text"`
}

View file

@ -1,41 +0,0 @@
package push
import (
"github.com/anyproto/anytype-heart/core/block/editor/chatobject"
)
const ChatsTopicName = "chats"
type Type int
const ChatMessage Type = 1
type Payload struct {
SpaceId string `json:"spaceId,omitempty"`
SenderId string `json:"senderId"`
Type Type `json:"type"`
NewMessagePayload *NewMessagePayload `json:"newMessage,omitempty"`
}
func MakePushPayload(spaceId, accountId, chatId string, message *chatobject.Message) *Payload {
return &Payload{
SpaceId: spaceId,
SenderId: accountId,
Type: ChatMessage,
NewMessagePayload: makeNewMessagePayload(chatId, message),
}
}
type NewMessagePayload struct {
ChatId string `json:"chatId"`
MsgId string `json:"msgId"`
Text string `json:"text"`
}
func makeNewMessagePayload(chatId string, message *chatobject.Message) *NewMessagePayload {
return &NewMessagePayload{
ChatId: chatId,
MsgId: message.Id,
Text: message.Message.Text,
}
}

View file

@ -13,10 +13,9 @@ import (
"go.uber.org/zap"
"github.com/anyproto/anytype-heart/core/block/cache"
"github.com/anyproto/anytype-heart/core/block/chats/push"
"github.com/anyproto/anytype-heart/core/block/chats/chatpush"
"github.com/anyproto/anytype-heart/core/block/editor/chatobject"
"github.com/anyproto/anytype-heart/core/domain"
"github.com/anyproto/anytype-heart/core/event"
"github.com/anyproto/anytype-heart/core/session"
subscriptionservice "github.com/anyproto/anytype-heart/core/subscription"
"github.com/anyproto/anytype-heart/core/subscription/crossspacesub"
@ -62,22 +61,23 @@ type accountService interface {
type service struct {
objectGetter cache.ObjectGetter
crossSpaceSubService crossspacesub.Service
pushService pushService
accountService accountService
componentCtx context.Context
componentCtxCancel context.CancelFunc
eventSender event.Sender
lock sync.Mutex
chatObjectsSubQueue *mb.MB[*pb.EventMessage]
chatObjectIds map[string]struct{}
pushService pushService
accountService accountService
lock sync.Mutex
isMessagePreviewSubActive bool
chatObjectIds map[string]struct{}
}
func New() Service {
return &service{
chatObjectIds: map[string]struct{}{},
chatObjectIds: map[string]struct{}{},
chatObjectsSubQueue: mb.New[*pb.EventMessage](0),
}
}
@ -88,7 +88,6 @@ func (s *service) Name() string {
func (s *service) Init(a *app.App) error {
s.objectGetter = app.MustComponent[cache.ObjectGetter](a)
s.crossSpaceSubService = app.MustComponent[crossspacesub.Service](a)
s.eventSender = app.MustComponent[event.Sender](a)
s.componentCtx, s.componentCtxCancel = context.WithCancel(context.Background())
s.pushService = app.MustComponent[pushService](a)
s.accountService = app.MustComponent[accountService](a)
@ -101,18 +100,15 @@ const (
func (s *service) SubscribeToMessagePreviews(ctx context.Context) (string, error) {
s.lock.Lock()
if s.chatObjectsSubQueue != nil {
s.lock.Unlock()
defer s.lock.Unlock()
err := s.UnsubscribeFromMessagePreviews()
if s.isMessagePreviewSubActive {
err := s.unsubscribeFromMessagePreviews()
if err != nil {
return "", fmt.Errorf("stop previous subscription: %w", err)
}
s.lock.Lock()
}
s.chatObjectsSubQueue = mb.New[*pb.EventMessage](0)
s.lock.Unlock()
s.isMessagePreviewSubActive = true
resp, err := s.crossSpaceSubService.Subscribe(subscriptionservice.SubscribeRequest{
SubId: allChatsSubscriptionId,
@ -136,28 +132,28 @@ func (s *service) SubscribeToMessagePreviews(ctx context.Context) (string, error
log.Error("init lastMessage subscription", zap.Error(err))
}
}
go s.monitorChats()
return chatobject.LastMessageSubscriptionId, nil
}
func (s *service) UnsubscribeFromMessagePreviews() error {
s.lock.Lock()
defer s.lock.Unlock()
return s.unsubscribeFromMessagePreviews()
}
func (s *service) unsubscribeFromMessagePreviews() error {
err := s.crossSpaceSubService.Unsubscribe(allChatsSubscriptionId)
if err != nil {
return fmt.Errorf("unsubscribe from cross-space sub: %w", err)
}
s.lock.Lock()
err = s.chatObjectsSubQueue.Close()
if err != nil {
log.Error("close cross-space chat objects queue", zap.Error(err))
}
s.chatObjectsSubQueue = nil
s.isMessagePreviewSubActive = false
chatIds := lo.Keys(s.chatObjectIds)
for key := range s.chatObjectIds {
delete(s.chatObjectIds, key)
}
s.lock.Unlock()
for _, chatId := range chatIds {
err := s.Unsubscribe(chatId, chatobject.LastMessageSubscriptionId)
@ -169,18 +165,31 @@ func (s *service) UnsubscribeFromMessagePreviews() error {
}
func (s *service) Run(ctx context.Context) error {
go s.monitorMessagePreviews()
return nil
}
func (s *service) monitorChats() {
func (s *service) monitorMessagePreviews() {
matcher := subscriptionservice.EventMatcher{
OnAdd: func(add *pb.EventObjectSubscriptionAdd) {
s.lock.Lock()
defer s.lock.Unlock()
if !s.isMessagePreviewSubActive {
return
}
err := s.onChatAdded(add.Id)
if err != nil {
log.Error("init last message subscription", zap.Error(err))
}
},
OnRemove: func(remove *pb.EventObjectSubscriptionRemove) {
s.lock.Lock()
defer s.lock.Unlock()
if !s.isMessagePreviewSubActive {
return
}
err := s.onChatRemoved(remove.Id)
if err != nil {
log.Error("unsubscribe from the last message", zap.Error(err))
@ -201,13 +210,11 @@ func (s *service) monitorChats() {
}
func (s *service) onChatAdded(chatObjectId string) error {
s.lock.Lock()
if _, ok := s.chatObjectIds[chatObjectId]; ok {
s.lock.Unlock()
return nil
}
s.chatObjectIds[chatObjectId] = struct{}{}
s.lock.Unlock()
return cache.Do(s.objectGetter, chatObjectId, func(sb chatobject.StoreObject) error {
var err error
_, err = sb.SubscribeLastMessages(s.componentCtx, chatobject.LastMessageSubscriptionId, 1, true)
@ -219,9 +226,7 @@ func (s *service) onChatAdded(chatObjectId string) error {
}
func (s *service) onChatRemoved(chatObjectId string) error {
s.lock.Lock()
delete(s.chatObjectIds, chatObjectId)
s.lock.Unlock()
err := s.Unsubscribe(chatObjectId, chatobject.LastMessageSubscriptionId)
if err != nil && !errors.Is(err, domain.ErrObjectNotFound) {
@ -235,9 +240,7 @@ func (s *service) Close(ctx context.Context) error {
s.lock.Lock()
defer s.lock.Unlock()
if s.chatObjectsSubQueue != nil {
err = s.chatObjectsSubQueue.Close()
}
err = s.chatObjectsSubQueue.Close()
s.componentCtxCancel()
@ -256,17 +259,19 @@ func (s *service) AddMessage(ctx context.Context, sessionCtx session.Context, ch
spaceId = sb.SpaceID()
return err
})
go s.sendPushNotification(spaceId, chatObjectId, message)
if err == nil {
go s.sendPushNotification(spaceId, chatObjectId, messageId, message.Message.Text)
}
return messageId, err
}
func (s *service) sendPushNotification(spaceId, chatObjectId string, message *chatobject.Message) {
payload := push.MakePushPayload(spaceId, s.accountService.AccountID(), chatObjectId, message)
func (s *service) sendPushNotification(spaceId, chatObjectId string, messageId string, messageText string) {
payload := chatpush.MakePushPayload(spaceId, s.accountService.AccountID(), chatObjectId, messageId, messageText)
jsonPayload, err := json.Marshal(payload)
if err != nil {
log.Error("marshal push payload", zap.Error(err))
}
err = s.pushService.Notify(context.Background(), spaceId, []string{push.ChatsTopicName}, jsonPayload)
err = s.pushService.Notify(s.componentCtx, spaceId, []string{chatpush.ChatsTopicName}, jsonPayload)
if err != nil {
log.Error("notify push message", zap.Error(err))
}

View file

@ -107,13 +107,14 @@ func (s *Service) Sort(ctx session.Context, req *pb.RpcObjectCollectionSortReque
}
func (s *Service) updateCollection(ctx session.Context, contextID string, modifier func(src []string) []string) error {
return cache.DoStateCtx(s.picker, ctx, contextID, func(s *state.State, sb smartblock.SmartBlock) error {
lst := s.GetStoreSlice(template.CollectionStoreKey)
return cache.DoStateCtx(s.picker, ctx, contextID, func(st *state.State, sb smartblock.SmartBlock) error {
s.collectionAddHookOnce(sb)
lst := st.GetStoreSlice(template.CollectionStoreKey)
lst = modifier(lst)
s.UpdateStoreSlice(template.CollectionStoreKey, lst)
st.UpdateStoreSlice(template.CollectionStoreKey, lst)
// TODO why we're adding empty list of flags?
flags := internalflag.Set{}
flags.AddToState(s)
flags.AddToState(st)
return nil
}, smartblock.KeepInternalFlags)
}

View file

@ -14,7 +14,7 @@ import (
"github.com/anyproto/anytype-heart/core/block/cache"
"github.com/anyproto/anytype-heart/core/block/editor/smartblock"
"github.com/anyproto/anytype-heart/core/block/source"
"github.com/anyproto/anytype-heart/core/block/source/sourceimpl"
"github.com/anyproto/anytype-heart/pkg/lib/database"
"github.com/anyproto/anytype-heart/tests/blockbuilder"
"github.com/anyproto/anytype-heart/util/debug"
@ -102,7 +102,7 @@ func (s *Service) debugTree(req *http.Request) (debugTree, error) {
}
err := cache.Do(s, id, func(sb smartblock.SmartBlock) error {
ot := sb.Tree()
return ot.IterateRoot(source.UnmarshalChange, func(change *objecttree.Change) bool {
return ot.IterateRoot(sourceimpl.UnmarshalChange, func(change *objecttree.Change) bool {
change.Next = nil
raw, err := json.Marshal(change)
if err != nil {
@ -140,7 +140,7 @@ func (s *Service) debugTreeInSpace(req *http.Request) (debugTree, error) {
err = spc.Do(id, func(sb smartblock.SmartBlock) error {
ot := sb.Tree()
return ot.IterateRoot(source.UnmarshalChange, func(change *objecttree.Change) bool {
return ot.IterateRoot(sourceimpl.UnmarshalChange, func(change *objecttree.Change) bool {
change.Next = nil
raw, err := json.Marshal(change)
if err != nil {

View file

@ -657,12 +657,34 @@ func (s *Service) TableColumnListFill(ctx session.Context, req pb.RpcBlockTableC
return err
}
func (s *Service) CreateWidgetBlock(ctx session.Context, req *pb.RpcBlockCreateWidgetRequest) (string, error) {
func (s *Service) CreateWidgetBlock(ctx session.Context, req *pb.RpcBlockCreateWidgetRequest, checkDuplicatedTarget bool) (string, error) {
var id string
err := cache.DoStateCtx(s, ctx, req.ContextId, func(st *state.State, w widget.Widget) error {
if checkDuplicatedTarget && widgetHasBlock(st, req.Block) {
return nil
}
var err error
id, err = w.CreateBlock(st, req)
return err
})
return id, err
}
// widgetHasBlock checks if widget has block with same targetBlockId as in provided block
func widgetHasBlock(st *state.State, block *model.Block) (found bool) {
if block == nil || block.GetLink() == nil {
return false
}
targetBlockId := block.GetLink().TargetBlockId
// nolint:errcheck
_ = st.Iterate(func(b simple.Block) (isContinue bool) {
if l := b.Model().GetLink(); l != nil {
if l.GetTargetBlockId() == targetBlockId {
found = true
return false
}
}
return true
})
return found
}

View file

@ -12,7 +12,7 @@ import (
"github.com/anyproto/anytype-heart/core/block/editor/smartblock"
"github.com/anyproto/anytype-heart/core/block/editor/storestate"
"github.com/anyproto/anytype-heart/core/block/source"
"github.com/anyproto/anytype-heart/core/block/source/sourceimpl"
)
type DebugChange struct {
@ -51,10 +51,11 @@ func (s *debugComponent) DebugChanges(ctx context.Context) ([]*DebugChange, erro
if err != nil {
return nil, fmt.Errorf("new tx: %w", err)
}
defer tx.Commit()
// todo: replace with readonly tx
defer tx.Rollback()
var result []*DebugChange
err = historyTree.IterateFrom(historyTree.Root().Id, source.UnmarshalStoreChange, func(change *objecttree.Change) bool {
err = historyTree.IterateFrom(historyTree.Root().Id, sourceimpl.UnmarshalStoreChange, func(change *objecttree.Change) bool {
orderId, err := tx.GetOrder(change.Id)
if err != nil {
result = append(result, &DebugChange{

View file

@ -18,7 +18,7 @@ import (
type ChatHandler struct {
repository *repository
subscription *subscription
subscription *subscriptionManager
currentIdentity string
myParticipantId string
// forceNotRead forces handler to mark all messages as not read. It's useful for unit testing

View file

@ -2,6 +2,7 @@ package chatobject
import (
"context"
"errors"
"fmt"
"time"
@ -77,7 +78,7 @@ type storeObject struct {
storeSource source.Store
store *storestate.StoreState
eventSender event.Sender
subscription *subscription
subscription *subscriptionManager
crdtDb anystore.DB
spaceIndex spaceindex.Store
chatHandler *ChatHandler
@ -109,7 +110,14 @@ func (s *storeObject) Init(ctx *smartblock.InitContext) error {
return fmt.Errorf("source is not a store")
}
collection, err := s.crdtDb.Collection(ctx.Ctx, storeSource.Id()+CollectionName)
collectionName := storeSource.Id() + CollectionName
collection, err := s.crdtDb.OpenCollection(ctx.Ctx, collectionName)
if errors.Is(err, anystore.ErrCollectionNotFound) {
collection, err = s.crdtDb.CreateCollection(ctx.Ctx, collectionName)
if err != nil {
return fmt.Errorf("create collection: %w", err)
}
}
if err != nil {
return fmt.Errorf("get collection: %w", err)
}
@ -120,7 +128,7 @@ func (s *storeObject) Init(ctx *smartblock.InitContext) error {
}
// Use Object and Space IDs from source, because object is not initialized yet
myParticipantId := domain.NewParticipantId(ctx.Source.SpaceID(), s.accountService.AccountID())
s.subscription = s.newSubscription(
s.subscription = s.newSubscriptionManager(
domain.FullID{ObjectID: ctx.Source.Id(), SpaceID: ctx.Source.SpaceID()},
s.accountService.AccountID(),
myParticipantId,
@ -220,7 +228,10 @@ func (s *storeObject) AddMessage(ctx context.Context, sessionCtx session.Context
arena.Reset()
s.arenaPool.Put(arena)
}()
message.Read = true
// Normalize message
message.Read = false
message.MentionRead = false
obj := arena.NewObject()
message.MarshalAnyenc(obj, arena)
@ -240,6 +251,18 @@ func (s *storeObject) AddMessage(ctx context.Context, sessionCtx session.Context
if err != nil {
return "", fmt.Errorf("push change: %w", err)
}
if !s.chatHandler.forceNotRead {
for _, counterType := range []CounterType{CounterTypeMessage, CounterTypeMention} {
handler := newReadHandler(counterType, s.subscription)
err = s.storeSource.MarkSeenHeads(ctx, handler.getDiffManagerName(), []string{messageId})
if err != nil {
return "", fmt.Errorf("mark read: %w", err)
}
}
}
return messageId, nil
}

View file

@ -326,7 +326,7 @@ func TestEditMessage(t *testing.T) {
require.NoError(t, err)
require.Len(t, messagesResp.Messages, 1)
want := inputMessage
want := givenComplexMessage()
want.Id = messageId
want.Creator = testCreator

View file

@ -29,7 +29,7 @@ type readHandler interface {
}
type readMessagesHandler struct {
subscription *subscription
subscription *subscriptionManager
}
func (h *readMessagesHandler) getUnreadFilter() query.Filter {
@ -79,7 +79,7 @@ func (h *readMessagesHandler) readModifier(value bool) query.Modifier {
}
type readMentionsHandler struct {
subscription *subscription
subscription *subscriptionManager
}
func (h *readMentionsHandler) getUnreadFilter() query.Filter {
@ -131,7 +131,7 @@ func (h *readMentionsHandler) readModifier(value bool) query.Modifier {
})
}
func newReadHandler(counterType CounterType, subscription *subscription) readHandler {
func newReadHandler(counterType CounterType, subscription *subscriptionManager) readHandler {
switch counterType {
case CounterTypeMessage:
return &readMessagesHandler{subscription: subscription}
@ -163,8 +163,12 @@ func (s *storeObject) MarkMessagesAsUnread(ctx context.Context, afterOrderId str
if err != nil {
return fmt.Errorf("create tx: %w", err)
}
defer txn.Rollback()
var commited bool
defer func() {
if !commited {
_ = txn.Rollback()
}
}()
handler := newReadHandler(counterType, s.subscription)
messageIds, err := s.repository.getReadMessagesAfter(txn.Context(), afterOrderId, handler)
if err != nil {
@ -206,6 +210,7 @@ func (s *storeObject) MarkMessagesAsUnread(ctx context.Context, afterOrderId str
return fmt.Errorf("store seen heads: %w", err)
}
commited = true
return txn.Commit()
}
@ -218,7 +223,12 @@ func (s *storeObject) markReadMessages(changeIds []string, handler readHandler)
if err != nil {
return fmt.Errorf("start write tx: %w", err)
}
defer txn.Rollback()
var commited bool
defer func() {
if !commited {
txn.Rollback()
}
}()
idsModified := s.repository.setReadFlag(txn.Context(), s.Id(), changeIds, handler, true)
@ -228,6 +238,7 @@ func (s *storeObject) markReadMessages(changeIds []string, handler readHandler)
return fmt.Errorf("get oldest order id: %w", err)
}
commited = true
err = txn.Commit()
if err != nil {
return fmt.Errorf("commit: %w", err)

View file

@ -322,7 +322,7 @@ func (s *repository) getMessagesByIds(ctx context.Context, messageIds []string)
}
messages = append(messages, msg)
}
return messages, txn.Commit()
return messages, nil
}
func (s *repository) getLastMessages(ctx context.Context, limit uint) ([]*Message, error) {

View file

@ -19,7 +19,7 @@ import (
const LastMessageSubscriptionId = "lastMessage"
type subscription struct {
type subscriptionManager struct {
componentCtx context.Context
spaceId string
@ -43,8 +43,8 @@ type subscription struct {
repository *repository
}
func (s *storeObject) newSubscription(fullId domain.FullID, myIdentity string, myParticipantId string) *subscription {
return &subscription{
func (s *storeObject) newSubscriptionManager(fullId domain.FullID, myIdentity string, myParticipantId string) *subscriptionManager {
return &subscriptionManager{
componentCtx: s.componentCtx,
spaceId: fullId.SpaceID,
chatId: fullId.ObjectID,
@ -57,8 +57,8 @@ func (s *storeObject) newSubscription(fullId domain.FullID, myIdentity string, m
}
}
// subscribe subscribes to messages. It returns true if there was no subscription with provided id
func (s *subscription) subscribe(subId string) bool {
// subscribe subscribes to messages. It returns true if there was no subscriptionManager with provided id
func (s *subscriptionManager) subscribe(subId string) bool {
if !slices.Contains(s.ids, subId) {
s.ids = append(s.ids, subId)
s.chatStateUpdated = false
@ -67,24 +67,24 @@ func (s *subscription) subscribe(subId string) bool {
return false
}
func (s *subscription) unsubscribe(subId string) {
func (s *subscriptionManager) unsubscribe(subId string) {
s.ids = slice.Remove(s.ids, subId)
}
func (s *subscription) isActive() bool {
func (s *subscriptionManager) isActive() bool {
return len(s.ids) > 0
}
func (s *subscription) withDeps() bool {
func (s *subscriptionManager) withDeps() bool {
return slices.Equal(s.ids, []string{LastMessageSubscriptionId})
}
// setSessionContext sets the session context for the current operation
func (s *subscription) setSessionContext(ctx session.Context) {
func (s *subscriptionManager) setSessionContext(ctx session.Context) {
s.sessionContext = ctx
}
func (s *subscription) loadChatState(ctx context.Context) error {
func (s *subscriptionManager) loadChatState(ctx context.Context) error {
state, err := s.repository.loadChatState(ctx)
if err != nil {
return err
@ -93,17 +93,17 @@ func (s *subscription) loadChatState(ctx context.Context) error {
return nil
}
func (s *subscription) getChatState() *model.ChatState {
func (s *subscriptionManager) getChatState() *model.ChatState {
return copyChatState(s.chatState)
}
func (s *subscription) updateChatState(updater func(*model.ChatState) *model.ChatState) {
func (s *subscriptionManager) updateChatState(updater func(*model.ChatState) *model.ChatState) {
s.chatState = updater(s.chatState)
s.chatStateUpdated = true
}
// flush is called after commiting changes
func (s *subscription) flush() {
func (s *subscriptionManager) flush() {
if !s.canSend() {
return
}
@ -136,7 +136,6 @@ func (s *subscription) flush() {
ContextId: s.chatId,
Messages: events,
}
if s.sessionContext != nil {
s.sessionContext.SetMessages(s.chatId, events)
s.eventSender.BroadcastToOtherSessions(s.sessionContext.ID(), ev)
@ -146,7 +145,7 @@ func (s *subscription) flush() {
}
}
func (s *subscription) getIdentityDetails(identity string) (*domain.Details, error) {
func (s *subscriptionManager) getIdentityDetails(identity string) (*domain.Details, error) {
cached, ok := s.identityCache.Get(identity)
if ok {
return cached, nil
@ -159,7 +158,7 @@ func (s *subscription) getIdentityDetails(identity string) (*domain.Details, err
return details, nil
}
func (s *subscription) add(prevOrderId string, message *Message) {
func (s *subscriptionManager) add(prevOrderId string, message *Message) {
if !s.canSend() {
return
}
@ -194,7 +193,7 @@ func (s *subscription) add(prevOrderId string, message *Message) {
}))
}
func (s *subscription) delete(messageId string) {
func (s *subscriptionManager) delete(messageId string) {
ev := &pb.EventChatDelete{
Id: messageId,
SubIds: slices.Clone(s.ids),
@ -207,7 +206,7 @@ func (s *subscription) delete(messageId string) {
s.needReloadState = true
}
func (s *subscription) updateFull(message *Message) {
func (s *subscriptionManager) updateFull(message *Message) {
if !s.canSend() {
return
}
@ -221,7 +220,7 @@ func (s *subscription) updateFull(message *Message) {
}))
}
func (s *subscription) updateReactions(message *Message) {
func (s *subscriptionManager) updateReactions(message *Message) {
if !s.canSend() {
return
}
@ -237,7 +236,7 @@ func (s *subscription) updateReactions(message *Message) {
// updateMessageRead updates the read status of the messages with the given ids
// read ids should ONLY contain ids if they were actually modified in the DB
func (s *subscription) updateMessageRead(ids []string, read bool) {
func (s *subscriptionManager) updateMessageRead(ids []string, read bool) {
s.updateChatState(func(state *model.ChatState) *model.ChatState {
if read {
state.Messages.Counter -= int32(len(ids))
@ -259,7 +258,7 @@ func (s *subscription) updateMessageRead(ids []string, read bool) {
}))
}
func (s *subscription) updateMentionRead(ids []string, read bool) {
func (s *subscriptionManager) updateMentionRead(ids []string, read bool) {
s.updateChatState(func(state *model.ChatState) *model.ChatState {
if read {
state.Mentions.Counter -= int32(len(ids))
@ -281,7 +280,7 @@ func (s *subscription) updateMentionRead(ids []string, read bool) {
}))
}
func (s *subscription) canSend() bool {
func (s *subscriptionManager) canSend() bool {
if s.sessionContext != nil {
return true
}

View file

@ -362,9 +362,6 @@ func (sb *smartBlock) Init(ctx *InitContext) (err error) {
}
}
ctx.State.AddBundledRelationLinks(relKeys...)
if ctx.IsNewObject && ctx.State != nil {
source.NewSubObjectsAndProfileLinksMigration(sb.Type(), sb.space, sb.currentParticipantId, sb.spaceIndex).Migrate(ctx.State)
}
if err = sb.injectLocalDetails(ctx.State); err != nil {
return
@ -840,7 +837,6 @@ func (sb *smartBlock) Apply(s *state.State, flags ...ApplyFlag) (err error) {
}
func (sb *smartBlock) ResetToVersion(s *state.State) (err error) {
source.NewSubObjectsAndProfileLinksMigration(sb.Type(), sb.space, sb.currentParticipantId, sb.spaceIndex).Migrate(s)
s.SetParent(sb.Doc.(*state.State))
sb.storeFileKeys(s)
sb.injectLocalDetails(s)
@ -972,9 +968,13 @@ func (sb *smartBlock) StateAppend(f func(d state.Doc) (s *state.State, changes [
sb.CheckSubscriptions()
}
sb.runIndexer(s)
var parentDetails *domain.Details
if s.ParentState() != nil {
parentDetails = s.ParentState().Details()
}
if err = sb.execHooks(HookAfterApply, ApplyInfo{
State: s,
ParentDetails: s.ParentState().Details(),
ParentDetails: parentDetails,
Events: msgs,
Changes: changes,
}); err != nil {

View file

@ -126,6 +126,7 @@ func (ss *StoreState) NewTx(ctx context.Context) (*StoreStateTx, error) {
}
stx := &StoreStateTx{state: ss, tx: tx, ctx: tx.Context(), arena: &anyenc.Arena{}}
if err = stx.init(); err != nil {
_ = tx.Rollback()
return nil, err
}
return stx, nil

View file

@ -203,7 +203,7 @@ func (i *Import) addRootCollectionWidget(res *ImportResponse, req *ImportRequest
TargetBlockId: res.RootCollectionId,
}},
},
})
}, true)
if err != nil {
log.Errorf("failed to create widget from root collection, error: %s", err.Error())
}

View file

@ -10,7 +10,7 @@ import (
"github.com/anyproto/anytype-heart/core/block/editor/state"
"github.com/anyproto/anytype-heart/core/block/restriction"
"github.com/anyproto/anytype-heart/core/block/source"
"github.com/anyproto/anytype-heart/core/block/source/sourceimpl"
"github.com/anyproto/anytype-heart/core/block/template"
"github.com/anyproto/anytype-heart/core/domain"
"github.com/anyproto/anytype-heart/pkg/lib/bundle"
@ -219,7 +219,7 @@ func buildDateObject(space clientspace.Space, details *domain.Details) (string,
return "", nil, fmt.Errorf("failed to find Date type to build Date object: %w", err)
}
dateSource := source.NewDate(source.DateSourceParams{
dateSource := sourceimpl.NewDate(sourceimpl.DateSourceParams{
Id: domain.FullID{
ObjectID: dateObject.Id(),
SpaceID: space.Id(),
@ -227,7 +227,7 @@ func buildDateObject(space clientspace.Space, details *domain.Details) (string,
DateObjectTypeId: typeId,
})
detailsGetter, ok := dateSource.(source.SourceIdEndodedDetails)
detailsGetter, ok := dateSource.(sourceimpl.SourceIdEndodedDetails)
if !ok {
return "", nil, fmt.Errorf("date object does not implement DetailsFromId")
}

View file

@ -228,8 +228,12 @@ func (t *treeSyncer) requestTree(p peer.Peer, id string) {
} else {
log.Debug("loaded missing tree")
}
tr.Lock()
if objecttree.IsEmptyDerivedTree(tr) {
tr.Unlock()
t.pingTree(p, tr)
} else {
tr.Unlock()
}
}

View file

@ -0,0 +1,146 @@
package source
import (
"context"
"errors"
"fmt"
"time"
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/commonspace/object/acl/list"
"github.com/anyproto/any-sync/commonspace/object/tree/objecttree"
"github.com/anyproto/any-sync/commonspace/object/tree/synctree/updatelistener"
"github.com/anyproto/any-sync/commonspace/objecttreebuilder"
"github.com/anyproto/anytype-heart/core/block/editor/state"
"github.com/anyproto/anytype-heart/core/block/editor/storestate"
"github.com/anyproto/anytype-heart/core/domain"
"github.com/anyproto/anytype-heart/pb"
"github.com/anyproto/anytype-heart/pkg/lib/core/smartblock"
"github.com/anyproto/anytype-heart/space/spacecore/typeprovider"
)
var (
ErrReadOnly = errors.New("object is read only")
ErrBigChangeSize = errors.New("change size is above the limit")
ErrUnknownDataFormat = fmt.Errorf("unknown data format: you may need to upgrade anytype in order to open this page")
)
type PushChangeHook func(params PushChangeParams) (id string, err error)
type PushStoreChangeParams struct {
State *storestate.StoreState
Changes []*pb.StoreChangeContent
Time time.Time // used to derive the lastModifiedDate; Default is time.Now()
}
type ObjectTreeProvider interface {
Tree() objecttree.ObjectTree
}
type Space interface {
Id() string
TreeBuilder() objecttreebuilder.TreeBuilder
GetRelationIdByKey(ctx context.Context, key domain.RelationKey) (id string, err error)
GetTypeIdByKey(ctx context.Context, key domain.TypeKey) (id string, err error)
DeriveObjectID(ctx context.Context, uniqueKey domain.UniqueKey) (id string, err error)
StoredIds() []string
IsPersonal() bool
}
type Service interface {
NewSource(ctx context.Context, space Space, id string, buildOptions BuildOptions) (source Source, err error)
RegisterStaticSource(s Source) error
NewStaticSource(params StaticSourceParams) SourceWithType
DetailsFromIdBasedSource(id domain.FullID) (*domain.Details, error)
IDsListerBySmartblockType(space Space, blockType smartblock.SmartBlockType) (IDsLister, error)
app.Component
}
type Store interface {
Source
ReadStoreDoc(ctx context.Context, stateStore *storestate.StoreState, onUpdateHook func()) (err error)
PushStoreChange(ctx context.Context, params PushStoreChangeParams) (changeId string, err error)
SetPushChangeHook(onPushChange PushChangeHook)
// RegisterDiffManager sets a hook that will be called when a change is removed (marked as read) from the diff manager
// must be called before ReadStoreDoc.
//
// If a head is marked as read in the diff manager, all earlier heads for that branch marked as read as well
RegisterDiffManager(name string, onRemoveHook func(removed []string))
// MarkSeenHeads marks heads as seen in a diff manager. Then the diff manager will call a hook from SetDiffManagerOnRemoveHook
MarkSeenHeads(ctx context.Context, name string, heads []string) error
// StoreSeenHeads persists current seen heads in any-store
StoreSeenHeads(ctx context.Context, name string) error
// InitDiffManager initializes a diff manager with specified seen heads
InitDiffManager(ctx context.Context, name string, seenHeads []string) error
}
type PushChangeParams struct {
State *state.State
Changes []*pb.ChangeContent
FileChangedHashes []string
Time time.Time // used to derive the lastModifiedDate; Default is time.Now()
DoSnapshot bool
}
type IDsLister interface {
ListIds() ([]string, error)
}
type Source interface {
Id() string
SpaceID() string
Type() smartblock.SmartBlockType
Heads() []string
GetFileKeysSnapshot() []*pb.ChangeFileKeys
ReadOnly() bool
ReadDoc(ctx context.Context, receiver ChangeReceiver, empty bool) (doc state.Doc, err error)
PushChange(params PushChangeParams) (id string, err error)
Close() (err error)
GetCreationInfo() (creatorObjectId string, createdDate int64, err error)
}
type ChangeReceiver interface {
StateAppend(func(d state.Doc) (s *state.State, changes []*pb.ChangeContent, err error)) error
StateRebuild(d state.Doc) (err error)
}
type SourceWithType interface {
Source
IDsLister
}
type BuildOptions struct {
DisableRemoteLoad bool
Listener updatelistener.UpdateListener
}
func (b *BuildOptions) BuildTreeOpts() objecttreebuilder.BuildTreeOpts {
return objecttreebuilder.BuildTreeOpts{
Listener: b.Listener,
TreeBuilder: func(treeStorage objecttree.Storage, aclList list.AclList) (objecttree.ObjectTree, error) {
ot, err := objecttree.BuildKeyFilterableObjectTree(treeStorage, aclList)
if err != nil {
return nil, err
}
sbt, _, err := typeprovider.GetTypeAndKeyFromRoot(ot.Header())
if err != nil {
return nil, err
}
if sbt == smartblock.SmartBlockTypeChatDerivedObject || sbt == smartblock.SmartBlockTypeAccountObject {
ot.SetFlusher(objecttree.MarkNewChangeFlusher())
}
return ot, nil
},
TreeValidator: objecttree.ValidateFilterRawTree,
}
}
type StaticSourceParams struct {
Id domain.FullID
SbType smartblock.SmartBlockType
State *state.State
CreatorId string
}

View file

@ -1,9 +1,10 @@
package source
package sourceimpl
import (
"context"
"github.com/anyproto/anytype-heart/core/block/editor/state"
"github.com/anyproto/anytype-heart/core/block/source"
"github.com/anyproto/anytype-heart/core/domain"
"github.com/anyproto/anytype-heart/pb"
"github.com/anyproto/anytype-heart/pkg/lib/bundle"
@ -13,7 +14,7 @@ import (
)
// TODO Is it used?
func NewAnytypeProfile(id string) (s Source) {
func NewAnytypeProfile(id string) (s source.Source) {
return &anytypeProfile{
id: id,
}
@ -57,7 +58,7 @@ func (v *anytypeProfile) getDetails() (p *domain.Details) {
return det
}
func (v *anytypeProfile) ReadDoc(ctx context.Context, receiver ChangeReceiver, empty bool) (doc state.Doc, err error) {
func (v *anytypeProfile) ReadDoc(ctx context.Context, receiver source.ChangeReceiver, empty bool) (doc state.Doc, err error) {
s := state.NewDoc(v.id, nil).(*state.State)
d := v.getDetails()
@ -81,7 +82,7 @@ func (s *anytypeProfile) GetFileKeysSnapshot() []*pb.ChangeFileKeys {
return nil
}
func (s *anytypeProfile) PushChange(params PushChangeParams) (id string, err error) {
func (s *anytypeProfile) PushChange(params source.PushChangeParams) (id string, err error) {
return
}

View file

@ -1,10 +1,11 @@
package source
package sourceimpl
import (
"context"
"strings"
"github.com/anyproto/anytype-heart/core/block/editor/state"
"github.com/anyproto/anytype-heart/core/block/source"
"github.com/anyproto/anytype-heart/core/domain"
"github.com/anyproto/anytype-heart/core/relationutils"
"github.com/anyproto/anytype-heart/pb"
@ -14,7 +15,7 @@ import (
"github.com/anyproto/anytype-heart/pkg/lib/pb/model"
)
func NewBundledObjectType(id string) (s Source) {
func NewBundledObjectType(id string) (s source.Source) {
return &bundledObjectType{
id: id,
objectTypeKey: domain.TypeKey(strings.TrimPrefix(id, addr.BundledObjectTypeURLPrefix)),
@ -62,7 +63,7 @@ func getDetailsForBundledObjectType(id string) (extraRels []*model.RelationLink,
return extraRels, (&relationutils.ObjectType{ot}).BundledTypeDetails(), nil
}
func (v *bundledObjectType) ReadDoc(ctx context.Context, receiver ChangeReceiver, empty bool) (doc state.Doc, err error) {
func (v *bundledObjectType) ReadDoc(ctx context.Context, receiver source.ChangeReceiver, empty bool) (doc state.Doc, err error) {
// we use STType instead of BundledObjectType for a reason we want to have the same prefix
// ideally the whole logic should be done on the level of spaceService to return the virtual space for marketplace
uk, err := domain.NewUniqueKey(smartblock.SmartBlockTypeObjectType, v.objectTypeKey.String())
@ -85,7 +86,7 @@ func (v *bundledObjectType) ReadDoc(ctx context.Context, receiver ChangeReceiver
return s, nil
}
func (v *bundledObjectType) PushChange(params PushChangeParams) (id string, err error) {
func (v *bundledObjectType) PushChange(params source.PushChangeParams) (id string, err error) {
return "", nil
}

View file

@ -1,4 +1,4 @@
package source
package sourceimpl
import (
"context"
@ -6,6 +6,7 @@ import (
"strings"
"github.com/anyproto/anytype-heart/core/block/editor/state"
"github.com/anyproto/anytype-heart/core/block/source"
"github.com/anyproto/anytype-heart/core/domain"
"github.com/anyproto/anytype-heart/core/relationutils"
"github.com/anyproto/anytype-heart/pb"
@ -15,7 +16,7 @@ import (
"github.com/anyproto/anytype-heart/pkg/lib/pb/model"
)
func NewBundledRelation(id string) (s Source) {
func NewBundledRelation(id string) (s source.Source) {
return &bundledRelation{
id: id,
relKey: domain.RelationKey(strings.TrimPrefix(id, addr.BundledRelationURLPrefix)),
@ -64,7 +65,7 @@ func (v *bundledRelation) getDetails(id string) (p *domain.Details, err error) {
return details, nil
}
func (v *bundledRelation) ReadDoc(_ context.Context, _ ChangeReceiver, empty bool) (doc state.Doc, err error) {
func (v *bundledRelation) ReadDoc(_ context.Context, _ source.ChangeReceiver, empty bool) (doc state.Doc, err error) {
// we use STRelation instead of BundledRelation for a reason we want to have the same prefix
// ideally the whole logic should be done on the level of spaceService to return the virtual space for marketplace
uk, err := domain.NewUniqueKey(smartblock.SmartBlockTypeRelation, v.relKey.String())
@ -86,12 +87,12 @@ func (v *bundledRelation) ReadDoc(_ context.Context, _ ChangeReceiver, empty boo
return s, nil
}
func (v *bundledRelation) PushChange(params PushChangeParams) (id string, err error) {
func (v *bundledRelation) PushChange(params source.PushChangeParams) (id string, err error) {
if params.State.ChangeId() == "" {
// allow the first changes created by Init
return "virtual", nil
}
return "", ErrReadOnly
return "", source.ErrReadOnly
}
func (v *bundledRelation) ListIds() ([]string, error) {

View file

@ -1,4 +1,4 @@
package source
package sourceimpl
import (
"context"
@ -7,6 +7,7 @@ import (
"github.com/anyproto/anytype-heart/core/block/editor/state"
"github.com/anyproto/anytype-heart/core/block/editor/template"
"github.com/anyproto/anytype-heart/core/block/restriction"
"github.com/anyproto/anytype-heart/core/block/source"
"github.com/anyproto/anytype-heart/core/domain"
"github.com/anyproto/anytype-heart/pb"
"github.com/anyproto/anytype-heart/pkg/lib/bundle"
@ -21,7 +22,7 @@ type DateSourceParams struct {
DateObjectTypeId string
}
func NewDate(params DateSourceParams) (s Source) {
func NewDate(params DateSourceParams) (s source.Source) {
return &date{
id: params.Id.ObjectID,
spaceId: params.Id.SpaceID,
@ -80,7 +81,7 @@ func (d *date) DetailsFromId() (*domain.Details, error) {
return d.getDetails()
}
func (d *date) ReadDoc(context.Context, ChangeReceiver, bool) (doc state.Doc, err error) {
func (d *date) ReadDoc(context.Context, source.ChangeReceiver, bool) (doc state.Doc, err error) {
details, err := d.getDetails()
if err != nil {
return
@ -96,7 +97,7 @@ func (d *date) ReadDoc(context.Context, ChangeReceiver, bool) (doc state.Doc, er
return s, nil
}
func (d *date) PushChange(PushChangeParams) (id string, err error) {
func (d *date) PushChange(source.PushChangeParams) (id string, err error) {
return "", nil
}

View file

@ -1,4 +1,4 @@
package source
package sourceimpl
import (
"math/rand"

View file

@ -1,9 +1,10 @@
package source
package sourceimpl
import (
"context"
"github.com/anyproto/anytype-heart/core/block/editor/state"
"github.com/anyproto/anytype-heart/core/block/source"
"github.com/anyproto/anytype-heart/core/domain"
"github.com/anyproto/anytype-heart/pb"
"github.com/anyproto/anytype-heart/pkg/lib/bundle"
@ -14,7 +15,7 @@ import (
type missingObject struct {
}
func NewMissingObject() (s Source) {
func NewMissingObject() (s source.Source) {
return &missingObject{}
}
@ -48,7 +49,7 @@ func (m *missingObject) getDetails() (p *domain.Details) {
return det
}
func (m *missingObject) ReadDoc(ctx context.Context, receiver ChangeReceiver, empty bool) (doc state.Doc, err error) {
func (m *missingObject) ReadDoc(ctx context.Context, receiver source.ChangeReceiver, empty bool) (doc state.Doc, err error) {
s := state.NewDoc(addr.MissingObject, nil).(*state.State)
d := m.getDetails()
@ -70,7 +71,7 @@ func (m *missingObject) GetFileKeysSnapshot() []*pb.ChangeFileKeys {
return nil
}
func (m *missingObject) PushChange(params PushChangeParams) (id string, err error) {
func (m *missingObject) PushChange(params source.PushChangeParams) (id string, err error) {
return
}

View file

@ -1,4 +1,4 @@
package source
package sourceimpl
import (
"context"
@ -8,27 +8,26 @@ import (
"github.com/anyproto/any-sync/accountservice"
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/commonspace/object/acl/list"
"github.com/anyproto/any-sync/commonspace/object/tree/objecttree"
"github.com/anyproto/any-sync/commonspace/object/tree/synctree/updatelistener"
"github.com/anyproto/any-sync/commonspace/objecttreebuilder"
"github.com/anyproto/any-sync/commonspace/spacestorage"
"github.com/anyproto/anytype-heart/core/block/editor/state"
"github.com/anyproto/anytype-heart/core/block/object/idderiver"
"github.com/anyproto/anytype-heart/core/block/source"
"github.com/anyproto/anytype-heart/core/domain"
"github.com/anyproto/anytype-heart/core/files"
"github.com/anyproto/anytype-heart/pkg/lib/bundle"
"github.com/anyproto/anytype-heart/pkg/lib/core/smartblock"
"github.com/anyproto/anytype-heart/pkg/lib/localstore/addr"
"github.com/anyproto/anytype-heart/pkg/lib/localstore/objectstore"
"github.com/anyproto/anytype-heart/space"
"github.com/anyproto/anytype-heart/space/clientspace/keyvalueservice"
"github.com/anyproto/anytype-heart/space/spacecore/storage"
"github.com/anyproto/anytype-heart/space/spacecore/typeprovider"
)
const CName = "source"
func New() Service {
func New() source.Service {
return &service{}
}
@ -38,24 +37,8 @@ type accountService interface {
PersonalSpaceID() string
}
type Space interface {
Id() string
TreeBuilder() objecttreebuilder.TreeBuilder
GetRelationIdByKey(ctx context.Context, key domain.RelationKey) (id string, err error)
GetTypeIdByKey(ctx context.Context, key domain.TypeKey) (id string, err error)
DeriveObjectID(ctx context.Context, uniqueKey domain.UniqueKey) (id string, err error)
StoredIds() []string
IsPersonal() bool
}
type Service interface {
NewSource(ctx context.Context, space Space, id string, buildOptions BuildOptions) (source Source, err error)
RegisterStaticSource(s Source) error
NewStaticSource(params StaticSourceParams) SourceWithType
DetailsFromIdBasedSource(id domain.FullID) (*domain.Details, error)
IDsListerBySmartblockType(space Space, blockType smartblock.SmartBlockType) (IDsLister, error)
app.Component
type TechSpace interface {
KeyValueService() keyvalueservice.Service
}
type service struct {
@ -67,13 +50,14 @@ type service struct {
objectStore objectstore.ObjectStore
fileObjectMigrator fileObjectMigrator
idDeriver idderiver.Deriver
spaceService space.Service
mu sync.Mutex
staticIds map[string]Source
staticIds map[string]source.Source
}
func (s *service) Init(a *app.App) (err error) {
s.staticIds = make(map[string]Source)
s.staticIds = make(map[string]source.Source)
s.sbtProvider = a.MustComponent(typeprovider.CName).(typeprovider.SmartBlockTypeProvider)
s.accountService = app.MustComponent[accountService](a)
@ -81,6 +65,7 @@ func (s *service) Init(a *app.App) (err error) {
s.storageService = a.MustComponent(spacestorage.CName).(storage.ClientStorage)
s.objectStore = app.MustComponent[objectstore.ObjectStore](a)
s.idDeriver = app.MustComponent[idderiver.Deriver](a)
s.spaceService = app.MustComponent[space.Service](a)
s.fileService = app.MustComponent[files.Service](a)
s.fileObjectMigrator = app.MustComponent[fileObjectMigrator](a)
@ -91,33 +76,7 @@ func (s *service) Name() (name string) {
return CName
}
type BuildOptions struct {
DisableRemoteLoad bool
Listener updatelistener.UpdateListener
}
func (b *BuildOptions) BuildTreeOpts() objecttreebuilder.BuildTreeOpts {
return objecttreebuilder.BuildTreeOpts{
Listener: b.Listener,
TreeBuilder: func(treeStorage objecttree.Storage, aclList list.AclList) (objecttree.ObjectTree, error) {
ot, err := objecttree.BuildKeyFilterableObjectTree(treeStorage, aclList)
if err != nil {
return nil, err
}
sbt, _, err := typeprovider.GetTypeAndKeyFromRoot(ot.Header())
if err != nil {
return nil, err
}
if sbt == smartblock.SmartBlockTypeChatDerivedObject || sbt == smartblock.SmartBlockTypeAccountObject {
ot.SetFlusher(objecttree.MarkNewChangeFlusher())
}
return ot, nil
},
TreeValidator: objecttree.ValidateFilterRawTree,
}
}
func (s *service) NewSource(ctx context.Context, space Space, id string, buildOptions BuildOptions) (Source, error) {
func (s *service) NewSource(ctx context.Context, space source.Space, id string, buildOptions source.BuildOptions) (source.Source, error) {
src, err := s.newSource(ctx, space, id, buildOptions)
if err != nil {
return nil, err
@ -129,7 +88,7 @@ func (s *service) NewSource(ctx context.Context, space Space, id string, buildOp
return src, nil
}
func (s *service) newSource(ctx context.Context, space Space, id string, buildOptions BuildOptions) (Source, error) {
func (s *service) newSource(ctx context.Context, space source.Space, id string, buildOptions source.BuildOptions) (source.Source, error) {
if id == addr.AnytypeProfileId {
return NewAnytypeProfile(id), nil
}
@ -166,7 +125,7 @@ func (s *service) newSource(ctx context.Context, space Space, id string, buildOp
participantState := state.NewDoc(id, nil).(*state.State)
// Set object type here in order to derive value of Type relation in smartblock.Init
participantState.SetObjectTypeKey(bundle.TypeKeyParticipant)
params := StaticSourceParams{
params := source.StaticSourceParams{
Id: domain.FullID{
ObjectID: id,
SpaceID: spaceId,
@ -189,7 +148,7 @@ func (s *service) newSource(ctx context.Context, space Space, id string, buildOp
return s.newTreeSource(ctx, space, id, buildOptions.BuildTreeOpts())
}
func (s *service) IDsListerBySmartblockType(space Space, blockType smartblock.SmartBlockType) (IDsLister, error) {
func (s *service) IDsListerBySmartblockType(space source.Space, blockType smartblock.SmartBlockType) (source.IDsLister, error) {
switch blockType {
case smartblock.SmartBlockTypeAnytypeProfile:
return &anytypeProfile{}, nil
@ -200,7 +159,7 @@ func (s *service) IDsListerBySmartblockType(space Space, blockType smartblock.Sm
case smartblock.SmartBlockTypeBundledRelation:
return &bundledRelation{}, nil
case smartblock.SmartBlockTypeBundledTemplate:
params := StaticSourceParams{
params := source.StaticSourceParams{
SbType: smartblock.SmartBlockTypeBundledTemplate,
CreatorId: addr.AnytypeProfileId,
}
@ -209,7 +168,7 @@ func (s *service) IDsListerBySmartblockType(space Space, blockType smartblock.Sm
if err := blockType.Valid(); err != nil {
return nil, err
}
return &source{
return &treeSource{
space: space,
spaceID: space.Id(),
smartblockType: blockType,
@ -241,7 +200,7 @@ func (s *service) DetailsFromIdBasedSource(id domain.FullID) (*domain.Details, e
return nil, fmt.Errorf("date source miss the details")
}
func (s *service) RegisterStaticSource(src Source) error {
func (s *service) RegisterStaticSource(src source.Source) error {
s.mu.Lock()
defer s.mu.Unlock()
s.staticIds[src.Id()] = src

View file

@ -1,9 +1,8 @@
package source
package sourceimpl
import (
"bytes"
"context"
"errors"
"fmt"
"math/rand"
"sync"
@ -22,6 +21,7 @@ import (
"github.com/anyproto/anytype-heart/core/block/editor/state"
"github.com/anyproto/anytype-heart/core/block/editor/template"
"github.com/anyproto/anytype-heart/core/block/source"
"github.com/anyproto/anytype-heart/core/domain"
"github.com/anyproto/anytype-heart/core/files"
"github.com/anyproto/anytype-heart/metrics"
@ -47,9 +47,6 @@ var (
log = logging.Logger("anytype-mw-source")
bytesPool = sync.Pool{New: func() any { return make([]byte, poolSize) }}
ErrReadOnly = errors.New("object is read only")
ErrBigChangeSize = errors.New("change size is above the limit")
)
func MarshalChange(change *pb.Change) (result []byte, dataType string, err error) {
@ -130,41 +127,12 @@ func UnmarshalChangeWithDataType(dataType string, decrypted []byte) (res any, er
return UnmarshalChange(&objecttree.Change{DataType: dataType}, decrypted)
}
type ChangeReceiver interface {
StateAppend(func(d state.Doc) (s *state.State, changes []*pb.ChangeContent, err error)) error
StateRebuild(d state.Doc) (err error)
}
type Source interface {
Id() string
SpaceID() string
Type() smartblock.SmartBlockType
Heads() []string
GetFileKeysSnapshot() []*pb.ChangeFileKeys
ReadOnly() bool
ReadDoc(ctx context.Context, receiver ChangeReceiver, empty bool) (doc state.Doc, err error)
PushChange(params PushChangeParams) (id string, err error)
Close() (err error)
GetCreationInfo() (creatorObjectId string, createdDate int64, err error)
}
type SourceIdEndodedDetails interface {
Id() string
DetailsFromId() (*domain.Details, error)
}
type IDsLister interface {
ListIds() ([]string, error)
}
type SourceWithType interface {
Source
IDsLister
}
var ErrUnknownDataFormat = fmt.Errorf("unknown data format: you may need to upgrade anytype in order to open this page")
func (s *service) newTreeSource(ctx context.Context, space Space, id string, buildOpts objecttreebuilder.BuildTreeOpts) (Source, error) {
func (s *service) newTreeSource(ctx context.Context, space source.Space, id string, buildOpts objecttreebuilder.BuildTreeOpts) (source.Source, error) {
treeBuilder := space.TreeBuilder()
if treeBuilder == nil {
return nil, fmt.Errorf("space doesn't have tree builder")
@ -179,7 +147,7 @@ func (s *service) newTreeSource(ctx context.Context, space Space, id string, bui
return nil, err
}
src := &source{
src := &treeSource{
ObjectTree: ot,
id: id,
space: space,
@ -193,30 +161,26 @@ func (s *service) newTreeSource(ctx context.Context, space Space, id string, bui
fileObjectMigrator: s.fileObjectMigrator,
}
if sbt == smartblock.SmartBlockTypeChatDerivedObject || sbt == smartblock.SmartBlockTypeAccountObject {
return &store{source: src, sbType: sbt, diffManagers: map[string]*diffManager{}}, nil
return &store{treeSource: src, sbType: sbt, diffManagers: map[string]*diffManager{}, spaceService: s.spaceService}, nil
}
return src, nil
}
type ObjectTreeProvider interface {
Tree() objecttree.ObjectTree
}
type fileObjectMigrator interface {
MigrateFiles(st *state.State, spc Space, keysChanges []*pb.ChangeFileKeys)
MigrateFileIdsInDetails(st *state.State, spc Space)
MigrateFiles(st *state.State, spc source.Space, keysChanges []*pb.ChangeFileKeys)
MigrateFileIdsInDetails(st *state.State, spc source.Space)
}
type source struct {
type treeSource struct {
objecttree.ObjectTree
id string
space Space
space source.Space
spaceID string
smartblockType smartblock.SmartBlockType
lastSnapshotId string
changesSinceSnapshot int
receiver ChangeReceiver
receiver source.ChangeReceiver
unsubscribe func()
closed chan struct{}
@ -228,13 +192,13 @@ type source struct {
fileObjectMigrator fileObjectMigrator
}
var _ updatelistener.UpdateListener = (*source)(nil)
var _ updatelistener.UpdateListener = (*treeSource)(nil)
func (s *source) Tree() objecttree.ObjectTree {
func (s *treeSource) Tree() objecttree.ObjectTree {
return s.ObjectTree
}
func (s *source) Update(ot objecttree.ObjectTree) error {
func (s *treeSource) Update(ot objecttree.ObjectTree) error {
// here it should work, because we always have the most common snapshot of the changes in tree
s.lastSnapshotId = ot.Root().Id
prevSnapshot := s.lastSnapshotId
@ -259,7 +223,7 @@ func (s *source) Update(ot objecttree.ObjectTree) error {
return nil
}
func (s *source) Rebuild(ot objecttree.ObjectTree) error {
func (s *treeSource) Rebuild(ot objecttree.ObjectTree) error {
if s.ObjectTree == nil {
return nil
}
@ -277,27 +241,27 @@ func (s *source) Rebuild(ot objecttree.ObjectTree) error {
return nil
}
func (s *source) ReadOnly() bool {
func (s *treeSource) ReadOnly() bool {
return false
}
func (s *source) Id() string {
func (s *treeSource) Id() string {
return s.id
}
func (s *source) SpaceID() string {
func (s *treeSource) SpaceID() string {
return s.spaceID
}
func (s *source) Type() smartblock.SmartBlockType {
func (s *treeSource) Type() smartblock.SmartBlockType {
return s.smartblockType
}
func (s *source) ReadDoc(_ context.Context, receiver ChangeReceiver, allowEmpty bool) (doc state.Doc, err error) {
func (s *treeSource) ReadDoc(_ context.Context, receiver source.ChangeReceiver, allowEmpty bool) (doc state.Doc, err error) {
return s.readDoc(receiver)
}
func (s *source) readDoc(receiver ChangeReceiver) (doc state.Doc, err error) {
func (s *treeSource) readDoc(receiver source.ChangeReceiver) (doc state.Doc, err error) {
s.receiver = receiver
setter, ok := s.ObjectTree.(synctree.ListenerSetter)
if !ok {
@ -308,7 +272,7 @@ func (s *source) readDoc(receiver ChangeReceiver) (doc state.Doc, err error) {
return s.buildState()
}
func (s *source) buildState() (doc state.Doc, err error) {
func (s *treeSource) buildState() (doc state.Doc, err error) {
st, _, changesAppliedSinceSnapshot, err := BuildState(s.spaceID, nil, s.ObjectTree, true)
if err != nil {
return
@ -320,16 +284,6 @@ func (s *source) buildState() (doc state.Doc, err error) {
}
st.BlocksInit(st)
// This is temporary migration. We will move it to persistent migration later after several releases.
// The reason is to minimize the number of glitches for users of both old and new versions of Anytype.
// For example, if we persist this migration for Dataview block now, user will see "No query selected"
// error in the old version of Anytype. We want to avoid this as much as possible by making this migration
// temporary, though the applying change to this Dataview block will persist this migration, breaking backward
// compatibility. But in many cases we expect that users update object not so often as they just view them.
// TODO: we can skip migration for non-personal spaces
migration := NewSubObjectsAndProfileLinksMigration(s.smartblockType, s.space, s.accountService.MyParticipantId(s.spaceID), s.objectStore)
migration.Migrate(st)
// we need to have required internal relations for all objects, including system
st.AddBundledRelationLinks(bundle.RequiredInternalRelations...)
if s.Type() == smartblock.SmartBlockTypePage || s.Type() == smartblock.SmartBlockTypeProfilePage {
@ -360,7 +314,7 @@ func (s *source) buildState() (doc state.Doc, err error) {
return st, nil
}
func (s *source) GetCreationInfo() (creatorObjectId string, createdDate int64, err error) {
func (s *treeSource) GetCreationInfo() (creatorObjectId string, createdDate int64, err error) {
header := s.ObjectTree.UnmarshalledHeader()
createdDate = header.Timestamp
if header.Identity != nil {
@ -369,15 +323,7 @@ func (s *source) GetCreationInfo() (creatorObjectId string, createdDate int64, e
return
}
type PushChangeParams struct {
State *state.State
Changes []*pb.ChangeContent
FileChangedHashes []string
Time time.Time // used to derive the lastModifiedDate; Default is time.Now()
DoSnapshot bool
}
func (s *source) PushChange(params PushChangeParams) (id string, err error) {
func (s *treeSource) PushChange(params source.PushChangeParams) (id string, err error) {
for _, change := range params.Changes {
name := reflection.GetChangeContent(change.Value)
if name == "" {
@ -431,7 +377,7 @@ func (s *source) PushChange(params PushChangeParams) (id string, err error) {
return
}
func (s *source) buildChange(params PushChangeParams) (c *pb.Change) {
func (s *treeSource) buildChange(params source.PushChangeParams) (c *pb.Change) {
c = &pb.Change{
Timestamp: params.Time.Unix(),
Version: params.State.MigrationVersion(),
@ -459,12 +405,12 @@ func (s *source) buildChange(params PushChangeParams) (c *pb.Change) {
func checkChangeSize(data []byte, maxSize int) error {
log.Debugf("Change size is %d bytes", len(data))
if len(data) > maxSize {
return ErrBigChangeSize
return source.ErrBigChangeSize
}
return nil
}
func (s *source) ListIds() (ids []string, err error) {
func (s *treeSource) ListIds() (ids []string, err error) {
if s.space == nil {
return
}
@ -498,18 +444,18 @@ func snapshotChance(changesSinceSnapshot int) bool {
return false
}
func (s *source) needSnapshot() bool {
func (s *treeSource) needSnapshot() bool {
if s.ObjectTree.Heads()[0] == s.ObjectTree.Id() {
return true
}
return snapshotChance(s.changesSinceSnapshot)
}
func (s *source) GetFileKeysSnapshot() []*pb.ChangeFileKeys {
func (s *treeSource) GetFileKeysSnapshot() []*pb.ChangeFileKeys {
return s.getFileHashesForSnapshot(nil)
}
func (s *source) getFileHashesForSnapshot(changeHashes []string) []*pb.ChangeFileKeys {
func (s *treeSource) getFileHashesForSnapshot(changeHashes []string) []*pb.ChangeFileKeys {
fileKeys := s.getFileKeysByHashes(changeHashes)
var uniqKeys = make(map[string]struct{})
for _, fk := range fileKeys {
@ -542,7 +488,7 @@ func (s *source) getFileHashesForSnapshot(changeHashes []string) []*pb.ChangeFil
return fileKeys
}
func (s *source) getFileKeysByHashes(hashes []string) []*pb.ChangeFileKeys {
func (s *treeSource) getFileKeysByHashes(hashes []string) []*pb.ChangeFileKeys {
fileKeys := make([]*pb.ChangeFileKeys, 0, len(hashes))
for _, h := range hashes {
fk, err := s.fileService.FileGetKeys(domain.FileId(h))
@ -560,7 +506,7 @@ func (s *source) getFileKeysByHashes(hashes []string) []*pb.ChangeFileKeys {
return fileKeys
}
func (s *source) Heads() []string {
func (s *treeSource) Heads() []string {
if s.ObjectTree == nil {
return nil
}
@ -570,7 +516,7 @@ func (s *source) Heads() []string {
return headsCopy
}
func (s *source) Close() (err error) {
func (s *treeSource) Close() (err error) {
if s.unsubscribe != nil {
s.unsubscribe()
<-s.closed

View file

@ -1,4 +1,4 @@
package source
package sourceimpl
import (
"fmt"
@ -7,6 +7,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/anyproto/anytype-heart/core/block/source"
"github.com/anyproto/anytype-heart/pb"
"github.com/anyproto/anytype-heart/pkg/lib/bundle"
"github.com/anyproto/anytype-heart/pkg/lib/pb/model"
@ -51,30 +52,30 @@ func Test_snapshotChance2(t *testing.T) {
func TestSource_CheckChangeSize(t *testing.T) {
t.Run("big change", func(t *testing.T) {
//given
// given
c := &pb.Change{Content: []*pb.ChangeContent{{&pb.ChangeContentValueOfRelationAdd{RelationAdd: &pb.ChangeRelationAdd{
RelationLinks: []*model.RelationLink{{Key: bundle.RelationKeyName.String()}}},
}}}}
data, _ := c.Marshal()
//when
// when
err := checkChangeSize(data, len(data)-1)
//then
assert.ErrorIs(t, err, ErrBigChangeSize)
// then
assert.ErrorIs(t, err, source.ErrBigChangeSize)
})
t.Run("small change", func(t *testing.T) {
//given
// given
c := &pb.Change{Content: []*pb.ChangeContent{{&pb.ChangeContentValueOfRelationAdd{RelationAdd: &pb.ChangeRelationAdd{
RelationLinks: []*model.RelationLink{{Key: bundle.RelationKeyId.String()}}},
}}}}
data, _ := c.Marshal()
//when
// when
err := checkChangeSize(data, len(data)+1)
//then
// then
assert.NoError(t, err)
})
}

View file

@ -1,22 +1,16 @@
package source
package sourceimpl
import (
"context"
"github.com/anyproto/anytype-heart/core/block/editor/state"
"github.com/anyproto/anytype-heart/core/block/source"
"github.com/anyproto/anytype-heart/core/domain"
"github.com/anyproto/anytype-heart/pb"
"github.com/anyproto/anytype-heart/pkg/lib/core/smartblock"
)
type StaticSourceParams struct {
Id domain.FullID
SbType smartblock.SmartBlockType
State *state.State
CreatorId string
}
func (s *service) NewStaticSource(params StaticSourceParams) SourceWithType {
func (s *service) NewStaticSource(params source.StaticSourceParams) source.SourceWithType {
return &static{
id: params.Id,
sbType: params.SbType,
@ -50,11 +44,11 @@ func (s *static) ReadOnly() bool {
return true
}
func (s *static) ReadDoc(ctx context.Context, receiver ChangeReceiver, empty bool) (doc state.Doc, err error) {
func (s *static) ReadDoc(ctx context.Context, receiver source.ChangeReceiver, empty bool) (doc state.Doc, err error) {
return s.doc, nil
}
func (s *static) PushChange(params PushChangeParams) (id string, err error) {
func (s *static) PushChange(params source.PushChangeParams) (id string, err error) {
return
}

View file

@ -1,4 +1,4 @@
package source
package sourceimpl
import (
"bytes"
@ -7,10 +7,7 @@ import (
"errors"
"fmt"
"slices"
"time"
anystore "github.com/anyproto/any-store"
"github.com/anyproto/any-store/anyenc"
"github.com/anyproto/any-sync/commonspace/object/tree/objecttree"
"github.com/anyproto/any-sync/commonspace/object/tree/synctree"
"github.com/anyproto/any-sync/commonspace/object/tree/synctree/updatelistener"
@ -20,52 +17,30 @@ import (
"github.com/anyproto/anytype-heart/core/block/editor/state"
"github.com/anyproto/anytype-heart/core/block/editor/storestate"
"github.com/anyproto/anytype-heart/core/block/source"
"github.com/anyproto/anytype-heart/core/domain"
"github.com/anyproto/anytype-heart/pb"
"github.com/anyproto/anytype-heart/pkg/lib/bundle"
"github.com/anyproto/anytype-heart/pkg/lib/core/smartblock"
"github.com/anyproto/anytype-heart/pkg/lib/pb/model"
"github.com/anyproto/anytype-heart/space"
"github.com/anyproto/anytype-heart/space/clientspace"
"github.com/anyproto/anytype-heart/space/clientspace/keyvalueservice"
)
type PushChangeHook func(params PushChangeParams) (id string, err error)
var _ updatelistener.UpdateListener = (*store)(nil)
type Store interface {
Source
ReadStoreDoc(ctx context.Context, stateStore *storestate.StoreState, onUpdateHook func()) (err error)
PushStoreChange(ctx context.Context, params PushStoreChangeParams) (changeId string, err error)
SetPushChangeHook(onPushChange PushChangeHook)
// RegisterDiffManager sets a hook that will be called when a change is removed (marked as read) from the diff manager
// must be called before ReadStoreDoc.
//
// If a head is marked as read in the diff manager, all earlier heads for that branch marked as read as well
RegisterDiffManager(name string, onRemoveHook func(removed []string))
// MarkSeenHeads marks heads as seen in a diff manager. Then the diff manager will call a hook from SetDiffManagerOnRemoveHook
MarkSeenHeads(ctx context.Context, name string, heads []string) error
// StoreSeenHeads persists current seen heads in any-store
StoreSeenHeads(ctx context.Context, name string) error
// InitDiffManager initializes a diff manager with specified seen heads
InitDiffManager(ctx context.Context, name string, seenHeads []string) error
}
type PushStoreChangeParams struct {
State *storestate.StoreState
Changes []*pb.StoreChangeContent
Time time.Time // used to derive the lastModifiedDate; Default is time.Now()
}
var (
_ updatelistener.UpdateListener = (*store)(nil)
_ Store = (*store)(nil)
_ source.Store = (*store)(nil)
)
type store struct {
*source
*treeSource
spaceService space.Service
store *storestate.StoreState
onUpdateHook func()
onPushChange PushChangeHook
onPushChange source.PushChangeHook
sbType smartblock.SmartBlockType
diffManagers map[string]*diffManager
@ -76,11 +51,15 @@ type diffManager struct {
onRemove func(removed []string)
}
func (s *store) getTechSpace() clientspace.Space {
return s.spaceService.TechSpace()
}
func (s *store) GetFileKeysSnapshot() []*pb.ChangeFileKeys {
return nil
}
func (s *store) SetPushChangeHook(onPushChange PushChangeHook) {
func (s *store) SetPushChangeHook(onPushChange source.PushChangeHook) {
s.onPushChange = onPushChange
}
@ -93,26 +72,45 @@ func (s *store) RegisterDiffManager(name string, onRemoveHook func(removed []str
}
func (s *store) initDiffManagers(ctx context.Context) error {
for name := range s.diffManagers {
seenHeads, err := s.loadSeenHeads(ctx, name)
if err != nil {
return fmt.Errorf("load seen heads: %w", err)
}
err = s.InitDiffManager(ctx, name, seenHeads)
for name, manager := range s.diffManagers {
err := s.InitDiffManager(ctx, name, nil)
if err != nil {
return fmt.Errorf("init diff manager: %w", err)
}
vals, err := s.getTechSpace().KeyValueService().Get(ctx, s.seenHeadsKey(name))
if err != nil {
log.With("error", err).Error("init diff manager: get value")
continue
}
for _, val := range vals {
seenHeads, err := unmarshalSeenHeads(val.Data)
if err != nil {
log.With("error", err).Error("init diff manager: unmarshal seen heads")
continue
}
manager.diffManager.Remove(seenHeads)
}
}
return nil
}
func unmarshalSeenHeads(raw []byte) ([]string, error) {
var seenHeads []string
err := json.Unmarshal(raw, &seenHeads)
if err != nil {
return nil, err
}
return seenHeads, nil
}
func (s *store) InitDiffManager(ctx context.Context, name string, seenHeads []string) (err error) {
manager, ok := s.diffManagers[name]
if !ok {
return nil
}
curTreeHeads := s.source.Tree().Heads()
curTreeHeads := s.treeSource.Tree().Heads()
buildTree := func(heads []string) (objecttree.ReadableObjectTree, error) {
return s.space.TreeBuilder().BuildHistoryTree(ctx, s.Id(), objecttreebuilder.HistoryTreeOpts{
@ -131,10 +129,25 @@ func (s *store) InitDiffManager(ctx context.Context, name string, seenHeads []st
return fmt.Errorf("init diff manager: %w", err)
}
err = s.getTechSpace().KeyValueService().SubscribeForKey(s.seenHeadsKey(name), name, func(key string, val keyvalueservice.Value) {
s.ObjectTree.Lock()
defer s.ObjectTree.Unlock()
newSeenHeads, err := unmarshalSeenHeads(val.Data)
if err != nil {
log.Errorf("subscribe for seenHeads: %s: %v", name, err)
return
}
manager.diffManager.Remove(newSeenHeads)
})
if err != nil {
return fmt.Errorf("subscribe: %w", err)
}
return
}
func (s *store) ReadDoc(ctx context.Context, receiver ChangeReceiver, empty bool) (doc state.Doc, err error) {
func (s *store) ReadDoc(ctx context.Context, receiver source.ChangeReceiver, empty bool) (doc state.Doc, err error) {
s.receiver = receiver
setter, ok := s.ObjectTree.(synctree.ListenerSetter)
if !ok {
@ -162,7 +175,7 @@ func (s *store) ReadDoc(ctx context.Context, receiver ChangeReceiver, empty bool
return st, nil
}
func (s *store) PushChange(params PushChangeParams) (id string, err error) {
func (s *store) PushChange(params source.PushChangeParams) (id string, err error) {
if s.onPushChange != nil {
return s.onPushChange(params)
}
@ -173,10 +186,6 @@ func (s *store) ReadStoreDoc(ctx context.Context, storeState *storestate.StoreSt
s.onUpdateHook = onUpdateHook
s.store = storeState
err = s.initDiffManagers(ctx)
if err != nil {
return err
}
tx, err := s.store.NewTx(ctx)
if err != nil {
return
@ -194,10 +203,19 @@ func (s *store) ReadStoreDoc(ctx context.Context, storeState *storestate.StoreSt
if err = applier.Apply(); err != nil {
return errors.Join(tx.Rollback(), err)
}
return tx.Commit()
err = tx.Commit()
if err != nil {
return fmt.Errorf("commit: %w", err)
}
err = s.initDiffManagers(ctx)
if err != nil {
return fmt.Errorf("init diff managers: %w", err)
}
return nil
}
func (s *store) PushStoreChange(ctx context.Context, params PushStoreChangeParams) (changeId string, err error) {
func (s *store) PushStoreChange(ctx context.Context, params source.PushStoreChangeParams) (changeId string, err error) {
tx, err := s.store.NewTx(ctx)
if err != nil {
return "", fmt.Errorf("new tx: %w", err)
@ -211,7 +229,7 @@ func (s *store) PushStoreChange(ctx context.Context, params PushStoreChangeParam
}
data, dataType, err := MarshalStoreChange(change)
if err != nil {
return "", fmt.Errorf("marshal change: %w", err)
return "", rollback(fmt.Errorf("marshal change: %w", err))
}
addResult, err := s.ObjectTree.AddContentWithValidator(ctx, objecttree.SignableChangeContent{
@ -250,18 +268,22 @@ func (s *store) PushStoreChange(ctx context.Context, params PushStoreChangeParam
return "", err
}
for _, m := range s.diffManagers {
if m.diffManager != nil {
m.diffManager.Add(&objecttree.Change{
Id: changeId,
PreviousIds: ch.PreviousIds,
})
}
}
s.addToDiffManagers(&objecttree.Change{
Id: changeId,
PreviousIds: ch.PreviousIds,
})
return changeId, err
}
func (s *store) addToDiffManagers(change *objecttree.Change) {
for _, m := range s.diffManagers {
if m.diffManager != nil {
m.diffManager.Add(change)
}
}
}
func (s *store) update(ctx context.Context, tree objecttree.ObjectTree) error {
tx, err := s.store.NewTx(ctx)
if err != nil {
@ -276,15 +298,20 @@ func (s *store) update(ctx context.Context, tree objecttree.ObjectTree) error {
return errors.Join(tx.Rollback(), err)
}
err = tx.Commit()
s.updateInDiffManagers(tree)
if err == nil {
s.onUpdateHook()
}
return err
}
func (s *store) updateInDiffManagers(tree objecttree.ObjectTree) {
for _, m := range s.diffManagers {
if m.diffManager != nil {
m.diffManager.Update(tree)
}
}
if err == nil {
s.onUpdateHook()
}
return err
}
func (s *store) MarkSeenHeads(ctx context.Context, name string, heads []string) error {
@ -296,52 +323,23 @@ func (s *store) MarkSeenHeads(ctx context.Context, name string, heads []string)
return nil
}
func seenHeadsCollectionName(name string) string {
return "seenHeads/" + name
}
func (s *store) StoreSeenHeads(ctx context.Context, name string) error {
manager, ok := s.diffManagers[name]
if !ok {
return nil
}
coll, err := s.store.Collection(ctx, seenHeadsCollectionName(name))
if err != nil {
return fmt.Errorf("get collection: %w", err)
}
seenHeads := manager.diffManager.SeenHeads()
raw, err := json.Marshal(seenHeads)
if err != nil {
return fmt.Errorf("marshal seen heads: %w", err)
}
arena := &anyenc.Arena{}
doc := arena.NewObject()
doc.Set("id", arena.NewString(s.id))
doc.Set("h", arena.NewBinary(raw))
return coll.UpsertOne(ctx, doc)
return s.getTechSpace().KeyValueService().Set(ctx, s.seenHeadsKey(name), raw)
}
func (s *store) loadSeenHeads(ctx context.Context, name string) ([]string, error) {
coll, err := s.store.Collection(ctx, seenHeadsCollectionName(name))
if err != nil {
return nil, fmt.Errorf("get collection: %w", err)
}
doc, err := coll.FindId(ctx, s.id)
if errors.Is(err, anystore.ErrDocNotFound) {
return nil, nil
}
raw := doc.Value().GetBytes("h")
var seenHeads []string
err = json.Unmarshal(raw, &seenHeads)
if err != nil {
return nil, fmt.Errorf("unmarshal seen heads: %w", err)
}
return seenHeads, nil
func (s *store) seenHeadsKey(diffManagerName string) string {
return s.id + diffManagerName
}
func (s *store) Update(tree objecttree.ObjectTree) error {

View file

@ -1,4 +1,4 @@
package source
package sourceimpl
import (
"errors"

View file

@ -1,4 +1,4 @@
package source
package sourceimpl
import (
"context"

View file

@ -1,234 +0,0 @@
package source
import (
"context"
"fmt"
"strings"
"github.com/globalsign/mgo/bson"
"github.com/gogo/protobuf/types"
"github.com/anyproto/anytype-heart/core/block/editor/state"
"github.com/anyproto/anytype-heart/core/block/simple"
dataview2 "github.com/anyproto/anytype-heart/core/block/simple/dataview"
"github.com/anyproto/anytype-heart/core/domain"
"github.com/anyproto/anytype-heart/pkg/lib/bundle"
"github.com/anyproto/anytype-heart/pkg/lib/core/smartblock"
"github.com/anyproto/anytype-heart/pkg/lib/localstore/addr"
"github.com/anyproto/anytype-heart/pkg/lib/localstore/objectstore/spaceindex"
"github.com/anyproto/anytype-heart/pkg/lib/pb/model"
"github.com/anyproto/anytype-heart/util/pbtypes"
)
// Migrate old relation (rel-name, etc.) and object type (ot-page, etc.) IDs to new ones (just ordinary object IDs)
// Those old ids are ids of sub-objects, legacy system for storing types and relations inside workspace object
type subObjectsAndProfileLinksMigration struct {
profileID string
identityObjectID string
sbType smartblock.SmartBlockType
space Space
objectStore spaceindex.Store
}
func NewSubObjectsAndProfileLinksMigration(sbType smartblock.SmartBlockType, space Space, identityObjectID string, objectStore spaceindex.Store) *subObjectsAndProfileLinksMigration {
return &subObjectsAndProfileLinksMigration{
space: space,
identityObjectID: identityObjectID,
sbType: sbType,
objectStore: objectStore,
}
}
func (m *subObjectsAndProfileLinksMigration) replaceLinksInDetails(s *state.State) {
for _, rel := range s.GetRelationLinks() {
if rel.Key == bundle.RelationKeyFeaturedRelations.String() {
continue
}
if rel.Key == bundle.RelationKeySourceObject.String() {
// migrate broken sourceObject after v0.29.11
// todo: remove this
if s.UniqueKeyInternal() == "" {
continue
}
internalKey := s.UniqueKeyInternal()
switch m.sbType {
case smartblock.SmartBlockTypeRelation:
if bundle.HasRelation(domain.RelationKey(internalKey)) {
s.SetDetail(bundle.RelationKeySourceObject, domain.String(domain.RelationKey(internalKey).BundledURL()))
}
case smartblock.SmartBlockTypeObjectType:
if bundle.HasObjectTypeByKey(domain.TypeKey(internalKey)) {
s.SetDetail(bundle.RelationKeySourceObject, domain.String(domain.TypeKey(internalKey).BundledURL()))
}
}
continue
}
if m.canRelationContainObjectValues(rel.Format) {
rawValue := s.Details().Get(domain.RelationKey(rel.Key))
if oldId := rawValue.String(); oldId != "" {
newId := m.migrateId(oldId)
if oldId != newId {
s.SetDetail(domain.RelationKey(rel.Key), domain.String(newId))
}
} else if ids := rawValue.StringList(); len(ids) > 0 {
changed := false
for i, oldId := range ids {
newId := m.migrateId(oldId)
if oldId != newId {
ids[i] = newId
changed = true
}
}
if changed {
s.SetDetail(domain.RelationKey(rel.Key), domain.StringList(ids))
}
}
}
}
}
// Migrate works only in personal space
func (m *subObjectsAndProfileLinksMigration) Migrate(s *state.State) {
if !m.space.IsPersonal() {
return
}
uk, err := domain.NewUniqueKey(smartblock.SmartBlockTypeProfilePage, "")
if err != nil {
log.Errorf("migration: failed to create unique key for profile: %s", err)
} else {
// this way we will get incorrect profileID for non-personal spaces, but we are not migrating them
id, err := m.space.DeriveObjectID(context.Background(), uk)
if err != nil {
log.Errorf("migration: failed to derive id for profile: %s", err)
} else {
m.profileID = id
}
}
m.replaceLinksInDetails(s)
s.Iterate(func(block simple.Block) bool {
if block.Model().GetDataview() != nil {
// Mark block as mutable
dv := s.Get(block.Model().Id).(dataview2.Block)
m.migrateFilters(dv)
}
if _, ok := block.(simple.ObjectLinkReplacer); ok {
// Mark block as mutable
b := s.Get(block.Model().Id)
replacer := b.(simple.ObjectLinkReplacer)
replacer.ReplaceLinkIds(m.migrateId)
}
return true
})
}
func (m *subObjectsAndProfileLinksMigration) migrateId(oldId string) (newId string) {
if m.profileID != "" && m.identityObjectID != "" {
// we substitute all links to profile object with space member object
if oldId == m.profileID ||
strings.HasPrefix(oldId, "_id_") { // we don't need to check the exact accountID here, because we only have links to our own identity
return m.identityObjectID
}
}
uniqueKey, valid := subObjectIdToUniqueKey(oldId)
if !valid {
return oldId
}
newId, err := m.space.DeriveObjectID(context.Background(), uniqueKey)
if err != nil {
log.With("uniqueKey", uniqueKey.Marshal()).Errorf("failed to derive id: %s", err)
return oldId
}
return newId
}
// subObjectIdToUniqueKey converts legacy sub-object id to uniqueKey
// if id is not supported subObjectId, it will return nil, false
// suppose to be used only for migration and almost free to use
func subObjectIdToUniqueKey(id string) (uniqueKey domain.UniqueKey, valid bool) {
// historically, we don't have the prefix for the options,
// so we need to handled it this ugly way
if bson.IsObjectIdHex(id) {
return domain.MustUniqueKey(smartblock.SmartBlockTypeRelationOption, id), true
}
// special case: we don't support bundled relations/types in uniqueKeys (GO-2394). So in case we got it, we need to replace the prefix
if strings.HasPrefix(id, addr.BundledObjectTypeURLPrefix) {
id = addr.ObjectTypeKeyToIdPrefix + strings.TrimPrefix(id, addr.BundledObjectTypeURLPrefix)
} else if strings.HasPrefix(id, addr.BundledRelationURLPrefix) {
id = addr.RelationKeyToIdPrefix + strings.TrimPrefix(id, addr.BundledRelationURLPrefix)
}
uniqueKey, err := domain.UnmarshalUniqueKey(id)
if err != nil {
return nil, false
}
return uniqueKey, true
}
func (m *subObjectsAndProfileLinksMigration) migrateFilters(dv dataview2.Block) {
for _, view := range dv.Model().GetDataview().GetViews() {
for _, filter := range view.GetFilters() {
err := m.migrateFilter(filter)
if err != nil {
log.Errorf("failed to migrate filter %s: %s", filter.Id, err)
}
}
}
}
func (m *subObjectsAndProfileLinksMigration) migrateFilter(filter *model.BlockContentDataviewFilter) error {
if filter == nil {
return nil
}
if filter.Value == nil || filter.Value.Kind == nil {
log.With("relationKey", filter.RelationKey).Warnf("empty filter value")
return nil
}
relation, err := m.objectStore.GetRelationByKey(filter.RelationKey)
if err != nil {
log.Warnf("migration: failed to get relation by key %s: %s", filter.RelationKey, err)
}
// TODO: check this logic
// here we use objectstore to get relation, but it may be not yet available
// In case it is missing, lets try to migrate any string/stringlist: it should ignore invalid strings
if relation == nil || m.canRelationContainObjectValues(relation.Format) {
switch v := filter.Value.Kind.(type) {
case *types.Value_StringValue:
filter.Value = pbtypes.String(m.migrateId(v.StringValue))
case *types.Value_ListValue:
newIDs := make([]string, 0, len(v.ListValue.Values))
for _, oldID := range v.ListValue.Values {
if id, ok := oldID.Kind.(*types.Value_StringValue); ok {
newIDs = append(newIDs, m.migrateId(id.StringValue))
} else {
return fmt.Errorf("migration: failed to migrate filter: invalid list item value kind %t", oldID.Kind)
}
}
filter.Value = pbtypes.StringList(newIDs)
}
}
return nil
}
func (m *subObjectsAndProfileLinksMigration) canRelationContainObjectValues(format model.RelationFormat) bool {
switch format {
case
model.RelationFormat_status,
model.RelationFormat_tag,
model.RelationFormat_object:
return true
default:
return false
}
}

View file

@ -1,55 +0,0 @@
package source
import (
"testing"
"github.com/anyproto/anytype-heart/core/domain"
)
func TestSubObjectIdToUniqueKey(t *testing.T) {
type args struct {
id string
}
tests := []struct {
name string
args args
wantUk string
wantValid bool
}{
{"relation", args{"rel-id"}, "rel-id", true},
{"type", args{"ot-task"}, "ot-task", true},
{"opt", args{"650832666293ae9ae67e5f9c"}, "opt-650832666293ae9ae67e5f9c", true},
{"invalid-prefix", args{"aa-task"}, "", false},
{"no-key", args{"rel"}, "", false},
{"no-key2", args{"rel-"}, "", false},
{"no-key2", args{"rel---gdfgfd--gfdgfd-"}, "", false},
{"invalid", args{"task"}, "", false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gotUk, gotValid := subObjectIdToUniqueKey(tt.args.id)
if gotValid != tt.wantValid {
t.Errorf("SubObjectIdToUniqueKey() gotValid = %v, want %v", gotValid, tt.wantValid)
t.Fail()
}
if !tt.wantValid {
return
}
wantUk, err := domain.UnmarshalUniqueKey(tt.wantUk)
if err != nil {
t.Errorf("SubObjectIdToUniqueKey() error = %v", err)
t.Fail()
}
if wantUk.Marshal() != gotUk.Marshal() {
t.Errorf("SubObjectIdToUniqueKey() gotUk = %v, want %v", gotUk, tt.wantUk)
t.Fail()
}
if wantUk.SmartblockType() != gotUk.SmartblockType() {
t.Errorf("SubObjectIdToUniqueKey() gotSmartblockType = %v, want %v", gotUk.SmartblockType(), wantUk.SmartblockType())
t.Fail()
}
})
}
}

View file

@ -5,7 +5,7 @@ import (
"fmt"
"time"
"github.com/anyproto/anytype-heart/core/block/source"
"github.com/anyproto/anytype-heart/core/block/source/sourceimpl"
"github.com/anyproto/anytype-heart/core/domain"
"github.com/anyproto/anytype-heart/pkg/lib/bundle"
"github.com/anyproto/anytype-heart/space"
@ -25,7 +25,7 @@ func BuildDetailsFromTimestamp(
return nil, fmt.Errorf("failed to get date type id: %w", err)
}
dateSource := source.NewDate(source.DateSourceParams{
dateSource := sourceimpl.NewDate(sourceimpl.DateSourceParams{
Id: domain.FullID{
SpaceID: spaceId,
ObjectID: dateutil.NewDateObject(time.Unix(timestamp, 0), false).Id(),
@ -33,7 +33,7 @@ func BuildDetailsFromTimestamp(
DateObjectTypeId: dateTypeId,
})
detailsGetter, ok := dateSource.(source.SourceIdEndodedDetails)
detailsGetter, ok := dateSource.(sourceimpl.SourceIdEndodedDetails)
if !ok {
return nil, fmt.Errorf("date object does not implement SourceIdEndodedDetails: %w", err)
}

View file

@ -11,6 +11,7 @@ import (
"github.com/araddon/dateparse"
"github.com/anyproto/anytype-heart/core/block/source"
"github.com/anyproto/anytype-heart/core/block/source/sourceimpl"
"github.com/anyproto/anytype-heart/core/domain"
"github.com/anyproto/anytype-heart/pkg/lib/bundle"
"github.com/anyproto/anytype-heart/pkg/lib/database"
@ -190,7 +191,7 @@ func makeSuggestedDateRecord(ctx context.Context, spc source.Space, id string) (
return database.Record{}, fmt.Errorf("failed to find Date type to build Date object: %w", err)
}
dateSource := source.NewDate(source.DateSourceParams{
dateSource := sourceimpl.NewDate(sourceimpl.DateSourceParams{
Id: domain.FullID{
ObjectID: id,
SpaceID: spc.Id(),
@ -198,7 +199,7 @@ func makeSuggestedDateRecord(ctx context.Context, spc source.Space, id string) (
DateObjectTypeId: typeId,
})
v, ok := dateSource.(source.SourceIdEndodedDetails)
v, ok := dateSource.(sourceimpl.SourceIdEndodedDetails)
if !ok {
return database.Record{}, fmt.Errorf("source does not implement DetailsFromId")
}

View file

@ -3,7 +3,7 @@ package debug
import (
"fmt"
"github.com/anyproto/anytype-heart/core/block/source"
"github.com/anyproto/anytype-heart/core/block/source/sourceimpl"
"github.com/anyproto/anytype-heart/pb"
"github.com/anyproto/anytype-heart/util/anonymize"
)
@ -13,7 +13,7 @@ type changeDataConverter struct {
}
func (c *changeDataConverter) Unmarshall(dataType string, decrypted []byte) (res any, err error) {
return source.UnmarshalChangeWithDataType(dataType, decrypted)
return sourceimpl.UnmarshalChangeWithDataType(dataType, decrypted)
}
func (c *changeDataConverter) Marshall(model any) (data []byte, dataType string, err error) {

View file

@ -7,7 +7,7 @@ import (
"github.com/anyproto/any-sync/commonspace/object/tree/objecttree"
"github.com/anyproto/anytype-heart/core/block/editor/state"
"github.com/anyproto/anytype-heart/core/block/source"
"github.com/anyproto/anytype-heart/core/block/source/sourceimpl"
"github.com/anyproto/anytype-heart/pb"
"github.com/anyproto/anytype-heart/util/pbtypes"
)
@ -65,7 +65,7 @@ func (t *treeImporter) State() (*state.State, error) {
err error
)
st, _, _, err = source.BuildState("", nil, t.objectTree, true)
st, _, _, err = sourceimpl.BuildState("", nil, t.objectTree, true)
if err != nil {
return nil, err
}
@ -81,7 +81,7 @@ func (t *treeImporter) Json() (treeJson TreeJson, err error) {
Id: t.objectTree.Id(),
}
i := 0
err = t.objectTree.IterateRoot(source.UnmarshalChange, func(change *objecttree.Change) bool {
err = t.objectTree.IterateRoot(sourceimpl.UnmarshalChange, func(change *objecttree.Change) bool {
defer func() { i++ }()
if change.Id == t.objectTree.Id() {
return true
@ -104,7 +104,7 @@ func (t *treeImporter) ChangeAt(idx int) (idCh IdChange, err error) {
return
}
i := 0
err = t.objectTree.IterateRoot(source.UnmarshalChange, func(change *objecttree.Change) bool {
err = t.objectTree.IterateRoot(sourceimpl.UnmarshalChange, func(change *objecttree.Change) bool {
defer func() { i++ }()
if change.Id == t.objectTree.Id() {
return true

View file

@ -24,7 +24,7 @@ import (
"github.com/anyproto/anytype-heart/core/block/object/idresolver"
"github.com/anyproto/anytype-heart/core/block/object/objectlink"
"github.com/anyproto/anytype-heart/core/block/simple"
"github.com/anyproto/anytype-heart/core/block/source"
"github.com/anyproto/anytype-heart/core/block/source/sourceimpl"
"github.com/anyproto/anytype-heart/core/domain"
"github.com/anyproto/anytype-heart/pb"
"github.com/anyproto/anytype-heart/pkg/lib/bundle"
@ -144,7 +144,7 @@ func (h *history) Versions(id domain.FullID, lastVersionId string, limit int, no
}
var data []*pb.RpcHistoryVersion
e = tree.IterateFrom(tree.Root().Id, source.UnmarshalChange, func(c *objecttree.Change) (isContinue bool) {
e = tree.IterateFrom(tree.Root().Id, sourceimpl.UnmarshalChange, func(c *objecttree.Change) (isContinue bool) {
participantId := domain.NewParticipantId(id.SpaceID, c.Identity.Account())
data = h.fillVersionData(c, curHeads, participantId, data, hasher)
return true
@ -405,7 +405,7 @@ func (h *history) GetBlocksParticipants(id domain.FullID, versionId string, bloc
}
blocksParticipantsMap := make(map[string]string, 0)
err = tree.IterateFrom(tree.Root().Id, source.UnmarshalChange, func(c *objecttree.Change) (isContinue bool) {
err = tree.IterateFrom(tree.Root().Id, sourceimpl.UnmarshalChange, func(c *objecttree.Change) (isContinue bool) {
h.fillBlockParticipantMap(c, id, blocksParticipantsMap, existingBlocks)
return true
})
@ -555,7 +555,7 @@ func (h *history) buildState(id domain.FullID, versionId string) (
return
}
st, _, _, err = source.BuildState(id.SpaceID, nil, tree, true)
st, _, _, err = sourceimpl.BuildState(id.SpaceID, nil, tree, true)
if err != nil {
return
}

View file

@ -77,7 +77,7 @@ type indexer struct {
func (i *indexer) Init(a *app.App) (err error) {
i.store = a.MustComponent(objectstore.CName).(objectstore.ObjectStore)
i.storageService = a.MustComponent(spacestorage.CName).(storage.ClientStorage)
i.source = a.MustComponent(source.CName).(source.Service)
i.source = app.MustComponent[source.Service](a)
i.btHash = a.MustComponent("builtintemplate").(Hasher)
i.fileStore = app.MustComponent[filestore.FileStore](a)
i.ftsearch = app.MustComponent[ftsearch.FTSearch](a)

View file

@ -248,8 +248,12 @@ func (i *indexer) reindexChats(ctx context.Context, space clientspace.Space) err
if err != nil {
return fmt.Errorf("write tx: %w", err)
}
defer txn.Rollback()
var commited bool
defer func() {
if !commited {
txn.Rollback()
}
}()
for _, id := range ids {
col, err := db.OpenCollection(txn.Context(), id+chatobject.CollectionName)
if errors.Is(err, anystore.ErrCollectionNotFound) {
@ -276,6 +280,7 @@ func (i *indexer) reindexChats(ctx context.Context, space clientspace.Space) err
}
}
commited = true
err = txn.Commit()
if err != nil {
return fmt.Errorf("commit: %w", err)

View file

@ -79,24 +79,17 @@ func (i *spaceIndexer) indexBatch(tasks []indexTask) (err error) {
}
}
defer func() {
if err != nil {
_ = tx.Rollback()
} else {
if err = tx.Commit(); err != nil {
closeTasks(err)
} else {
closeTasks(nil)
}
log.Infof("indexBatch: indexed %d docs for a %v: err: %v", len(tasks), time.Since(st), err)
}
}()
for _, task := range tasks {
if iErr := i.index(tx.Context(), task.info, task.options...); iErr != nil {
task.done <- iErr
}
}
if err = tx.Commit(); err != nil {
closeTasks(err)
} else {
closeTasks(nil)
}
log.Infof("indexBatch: indexed %d docs for a %v: err: %v", len(tasks), time.Since(st), err)
return
}

View file

@ -21,14 +21,18 @@ type ObjectIDDeriver interface {
var (
defaultFeaturedRelationKeys = []domain.RelationKey{
bundle.RelationKeyType,
bundle.RelationKeyTag,
bundle.RelationKeyBacklinks,
bundle.RelationKeyTag,
}
defaultSetFeaturedRelationKeys = []domain.RelationKey{
bundle.RelationKeyType,
bundle.RelationKeySetOf,
bundle.RelationKeyTag,
bundle.RelationKeyBacklinks,
}
defaultCollectionFeaturedRelationKeys = []domain.RelationKey{
bundle.RelationKeyType,
bundle.RelationKeyBacklinks,
}
@ -67,8 +71,11 @@ var (
)
func DefaultFeaturedRelationKeys(typeKey domain.TypeKey) []domain.RelationKey {
if typeKey == bundle.TypeKeySet {
switch typeKey {
case bundle.TypeKeySet:
return defaultSetFeaturedRelationKeys
case bundle.TypeKeyCollection:
return defaultCollectionFeaturedRelationKeys
}
return defaultFeaturedRelationKeys
}

View file

@ -187,10 +187,11 @@ func TestFillRecommendedRelations(t *testing.T) {
// then
assert.NoError(t, err)
assert.False(t, isAlreadyFilled)
assert.Equal(t, buildRelationIds(defaultRecommendedRelationKeys), details.GetStringList(bundle.RelationKeyRecommendedRelations))
assert.Equal(t, buildRelationIds(append([]domain.RelationKey{bundle.RelationKeyTag}, defaultRecommendedRelationKeys...)),
details.GetStringList(bundle.RelationKeyRecommendedRelations))
assert.Equal(t, buildRelationIds(defaultSetFeaturedRelationKeys), details.GetStringList(bundle.RelationKeyRecommendedFeaturedRelations))
assert.Equal(t, defaultRecHiddenRelIds, details.GetStringList(bundle.RelationKeyRecommendedHiddenRelations))
assert.Len(t, keys, 4+3+3) // 4 featured + 3 sidebar + 3 hidden
assert.Len(t, keys, 3+4+3) // 3 featured + 4 sidebar + 3 hidden
})
}

View file

@ -93,7 +93,7 @@ func TestSubscribe(t *testing.T) {
// Add space view and objects
fx.objectStore.AddObjects(t, techSpaceId, []objectstore.TestObject{
givenSpaceViewObject("spaceView1", "space1", model.Account_Active, model.SpaceStatus_Ok),
givenSpaceViewObject("spaceView1", "space1", model.SpaceStatus_SpaceActive, model.SpaceStatus_Ok),
})
// Subscribe
@ -166,7 +166,7 @@ func TestSubscribe(t *testing.T) {
t.Run("add first space", func(t *testing.T) {
// Add space view
fx.objectStore.AddObjects(t, techSpaceId, []objectstore.TestObject{
givenSpaceViewObject("spaceView1", "space1", model.Account_Active, model.SpaceStatus_Ok),
givenSpaceViewObject("spaceView1", "space1", model.SpaceStatus_SpaceActive, model.SpaceStatus_Ok),
})
// Add objects
@ -213,7 +213,7 @@ func TestSubscribe(t *testing.T) {
t.Run("add second space", func(t *testing.T) {
// Add space view
fx.objectStore.AddObjects(t, techSpaceId, []objectstore.TestObject{
givenSpaceViewObject("spaceView2", "space2", model.Account_Active, model.SpaceStatus_Ok),
givenSpaceViewObject("spaceView2", "space2", model.SpaceStatus_SpaceActive, model.SpaceStatus_Ok),
})
// Add objects
@ -246,7 +246,7 @@ func TestSubscribe(t *testing.T) {
// Add space view and objects
fx.objectStore.AddObjects(t, techSpaceId, []objectstore.TestObject{
givenSpaceViewObject("spaceView1", "space1", model.Account_Active, model.SpaceStatus_Ok),
givenSpaceViewObject("spaceView1", "space1", model.SpaceStatus_SpaceActive, model.SpaceStatus_Ok),
})
obj1 := objectstore.TestObject{
bundle.RelationKeyId: domain.String("participant1"),
@ -270,7 +270,7 @@ func TestSubscribe(t *testing.T) {
// Remove space view by changing its status
fx.objectStore.AddObjects(t, techSpaceId, []objectstore.TestObject{
givenSpaceViewObject("spaceView1", "space1", model.Account_Deleted, model.SpaceStatus_Unknown),
givenSpaceViewObject("spaceView1", "space1", model.SpaceStatus_SpaceDeleted, model.SpaceStatus_Unknown),
})
// Wait events
@ -292,7 +292,7 @@ func TestSubscribe(t *testing.T) {
// Add space view and objects
fx.objectStore.AddObjects(t, techSpaceId, []objectstore.TestObject{
givenSpaceViewObject("spaceView1", "space1", model.Account_Active, model.SpaceStatus_Ok),
givenSpaceViewObject("spaceView1", "space1", model.SpaceStatus_SpaceActive, model.SpaceStatus_Ok),
})
obj1 := objectstore.TestObject{
bundle.RelationKeyId: domain.String("participant1"),
@ -316,7 +316,7 @@ func TestSubscribe(t *testing.T) {
// Change status to loading. It reflects how it could work in real application.
fx.objectStore.AddObjects(t, techSpaceId, []objectstore.TestObject{
givenSpaceViewObject("spaceView1", "space1", model.Account_Active, model.SpaceStatus_Loading),
givenSpaceViewObject("spaceView1", "space1", model.SpaceStatus_SpaceActive, model.SpaceStatus_Loading),
})
// Nothing happens
@ -338,7 +338,7 @@ func TestUnsubscribe(t *testing.T) {
// Add space view
fx.objectStore.AddObjects(t, techSpaceId, []objectstore.TestObject{
givenSpaceViewObject("spaceView1", "space1", model.Account_Active, model.SpaceStatus_Ok),
givenSpaceViewObject("spaceView1", "space1", model.SpaceStatus_SpaceActive, model.SpaceStatus_Ok),
})
// Subscribe
@ -453,12 +453,12 @@ func givenRequest() subscriptionservice.SubscribeRequest {
}
}
func givenSpaceViewObject(id string, targetSpaceId string, accountStatus model.AccountStatusType, localStatus model.SpaceStatus) objectstore.TestObject {
func givenSpaceViewObject(id string, targetSpaceId string, spaceStatus model.SpaceStatus, localStatus model.SpaceStatus) objectstore.TestObject {
return objectstore.TestObject{
bundle.RelationKeyId: domain.String(id),
bundle.RelationKeyTargetSpaceId: domain.String(targetSpaceId),
bundle.RelationKeyResolvedLayout: domain.Int64(int64(model.ObjectType_spaceView)),
bundle.RelationKeySpaceAccountStatus: domain.Int64(int64(accountStatus)),
bundle.RelationKeySpaceAccountStatus: domain.Int64(int64(spaceStatus)),
bundle.RelationKeySpaceLocalStatus: domain.Int64(int64(localStatus)),
}
}

View file

@ -31,7 +31,7 @@ func (s *service) runSpaceViewSub() error {
{
RelationKey: bundle.RelationKeySpaceAccountStatus,
Condition: model.BlockContentDataviewFilter_NotIn,
Value: domain.Int64List([]model.AccountStatusType{model.Account_Deleted}),
Value: domain.Int64List([]model.SpaceStatus{model.SpaceStatus_SpaceDeleted, model.SpaceStatus_SpaceRemoving}),
},
},
Internal: true,

View file

@ -23,7 +23,7 @@ func (mw *Middleware) BlockCreateWidget(cctx context.Context, req *pb.RpcBlockCr
if req.Block != nil {
req.Block.Id = ""
}
id, err = bs.CreateWidgetBlock(ctx, req)
id, err = bs.CreateWidgetBlock(ctx, req, false)
return err
})
if err != nil {

View file

@ -2,6 +2,8 @@ package core
import (
"context"
"errors"
"time"
"github.com/gogo/protobuf/types"
@ -61,8 +63,14 @@ func (mw *Middleware) WorkspaceOpen(cctx context.Context, req *pb.RpcWorkspaceOp
}
return m
}
info, err := mustService[account.Service](mw).GetSpaceInfo(cctx, req.SpaceId)
ctx, cancel := context.WithTimeout(cctx, time.Second*10)
defer cancel()
info, err := mustService[account.Service](mw).GetSpaceInfo(ctx, req.SpaceId)
if err != nil {
if errors.Is(context.DeadlineExceeded, err) {
return response(nil, pb.RpcWorkspaceOpenResponseError_FAILED_TO_LOAD, errors.New("space is not ready: check your internet connection and try again later"))
}
return response(info, pb.RpcWorkspaceOpenResponseError_UNKNOWN_ERROR, err)
}

View file

@ -25131,6 +25131,8 @@ Middleware-to-front-end response, that can contain a NULL error or a non-NULL er
| GET_STARTED | 1 | |
| EMPTY | 2 | |
| GUIDE_ONLY | 3 | only the guide without other tables |
| GET_STARTED_MOBILE | 4 | |
| EMPTY_MOBILE | 5 | |
@ -26478,6 +26480,7 @@ Middleware-to-front-end response, that can contain a NULL error or a non-NULL er
| NULL | 0 | |
| UNKNOWN_ERROR | 1 | |
| BAD_INPUT | 2 | |
| FAILED_TO_LOAD | 100 | |

2
go.mod
View file

@ -8,7 +8,7 @@ require (
github.com/VividCortex/ewma v1.2.0
github.com/adrium/goheif v0.0.0-20230113233934-ca402e77a786
github.com/anyproto/any-store v0.1.13
github.com/anyproto/any-sync v0.6.12-0.20250403083555-120f03409ea0
github.com/anyproto/any-sync v0.7.2
github.com/anyproto/anytype-publish-server/publishclient v0.0.0-20250131145601-de288583ff2a
github.com/anyproto/anytype-push-server/pushclient v0.0.0-20250402124745-6451298047f7
github.com/anyproto/go-chash v0.1.0

4
go.sum
View file

@ -82,6 +82,10 @@ github.com/anyproto/any-store v0.1.13 h1:1wmm0qQIRShaycBLKwcgkQbRKy3WrNPAShTE5fw
github.com/anyproto/any-store v0.1.13/go.mod h1:2M0Xf4rmijoKGd+nqqeKG8I1yIokCLEIxrAXEoHjXn4=
github.com/anyproto/any-sync v0.6.12-0.20250403083555-120f03409ea0 h1:wF1tT7uXmuZoWI/+yeH7pGo0S7AZF4WJzb/BWyec3cc=
github.com/anyproto/any-sync v0.6.12-0.20250403083555-120f03409ea0/go.mod h1:TSKgCoTV40Bt8AfCh3RxPUUAfYGrhc8Mzh8/AiVlvX4=
github.com/anyproto/any-sync v0.7.1 h1:xhaT8Xynbq+B/ka9Xud+6Bu9WoMvEPRUqumk+Zt2mMU=
github.com/anyproto/any-sync v0.7.1/go.mod h1:TSKgCoTV40Bt8AfCh3RxPUUAfYGrhc8Mzh8/AiVlvX4=
github.com/anyproto/any-sync v0.7.2 h1:S1UPzW0iYTLwsMAZ3rN/EJwthTGuadsvXdnGYNiC6cA=
github.com/anyproto/any-sync v0.7.2/go.mod h1:TSKgCoTV40Bt8AfCh3RxPUUAfYGrhc8Mzh8/AiVlvX4=
github.com/anyproto/anytype-publish-server/publishclient v0.0.0-20250131145601-de288583ff2a h1:ZZM+0OUCQMWSLSflpkf0ZMVo3V76qEDDIXPpQOClNs0=
github.com/anyproto/anytype-publish-server/publishclient v0.0.0-20250131145601-de288583ff2a/go.mod h1:4fkueCZcGniSMXkrwESO8zzERrh/L7WHimRNWecfGM0=
github.com/anyproto/anytype-push-server/pushclient v0.0.0-20250402124745-6451298047f7 h1:oKkEnxnN1jeB1Ty20CTMH3w4WkCrV8dOQy1Myetg7XA=

File diff suppressed because it is too large Load diff

View file

@ -1326,6 +1326,8 @@ message Rpc {
UNKNOWN_ERROR = 1;
BAD_INPUT = 2;
FAILED_TO_LOAD = 100;
}
}
}
@ -3106,6 +3108,8 @@ message Rpc {
GET_STARTED = 1;
EMPTY = 2;
GUIDE_ONLY = 3; // only the guide without other tables
GET_STARTED_MOBILE = 4;
EMPTY_MOBILE = 5;
}
}

View file

@ -235,8 +235,14 @@ func (r *clientds) Name() (name string) {
func (r *clientds) Close(ctx context.Context) (err error) {
close(r.closing)
timeout := time.After(time.Minute)
select {
case <-r.syncerFinished:
case <-timeout:
return fmt.Errorf("sync time out")
}
// wait syncer goroutine to finish to make sure we don't have in-progress requests, because it may cause panics
<-r.syncerFinished
if r.localstoreDS != nil {
err2 := r.localstoreDS.Close()
if err2 != nil {

View file

@ -136,6 +136,12 @@ func (s *dsObjectStore) ClearFullTextQueue(spaceIds []string) error {
if err != nil {
return fmt.Errorf("start write tx: %w", err)
}
var commited bool
defer func() {
if !commited {
txn.Rollback()
}
}()
iter, err := s.fulltextQueue.Find(filterIn).Iter(txn.Context())
if err != nil {
return fmt.Errorf("create iterator: %w", err)
@ -153,6 +159,7 @@ func (s *dsObjectStore) ClearFullTextQueue(spaceIds []string) error {
return fmt.Errorf("del doc: %w", err)
}
}
commited = true
return txn.Commit()
}

View file

@ -477,23 +477,32 @@ func (s *dsObjectStore) EnqueueAllForFulltextIndexing(ctx context.Context) error
s.arenaPool.Put(arena)
}()
const maxErrorsToLog = 5
var loggedErrors int
err = collectCrossSpaceWithoutTech(s, func(store spaceindex.Store) error {
err = store.IterateAll(func(doc *anyenc.Value) error {
err := store.IterateAll(func(doc *anyenc.Value) error {
id := doc.GetString(idKey)
spaceId := doc.GetString(spaceIdKey)
arena.Reset()
obj := arena.NewObject()
obj.Set(idKey, arena.NewString(id))
obj.Set(spaceIdKey, arena.NewString(spaceId))
err = s.fulltextQueue.UpsertOne(txn.Context(), obj)
err := s.fulltextQueue.UpsertOne(txn.Context(), obj)
if err != nil {
return err
if loggedErrors < maxErrorsToLog {
log.With("error", err).Warnf("EnqueueAllForFulltextIndexing: upsert")
loggedErrors++
}
return nil
}
arena.Reset()
return nil
})
return err
})
if err != nil {
_ = txn.Rollback()
return err
}
return txn.Commit()

View file

@ -36,9 +36,12 @@ func (s *dsObjectStore) DeleteObject(id string) error {
if err != nil {
return fmt.Errorf("write txn: %w", err)
}
rollback := func(err error) error {
return errors.Join(txn.Rollback(), err)
}
var commited bool
defer func() {
if !commited {
txn.Rollback()
}
}()
newDetails := domain.NewDetails()
newDetails.SetString(bundle.RelationKeyId, id)
@ -48,21 +51,22 @@ func (s *dsObjectStore) DeleteObject(id string) error {
// do not completely remove object details, so we can distinguish links to deleted and not-yet-loaded objects
err = s.UpdateObjectDetails(txn.Context(), id, newDetails)
if err != nil {
return rollback(fmt.Errorf("failed to overwrite details and relations: %w", err))
return fmt.Errorf("delete: update details: %w", err)
}
err = s.fulltextQueue.RemoveIdsFromFullTextQueue([]string{id})
if err != nil {
return rollback(fmt.Errorf("delete: fulltext queue: %w", err))
return fmt.Errorf("delete: fulltext queue remove: %w", err)
}
err = s.headsState.DeleteId(txn.Context(), id)
if err != nil && !errors.Is(err, anystore.ErrDocNotFound) {
return rollback(fmt.Errorf("delete: heads state: %w", err))
return fmt.Errorf("delete: heads state delete: %w", err)
}
err = s.eraseLinksForObject(txn.Context(), id)
if err != nil {
return rollback(err)
return fmt.Errorf("delete: erase links: %w", err)
}
commited = true
err = txn.Commit()
if err != nil {
return fmt.Errorf("delete object info: %w", err)

View file

@ -18,41 +18,34 @@ func (s *dsObjectStore) GetWithLinksInfoById(id string) (*model.ObjectInfoWithLi
if err != nil {
return nil, fmt.Errorf("read txn: %w", err)
}
commit := func(err error) error {
return errors.Join(txn.Commit(), err)
}
defer txn.Commit()
pages, err := s.getObjectsInfo(txn.Context(), []string{id})
if err != nil {
return nil, commit(err)
return nil, err
}
if len(pages) == 0 {
return nil, commit(fmt.Errorf("page not found"))
return nil, fmt.Errorf("page not found")
}
page := pages[0]
inboundIds, err := s.findInboundLinks(txn.Context(), id)
if err != nil {
return nil, commit(fmt.Errorf("find inbound links: %w", err))
return nil, fmt.Errorf("find inbound links: %w", err)
}
outboundsIds, err := s.findOutboundLinks(txn.Context(), id)
if err != nil {
return nil, commit(fmt.Errorf("find outbound links: %w", err))
return nil, fmt.Errorf("find outbound links: %w", err)
}
inbound, err := s.getObjectsInfo(txn.Context(), inboundIds)
if err != nil {
return nil, commit(err)
return nil, err
}
outbound, err := s.getObjectsInfo(txn.Context(), outboundsIds)
if err != nil {
return nil, commit(err)
}
err = txn.Commit()
if err != nil {
return nil, fmt.Errorf("commit txn: %w", err)
return nil, err
}
inboundProto := make([]*model.ObjectInfo, 0, len(inbound))

View file

@ -576,10 +576,17 @@ func (s *dsObjectStore) IterateAll(proc func(doc *anyenc.Value) error) error {
}
defer iter.Close()
const maxErrorsToLog = 5
var loggedErrors int
for iter.Next() {
doc, err := iter.Doc()
if err != nil {
return fmt.Errorf("get doc: %w", err)
if loggedErrors < maxErrorsToLog {
log.With("error", err).Error("IterateAll: get doc")
loggedErrors++
}
continue
}
err = proc(doc.Value())
if err != nil {

View file

@ -250,7 +250,7 @@ func (s *dsObjectStore) getPendingLocalDetails(txn *badger.Txn, key []byte) (*mo
func (s *dsObjectStore) updateObjectLinks(ctx context.Context, id string, links []string) (added []string, removed []string, err error) {
_, err = s.links.UpsertId(ctx, id, query.ModifyFunc(func(arena *anyenc.Arena, val *anyenc.Value) (*anyenc.Value, bool, error) {
prev := anyEncArrayToStrings(val.GetArray(linkOutboundField))
added, removed = slice.DifferenceRemovedAdded(prev, links)
removed, added = slice.DifferenceRemovedAdded(prev, links)
val.Set(linkOutboundField, stringsToJsonArray(arena, links))
return val, len(added)+len(removed) > 0, nil
}))

View file

@ -0,0 +1,276 @@
package keyvalueservice
import (
"context"
"crypto/sha256"
"encoding/binary"
"encoding/hex"
"fmt"
"math"
"sync"
"github.com/anyproto/any-sync/commonspace"
"github.com/anyproto/any-sync/commonspace/object/keyvalue/keyvaluestorage"
"github.com/anyproto/any-sync/commonspace/object/keyvalue/keyvaluestorage/innerstorage"
"go.uber.org/zap"
"github.com/anyproto/anytype-heart/pkg/lib/logging"
"github.com/anyproto/anytype-heart/space/spacecore/keyvalueobserver"
)
const CName = "core.keyvalueservice"
var log = logging.Logger(CName).Desugar()
type ObserverFunc func(key string, val Value)
type Value struct {
Key string
Data []byte
TimestampMilli int
}
type subscription struct {
name string
observerFunc ObserverFunc
}
type derivedKey string
// Service provides convenient wrapper for using per-space key-value store.
// It automatically hashes keys for security reasons: no one except you can see actual value of a key. How it works:
// - A key (client key) is hashed using a salt. Salt is the first read key from the space. Call it derived key
// - Then we use derived key as actual key for storing the value
// - And we put the original client key inside encrypted value
//
// Finally, key value pair looks like this:
// hash(key) -> (key, value)
//
// Why use hash of keys instead of AES encryption? Because the output of hash function is much more compact,
// and we're still able to get the original key because we already encrypt value.
//
// The maximum length of a key is 65535
type Service interface {
Get(ctx context.Context, key string) ([]Value, error)
Set(ctx context.Context, key string, value []byte) error
SubscribeForKey(key string, subscriptionName string, observerFunc ObserverFunc) error
UnsubscribeFromKey(key string, subscriptionName string) error
}
type service struct {
lock sync.RWMutex
subscriptions map[string]map[string]subscription
keyValueStore keyvaluestorage.Storage
spaceCore commonspace.Space
observer keyvalueobserver.Observer
keysLock sync.Mutex
spaceSalt []byte
keyToDerivedKey map[string]derivedKey
}
func New(spaceCore commonspace.Space, observer keyvalueobserver.Observer) (Service, error) {
s := &service{
spaceCore: spaceCore,
observer: observer,
keyValueStore: spaceCore.KeyValue().DefaultStore(),
subscriptions: make(map[string]map[string]subscription),
keyToDerivedKey: make(map[string]derivedKey),
}
s.observer.SetObserver(s.observeChanges)
return s, nil
}
func (s *service) initSpaceSalt() ([]byte, error) {
records := s.spaceCore.Acl().Records()
if len(records) == 0 {
return nil, fmt.Errorf("empty acl")
}
first := records[0]
readKeyId, err := s.spaceCore.Acl().AclState().ReadKeyForAclId(first.Id)
if err != nil {
return nil, fmt.Errorf("find read key id: %w", err)
}
readKeys := s.spaceCore.Acl().AclState().Keys()
key, ok := readKeys[readKeyId]
if !ok {
return nil, fmt.Errorf("read key not found")
}
rawReadKey, err := key.ReadKey.Raw()
if err != nil {
return nil, fmt.Errorf("get raw bytes: %w", err)
}
return rawReadKey, nil
}
func (s *service) getSalt() ([]byte, error) {
if s.spaceSalt == nil {
salt, err := s.initSpaceSalt()
if err != nil {
return nil, err
}
s.spaceSalt = salt
return s.spaceSalt, nil
}
return s.spaceSalt, nil
}
func (s *service) observeChanges(decryptor keyvaluestorage.Decryptor, kvs []innerstorage.KeyValue) {
for _, kv := range kvs {
value, err := decodeKeyValue(decryptor, kv)
if err != nil {
log.Warn("decode key-value", zap.Error(err))
continue
}
s.lock.RLock()
byKey := s.subscriptions[value.Key]
for _, sub := range byKey {
sub.observerFunc(value.Key, value)
}
s.lock.RUnlock()
}
}
func decodeKeyValue(decryptor keyvaluestorage.Decryptor, kv innerstorage.KeyValue) (Value, error) {
data, err := decryptor(kv)
if err != nil {
return Value{}, fmt.Errorf("decrypt value: %w", err)
}
clientKey, value, err := decodeKeyValuePair(data)
if err != nil {
return Value{}, fmt.Errorf("decode key-value pair: %w", err)
}
return Value{
Key: clientKey,
Data: value,
TimestampMilli: kv.TimestampMilli,
}, nil
}
func (s *service) Get(ctx context.Context, key string) ([]Value, error) {
derived, err := s.getDerivedKey(key)
if err != nil {
return nil, fmt.Errorf("getDerivedKey: %w", err)
}
var result []Value
err = s.keyValueStore.GetAll(ctx, string(derived), func(decryptor keyvaluestorage.Decryptor, kvs []innerstorage.KeyValue) error {
result = make([]Value, 0, len(kvs))
for _, kv := range kvs {
value, err := decodeKeyValue(decryptor, kv)
if err != nil {
return fmt.Errorf("decode key-value pair: %w", err)
}
result = append(result, value)
}
return nil
})
if err != nil {
return nil, fmt.Errorf("get all: %w", err)
}
return result, nil
}
func (s *service) Set(ctx context.Context, key string, value []byte) error {
derived, err := s.getDerivedKey(key)
if err != nil {
return fmt.Errorf("getDerivedKey: %w", err)
}
// Encode value as key + value, so we can use hashing for keys and still able to retrieve original key from client code
encoded, err := encodeKeyValuePair(key, value)
if err != nil {
return fmt.Errorf("encode value: %w", err)
}
return s.keyValueStore.Set(ctx, string(derived), encoded)
}
func (s *service) getDerivedKey(key string) (derivedKey, error) {
s.keysLock.Lock()
defer s.keysLock.Unlock()
derived, ok := s.keyToDerivedKey[key]
if ok {
return derived, nil
}
salt, err := s.getSalt()
if err != nil {
return derived, fmt.Errorf("get salt: %w", err)
}
hasher := sha256.New()
// Salt
hasher.Write(salt)
// User key
hasher.Write([]byte(key))
result := hasher.Sum(nil)
derived = derivedKey(hex.EncodeToString(result))
s.keyToDerivedKey[key] = derived
return derived, nil
}
func (s *service) SubscribeForKey(key string, subscriptionName string, observerFunc ObserverFunc) error {
s.lock.Lock()
defer s.lock.Unlock()
byKey, ok := s.subscriptions[key]
if !ok {
byKey = make(map[string]subscription)
s.subscriptions[key] = byKey
}
byKey[subscriptionName] = subscription{
name: subscriptionName,
observerFunc: observerFunc,
}
return nil
}
func (s *service) UnsubscribeFromKey(key string, subscriptionName string) error {
s.lock.Lock()
defer s.lock.Unlock()
byKey, ok := s.subscriptions[key]
if ok {
delete(byKey, subscriptionName)
}
return nil
}
// use 2 as we use uint16
const sizePrefixLen = 2
func encodeKeyValuePair(key string, value []byte) ([]byte, error) {
keySize := len(key)
if keySize > math.MaxUint16 {
return nil, fmt.Errorf("key is too long: %d", keySize)
}
buf := make([]byte, sizePrefixLen+len(key)+len(value))
binary.BigEndian.PutUint16(buf, uint16(keySize))
copy(buf[sizePrefixLen:], key)
copy(buf[sizePrefixLen+len(key):], value)
return buf, nil
}
func decodeKeyValuePair(raw []byte) (string, []byte, error) {
if len(raw) < sizePrefixLen {
return "", nil, fmt.Errorf("raw value is too small: no key size prefix")
}
keySize := int(binary.BigEndian.Uint16(raw))
if len(raw) < sizePrefixLen+keySize {
return "", nil, fmt.Errorf("raw value is too small: no key")
}
key := make([]byte, keySize)
copy(key, raw[sizePrefixLen:sizePrefixLen+keySize])
value := raw[sizePrefixLen+keySize:]
return string(key), value, nil
}

View file

@ -0,0 +1,61 @@
package keyvalueservice
import (
"bytes"
"fmt"
"math"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestEncodeKeyValuePair(t *testing.T) {
for i, tc := range []struct {
key string
value []byte
isEncodingErr bool
}{
{
key: "",
value: []byte(nil),
},
{
key: "",
value: []byte("value"),
},
{
key: "key",
value: []byte(nil),
},
{
key: "key",
value: []byte("value"),
},
{
key: string(make([]byte, math.MaxUint16)),
value: []byte("value"),
},
{
key: string(make([]byte, math.MaxUint16+1)),
value: []byte("value"),
isEncodingErr: true,
},
} {
t.Run(fmt.Sprintf("case %d", i+1), func(t *testing.T) {
encoded, err := encodeKeyValuePair(tc.key, tc.value)
if tc.isEncodingErr {
require.Error(t, err)
return
}
require.NoError(t, err)
decodedKey, decodedValue, err := decodeKeyValuePair(encoded)
require.NoError(t, err)
assert.True(t, tc.key == decodedKey)
assert.True(t, bytes.Equal(tc.value, decodedValue))
})
}
}

View file

@ -15,6 +15,8 @@ import (
headsync "github.com/anyproto/any-sync/commonspace/headsync"
keyvalueservice "github.com/anyproto/anytype-heart/space/clientspace/keyvalueservice"
mock "github.com/stretchr/testify/mock"
objectcache "github.com/anyproto/anytype-heart/core/block/object/objectcache"
@ -1431,6 +1433,53 @@ func (_c *MockSpace_IsReadOnly_Call) RunAndReturn(run func() bool) *MockSpace_Is
return _c
}
// KeyValueService provides a mock function with given fields:
func (_m *MockSpace) KeyValueService() keyvalueservice.Service {
ret := _m.Called()
if len(ret) == 0 {
panic("no return value specified for KeyValueService")
}
var r0 keyvalueservice.Service
if rf, ok := ret.Get(0).(func() keyvalueservice.Service); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(keyvalueservice.Service)
}
}
return r0
}
// MockSpace_KeyValueService_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'KeyValueService'
type MockSpace_KeyValueService_Call struct {
*mock.Call
}
// KeyValueService is a helper method to define mock.On call
func (_e *MockSpace_Expecter) KeyValueService() *MockSpace_KeyValueService_Call {
return &MockSpace_KeyValueService_Call{Call: _e.mock.On("KeyValueService")}
}
func (_c *MockSpace_KeyValueService_Call) Run(run func()) *MockSpace_KeyValueService_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockSpace_KeyValueService_Call) Return(_a0 keyvalueservice.Service) *MockSpace_KeyValueService_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockSpace_KeyValueService_Call) RunAndReturn(run func() keyvalueservice.Service) *MockSpace_KeyValueService_Call {
_c.Call.Return(run)
return _c
}
// LoadObjects provides a mock function with given fields: ctx, ids
func (_m *MockSpace) LoadObjects(ctx context.Context, ids []string) error {
ret := _m.Called(ctx, ids)

View file

@ -24,8 +24,10 @@ import (
"github.com/anyproto/anytype-heart/pkg/lib/bundle"
coresb "github.com/anyproto/anytype-heart/pkg/lib/core/smartblock"
"github.com/anyproto/anytype-heart/pkg/lib/threads"
"github.com/anyproto/anytype-heart/space/clientspace/keyvalueservice"
"github.com/anyproto/anytype-heart/space/internal/objectprovider"
"github.com/anyproto/anytype-heart/space/spacecore"
"github.com/anyproto/anytype-heart/space/spacecore/keyvalueobserver"
"github.com/anyproto/anytype-heart/space/spacecore/peermanager"
"github.com/anyproto/anytype-heart/space/spacecore/storage"
"github.com/anyproto/anytype-heart/space/spacecore/storage/anystorage"
@ -56,6 +58,8 @@ type Space interface {
IsPersonal() bool
GetAclIdentity() crypto.PubKey
KeyValueService() keyvalueservice.Service
Close(ctx context.Context) error
}
@ -82,6 +86,7 @@ type space struct {
derivedIDs threads.DerivedSmartblockIds
installer bundledObjectsInstaller
spaceCore spacecore.SpaceCoreService
keyValueService keyvalueservice.Service
personalSpaceId string
aclIdentity crypto.PubKey
@ -102,6 +107,7 @@ type SpaceDeps struct {
AccountService accountservice.Service
StorageService storage.ClientStorage
SpaceCore spacecore.SpaceCoreService
KeyValueObserver keyvalueobserver.Observer
PersonalSpaceId string
LoadCtx context.Context
DisableRemoteLoad bool
@ -150,7 +156,14 @@ func BuildSpace(ctx context.Context, deps SpaceDeps) (Space, error) {
return nil, fmt.Errorf("install bundled objects: %w", err)
}
}
sp.keyValueService, err = keyvalueservice.New(sp.common, deps.KeyValueObserver)
if err != nil {
return nil, fmt.Errorf("create key value service: %w", err)
}
go sp.mandatoryObjectsLoad(deps.LoadCtx, deps.DisableRemoteLoad)
return sp, nil
}
@ -228,6 +241,10 @@ func (s *space) CommonSpace() commonspace.Space {
return s.common
}
func (s *space) KeyValueService() keyvalueservice.Service {
return s.keyValueService
}
func (s *space) WaitMandatoryObjects(ctx context.Context) (err error) {
select {
case <-s.loadMandatoryObjectsCh:

View file

@ -2,6 +2,7 @@ package clientspace
import (
"context"
"fmt"
"github.com/anyproto/any-sync/accountservice"
"github.com/anyproto/any-sync/commonspace"
@ -10,6 +11,8 @@ import (
"github.com/anyproto/anytype-heart/core/block/object/objectcache"
"github.com/anyproto/anytype-heart/core/domain"
"github.com/anyproto/anytype-heart/pkg/lib/bundle"
"github.com/anyproto/anytype-heart/space/clientspace/keyvalueservice"
"github.com/anyproto/anytype-heart/space/spacecore/keyvalueobserver"
"github.com/anyproto/anytype-heart/space/techspace"
)
@ -19,16 +22,17 @@ type TechSpace struct {
}
type TechSpaceDeps struct {
CommonSpace commonspace.Space
ObjectFactory objectcache.ObjectFactory
AccountService accountservice.Service
PersonalSpaceId string
Indexer spaceIndexer
Installer bundledObjectsInstaller
TechSpace techspace.TechSpace
CommonSpace commonspace.Space
ObjectFactory objectcache.ObjectFactory
AccountService accountservice.Service
PersonalSpaceId string
Indexer spaceIndexer
Installer bundledObjectsInstaller
TechSpace techspace.TechSpace
KeyValueObserver keyvalueobserver.Observer
}
func NewTechSpace(deps TechSpaceDeps) *TechSpace {
func NewTechSpace(deps TechSpaceDeps) (*TechSpace, error) {
sp := &TechSpace{
space: &space{
indexer: deps.Indexer,
@ -40,8 +44,13 @@ func NewTechSpace(deps TechSpaceDeps) *TechSpace {
},
TechSpace: deps.TechSpace,
}
var err error
sp.keyValueService, err = keyvalueservice.New(sp.common, deps.KeyValueObserver)
if err != nil {
return nil, fmt.Errorf("create key value service: %w", err)
}
sp.Cache = objectcache.New(deps.AccountService, deps.ObjectFactory, deps.PersonalSpaceId, sp)
return sp
return sp, nil
}
func (s *TechSpace) Close(ctx context.Context) error {
@ -65,3 +74,7 @@ func (s *TechSpace) GetTypeIdByKey(ctx context.Context, key domain.TypeKey) (id
}
return s.space.GetTypeIdByKey(ctx, key)
}
func (s *TechSpace) CommonSpace() commonspace.Space {
return s.space.CommonSpace()
}

View file

@ -9,6 +9,7 @@ import (
"github.com/anyproto/any-sync/commonspace/acl/aclclient"
"github.com/anyproto/any-sync/commonspace/headsync"
"github.com/anyproto/any-sync/commonspace/object/acl/syncacl"
"github.com/anyproto/any-sync/commonspace/object/keyvalue/kvinterfaces"
"github.com/anyproto/any-sync/commonspace/object/treesyncer"
"github.com/anyproto/any-sync/commonspace/objecttreebuilder"
"github.com/anyproto/any-sync/commonspace/spacestorage"
@ -152,3 +153,7 @@ func (c *virtualCommonSpace) Close() error {
func (c *virtualCommonSpace) IsReadOnly() bool {
return false
}
func (c *virtualCommonSpace) KeyValue() kvinterfaces.KeyValueService {
return nil
}

View file

@ -18,7 +18,7 @@ import (
"github.com/samber/lo"
"go.uber.org/zap"
"github.com/anyproto/anytype-heart/core/block/chats/push"
"github.com/anyproto/anytype-heart/core/block/chats/chatpush"
"github.com/anyproto/anytype-heart/core/event"
"github.com/anyproto/anytype-heart/pb"
"github.com/anyproto/anytype-heart/space/clientspace"
@ -187,7 +187,7 @@ func (a *aclObjectManager) process() {
func (a *aclObjectManager) subscribeToPushNotifications(acl syncacl.SyncAcl) {
aclState := acl.AclState()
if !aclState.Permissions(aclState.AccountKey().GetPublic()).IsOwner() {
a.pushNotificationService.SubscribeToTopics(a.ctx, a.sp.Id(), []string{push.ChatsTopicName})
a.pushNotificationService.SubscribeToTopics(a.ctx, a.sp.Id(), []string{chatpush.ChatsTopicName})
}
}

View file

@ -93,15 +93,16 @@ func (b *spaceBuilder) BuildSpace(ctx context.Context, disableRemoteLoad bool) (
coreSpace.TreeSyncer().StopSync()
}
deps := clientspace.SpaceDeps{
Indexer: b.indexer,
Installer: b.installer,
CommonSpace: coreSpace,
ObjectFactory: b.objectFactory,
AccountService: b.accountService,
PersonalSpaceId: b.personalSpaceId,
StorageService: b.storageService,
SpaceCore: b.spaceCore,
LoadCtx: b.ctx,
Indexer: b.indexer,
Installer: b.installer,
CommonSpace: coreSpace,
ObjectFactory: b.objectFactory,
AccountService: b.accountService,
PersonalSpaceId: b.personalSpaceId,
StorageService: b.storageService,
SpaceCore: b.spaceCore,
LoadCtx: b.ctx,
KeyValueObserver: coreSpace.KeyValueObserver(),
}
space, err := clientspace.BuildSpace(ctx, deps)
if err != nil {

View file

@ -0,0 +1,97 @@
package keyvalueobserver
import (
"context"
"errors"
"sync"
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/commonspace/object/keyvalue/keyvaluestorage"
"github.com/anyproto/any-sync/commonspace/object/keyvalue/keyvaluestorage/innerstorage"
"github.com/cheggaaa/mb/v3"
)
const CName = keyvaluestorage.IndexerCName
type ObserverFunc func(decryptor keyvaluestorage.Decryptor, kvs []innerstorage.KeyValue)
type Observer interface {
keyvaluestorage.Indexer
SetObserver(observerFunc ObserverFunc)
}
func New() Observer {
return &observer{}
}
type queueItem struct {
decryptor keyvaluestorage.Decryptor
keyValues []innerstorage.KeyValue
}
type observer struct {
componentContext context.Context
componentContextCancel context.CancelFunc
lock sync.RWMutex
observerFunc ObserverFunc
updateQueue *mb.MB[queueItem]
}
func (o *observer) Init(a *app.App) (err error) {
ctx, cancel := context.WithCancel(context.Background())
o.componentContext = ctx
o.componentContextCancel = cancel
o.updateQueue = mb.New[queueItem](0)
return nil
}
func (o *observer) Run(ctx context.Context) error {
go func() {
for {
select {
case <-o.componentContext.Done():
return
default:
}
item, err := o.updateQueue.WaitOne(o.componentContext)
if errors.Is(err, context.Canceled) {
return
}
if errors.Is(err, mb.ErrClosed) {
return
}
o.lock.RLock()
obs := o.observerFunc
o.lock.RUnlock()
if obs != nil {
obs(item.decryptor, item.keyValues)
}
}
}()
return nil
}
func (o *observer) Close(ctx context.Context) (err error) {
if o.componentContextCancel != nil {
o.componentContextCancel()
}
return nil
}
func (o *observer) Name() (name string) {
return CName
}
func (o *observer) SetObserver(observerFunc ObserverFunc) {
o.lock.Lock()
defer o.lock.Unlock()
o.observerFunc = observerFunc
}
func (o *observer) Index(decryptor keyvaluestorage.Decryptor, keyValue ...innerstorage.KeyValue) error {
return o.updateQueue.Add(o.componentContext, queueItem{decryptor: decryptor, keyValues: keyValue})
}

View file

@ -131,3 +131,25 @@ func (r *rpcHandler) HeadSync(ctx context.Context, req *spacesyncproto.HeadSyncR
func (r *rpcHandler) ObjectSyncStream(stream spacesyncproto.DRPCSpaceSync_ObjectSyncStreamStream) error {
return r.s.streamPool.ReadStream(stream, 300)
}
func (r *rpcHandler) StoreDiff(ctx context.Context, req *spacesyncproto.StoreDiffRequest) (*spacesyncproto.StoreDiffResponse, error) {
space, err := r.s.Get(ctx, req.SpaceId)
if err != nil {
return nil, fmt.Errorf("get space: %w", err)
}
return space.KeyValue().HandleStoreDiffRequest(ctx, req)
}
func (r *rpcHandler) StoreElements(stream spacesyncproto.DRPCSpaceSync_StoreElementsStream) error {
msg, err := stream.Recv()
if err != nil {
return fmt.Errorf("recv first message: %w", err)
}
ctx := context.Background()
space, err := r.s.Get(ctx, msg.SpaceId)
if err != nil {
return fmt.Errorf("get space: %w", err)
}
return space.KeyValue().HandleStoreElementsRequest(ctx, stream)
}

View file

@ -11,6 +11,8 @@ import (
"github.com/anyproto/any-sync/app/logger"
"github.com/anyproto/any-sync/app/ocache"
"github.com/anyproto/any-sync/commonspace"
"github.com/anyproto/any-sync/commonspace/spacepayloads"
// nolint: misspell
commonconfig "github.com/anyproto/any-sync/commonspace/config"
"github.com/anyproto/any-sync/commonspace/object/accountdata"
@ -31,6 +33,7 @@ import (
"github.com/anyproto/anytype-heart/core/syncstatus/objectsyncstatus"
"github.com/anyproto/anytype-heart/core/wallet"
"github.com/anyproto/anytype-heart/space/spacecore/clientspaceproto"
"github.com/anyproto/anytype-heart/space/spacecore/keyvalueobserver"
"github.com/anyproto/anytype-heart/space/spacecore/localdiscovery"
"github.com/anyproto/anytype-heart/space/spacecore/peerstore"
"github.com/anyproto/anytype-heart/space/spacecore/storage"
@ -127,7 +130,7 @@ func (s *service) Run(ctx context.Context) (err error) {
}
func (s *service) Derive(ctx context.Context, spaceType string) (space *AnySpace, err error) {
payload := commonspace.SpaceDerivePayload{
payload := spacepayloads.SpaceDerivePayload{
SigningKey: s.wallet.GetAccountPrivkey(),
MasterKey: s.wallet.GetMasterKey(),
SpaceType: spaceType,
@ -149,7 +152,7 @@ func (s *service) CloseSpace(ctx context.Context, id string) error {
}
func (s *service) DeriveID(ctx context.Context, spaceType string) (id string, err error) {
payload := commonspace.SpaceDerivePayload{
payload := spacepayloads.SpaceDerivePayload{
SigningKey: s.wallet.GetAccountPrivkey(),
MasterKey: s.wallet.GetMasterKey(),
SpaceType: spaceType,
@ -162,7 +165,7 @@ func (s *service) Create(ctx context.Context, replicationKey uint64, metadataPay
if err != nil {
return nil, fmt.Errorf("generate metadata key: %w", err)
}
payload := commonspace.SpaceCreatePayload{
payload := spacepayloads.SpaceCreatePayload{
SigningKey: s.wallet.GetAccountPrivkey(),
MasterKey: s.wallet.GetMasterKey(),
ReadKey: crypto.NewAES(),
@ -228,10 +231,12 @@ func (s *service) Delete(ctx context.Context, spaceId string) (err error) {
}
func (s *service) loadSpace(ctx context.Context, id string) (value ocache.Object, err error) {
kvObserver := keyvalueobserver.New()
statusService := objectsyncstatus.NewSyncStatusService()
deps := commonspace.Deps{
TreeSyncer: treesyncer.NewTreeSyncer(id),
SyncStatus: statusService,
Indexer: kvObserver,
}
if res, ok := ctx.Value(OptsKey).(Opts); ok && res.SignKey != nil {
// TODO: [stream] replace with real peer id
@ -250,7 +255,7 @@ func (s *service) loadSpace(ctx context.Context, id string) (value ocache.Object
if err != nil {
return
}
ns, err := newAnySpace(cc)
ns, err := newAnySpace(cc, kvObserver)
if err != nil {
return
}

View file

@ -5,14 +5,22 @@ import (
"time"
"github.com/anyproto/any-sync/commonspace"
"github.com/anyproto/anytype-heart/space/spacecore/keyvalueobserver"
)
func newAnySpace(cc commonspace.Space) (*AnySpace, error) {
return &AnySpace{Space: cc}, nil
func newAnySpace(cc commonspace.Space, kvObserver keyvalueobserver.Observer) (*AnySpace, error) {
return &AnySpace{Space: cc, keyValueObserver: kvObserver}, nil
}
type AnySpace struct {
commonspace.Space
keyValueObserver keyvalueobserver.Observer
}
func (s *AnySpace) KeyValueObserver() keyvalueobserver.Observer {
return s.keyValueObserver
}
func (s *AnySpace) Init(ctx context.Context) (err error) {

View file

@ -7,6 +7,7 @@ import (
"os"
"path/filepath"
"strings"
"syscall"
anystore "github.com/anyproto/any-store"
"github.com/anyproto/any-sync/app"
@ -27,7 +28,7 @@ type NotEnoughFreeSpaceError struct {
Required uint64
}
func (e NotEnoughFreeSpaceError) Error() string {
func (e *NotEnoughFreeSpaceError) Error() string {
if e.Required == 0 {
return fmt.Sprintf("not enough free space: %d", e.Free)
}
@ -74,6 +75,29 @@ func (m *migrator) Name() (name string) {
return CName
}
func isDiskFull(err error) bool {
if err == nil {
return false
}
// From sqlite
if strings.Contains(err.Error(), "disk is full") {
return true
}
// For unix systems
if errors.Is(err, syscall.ENOSPC) {
return true
}
// For windows
var syscallErrno syscall.Errno
if errors.As(err, &syscallErrno) {
// See https://pkg.go.dev/golang.org/x/sys/windows
// ERROR_DISK_FULL syscall.Errno = 112
return syscallErrno == 112
}
return false
}
func (m *migrator) Run(ctx context.Context) (err error) {
oldSize, err := m.oldStorage.EstimateSize()
if err != nil {
@ -85,7 +109,7 @@ func (m *migrator) Run(ctx context.Context) (err error) {
}
requiredDiskSpace := oldSize * 15 / 10
if requiredDiskSpace > free {
return NotEnoughFreeSpaceError{
return &NotEnoughFreeSpaceError{
Free: free,
Required: requiredDiskSpace,
}
@ -93,9 +117,10 @@ func (m *migrator) Run(ctx context.Context) (err error) {
err = m.run(ctx)
if err != nil {
if strings.Contains(err.Error(), "disk is full") {
return NotEnoughFreeSpaceError{
Free: free,
if isDiskFull(err) {
return &NotEnoughFreeSpaceError{
Free: free,
Required: requiredDiskSpace,
}
}
return err

View file

@ -2,9 +2,11 @@ package migrator
import (
"context"
"fmt"
"io"
"os"
"path/filepath"
"syscall"
"testing"
"github.com/anyproto/any-sync/app"
@ -201,3 +203,19 @@ func copyDir(srcPath string, destPath string) error {
}
return nil
}
func TestIsDiskFull(t *testing.T) {
for _, tc := range []struct {
inputErr error
expected bool
}{
{nil, false},
{syscall.ENOSPC, true},
{os.ErrInvalid, false},
{syscall.Errno(112), true},
{syscall.Errno(111), false},
{fmt.Errorf("disk is full"), true},
} {
assert.Equal(t, tc.expected, isDiskFull(tc.inputErr))
}
}

View file

@ -115,16 +115,22 @@ func (s *spaceFactory) CreateAndSetTechSpace(ctx context.Context) (*clientspace.
if err != nil {
return nil, fmt.Errorf("derive tech space: %w", err)
}
kvObserver := techCoreSpace.KeyValueObserver()
deps := clientspace.TechSpaceDeps{
CommonSpace: techCoreSpace,
ObjectFactory: s.objectFactory,
AccountService: s.accountService,
PersonalSpaceId: s.personalSpaceId,
Indexer: s.indexer,
Installer: s.installer,
TechSpace: techSpace,
CommonSpace: techCoreSpace,
ObjectFactory: s.objectFactory,
AccountService: s.accountService,
PersonalSpaceId: s.personalSpaceId,
Indexer: s.indexer,
Installer: s.installer,
TechSpace: techSpace,
KeyValueObserver: kvObserver,
}
ts := clientspace.NewTechSpace(deps)
ts, err := clientspace.NewTechSpace(deps)
if err != nil {
return nil, fmt.Errorf("build tech space: %w", err)
}
s.techSpace = ts
s.app = s.app.ChildApp()
s.app.Register(s.techSpace)
@ -146,16 +152,22 @@ func (s *spaceFactory) LoadAndSetTechSpace(ctx context.Context) (*clientspace.Te
if err != nil {
return nil, fmt.Errorf("derive tech space: %w", err)
}
kvObserver := techCoreSpace.KeyValueObserver()
deps := clientspace.TechSpaceDeps{
CommonSpace: techCoreSpace,
ObjectFactory: s.objectFactory,
AccountService: s.accountService,
PersonalSpaceId: s.personalSpaceId,
Indexer: s.indexer,
Installer: s.installer,
TechSpace: techSpace,
CommonSpace: techCoreSpace,
ObjectFactory: s.objectFactory,
AccountService: s.accountService,
PersonalSpaceId: s.personalSpaceId,
Indexer: s.indexer,
Installer: s.installer,
TechSpace: techSpace,
KeyValueObserver: kvObserver,
}
ts := clientspace.NewTechSpace(deps)
ts, err := clientspace.NewTechSpace(deps)
if err != nil {
return nil, fmt.Errorf("build tech space: %w", err)
}
s.techSpace = ts
s.app = s.app.ChildApp()
s.app.Register(s.techSpace)

View file

@ -10,6 +10,8 @@ import (
domain "github.com/anyproto/anytype-heart/core/domain"
keyvalueobserver "github.com/anyproto/anytype-heart/space/spacecore/keyvalueobserver"
mock "github.com/stretchr/testify/mock"
objectcache "github.com/anyproto/anytype-heart/core/block/object/objectcache"
@ -378,17 +380,17 @@ func (_c *MockTechSpace_Name_Call) RunAndReturn(run func() string) *MockTechSpac
return _c
}
// Run provides a mock function with given fields: techCoreSpace, objectCache, create
func (_m *MockTechSpace) Run(techCoreSpace commonspace.Space, objectCache objectcache.Cache, create bool) error {
ret := _m.Called(techCoreSpace, objectCache, create)
// Run provides a mock function with given fields: techCoreSpace, objectCache, kvObserver, create
func (_m *MockTechSpace) Run(techCoreSpace commonspace.Space, objectCache objectcache.Cache, kvObserver keyvalueobserver.Observer, create bool) error {
ret := _m.Called(techCoreSpace, objectCache, kvObserver, create)
if len(ret) == 0 {
panic("no return value specified for Run")
}
var r0 error
if rf, ok := ret.Get(0).(func(commonspace.Space, objectcache.Cache, bool) error); ok {
r0 = rf(techCoreSpace, objectCache, create)
if rf, ok := ret.Get(0).(func(commonspace.Space, objectcache.Cache, keyvalueobserver.Observer, bool) error); ok {
r0 = rf(techCoreSpace, objectCache, kvObserver, create)
} else {
r0 = ret.Error(0)
}
@ -404,14 +406,15 @@ type MockTechSpace_Run_Call struct {
// Run is a helper method to define mock.On call
// - techCoreSpace commonspace.Space
// - objectCache objectcache.Cache
// - kvObserver keyvalueobserver.Observer
// - create bool
func (_e *MockTechSpace_Expecter) Run(techCoreSpace interface{}, objectCache interface{}, create interface{}) *MockTechSpace_Run_Call {
return &MockTechSpace_Run_Call{Call: _e.mock.On("Run", techCoreSpace, objectCache, create)}
func (_e *MockTechSpace_Expecter) Run(techCoreSpace interface{}, objectCache interface{}, kvObserver interface{}, create interface{}) *MockTechSpace_Run_Call {
return &MockTechSpace_Run_Call{Call: _e.mock.On("Run", techCoreSpace, objectCache, kvObserver, create)}
}
func (_c *MockTechSpace_Run_Call) Run(run func(techCoreSpace commonspace.Space, objectCache objectcache.Cache, create bool)) *MockTechSpace_Run_Call {
func (_c *MockTechSpace_Run_Call) Run(run func(techCoreSpace commonspace.Space, objectCache objectcache.Cache, kvObserver keyvalueobserver.Observer, create bool)) *MockTechSpace_Run_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(commonspace.Space), args[1].(objectcache.Cache), args[2].(bool))
run(args[0].(commonspace.Space), args[1].(objectcache.Cache), args[2].(keyvalueobserver.Observer), args[3].(bool))
})
return _c
}
@ -421,7 +424,7 @@ func (_c *MockTechSpace_Run_Call) Return(err error) *MockTechSpace_Run_Call {
return _c
}
func (_c *MockTechSpace_Run_Call) RunAndReturn(run func(commonspace.Space, objectcache.Cache, bool) error) *MockTechSpace_Run_Call {
func (_c *MockTechSpace_Run_Call) RunAndReturn(run func(commonspace.Space, objectcache.Cache, keyvalueobserver.Observer, bool) error) *MockTechSpace_Run_Call {
_c.Call.Return(run)
return _c
}

View file

@ -10,6 +10,7 @@ import (
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/app/logger"
"github.com/anyproto/any-sync/commonspace"
"github.com/anyproto/any-sync/commonspace/object/keyvalue/keyvaluestorage"
"github.com/anyproto/any-sync/commonspace/object/tree/treestorage"
"github.com/anyproto/any-sync/net/peer"
"go.uber.org/zap"
@ -387,6 +388,10 @@ func (s *techSpace) getViewIdLocked(ctx context.Context, spaceId string) (viewId
return
}
func (s *techSpace) KeyValueStore() keyvaluestorage.Storage {
return s.techCore.KeyValue().DefaultStore()
}
func (s *techSpace) Close(ctx context.Context) (err error) {
s.ctxCancel()
s.mu.Lock()

View file

@ -358,7 +358,7 @@ func newFixture(t *testing.T, storeIDs []string) *fixture {
return nil, nil
}).Times(1)
require.NoError(t, fx.a.Start(ctx))
err := fx.TechSpace.Run(fx.techCore, fx.objectCache, false)
err := fx.TechSpace.Run(fx.techCore, fx.objectCache, nil, false)
require.NoError(t, err)
// do not cancel wakeUpIds func

View file

@ -48,21 +48,14 @@ const (
CName = "builtinobjects"
injectionTimeout = 30 * time.Second
migrationUseCase = -1
migrationDashboardName = "bafyreiha2hjbrzmwo7rpiiechv45vv37d6g5aezyr5wihj3agwawu6zi3u"
defaultDashboardId = "lastOpened"
migrationUseCase = -1
defaultDashboardId = "lastOpened"
contentLengthHeader = "Content-Length"
archiveDownloadingPercents = 30
archiveCopyingPercents = 10
)
type widgetParameters struct {
layout model.BlockContentWidgetLayout
objectID, viewID string
isObjectIDChanged bool
}
//go:embed data/start_guide.zip
var startGuideZip []byte
@ -75,13 +68,21 @@ var migrationDashboardZip []byte
//go:embed data/empty.zip
var emptyZip []byte
//go:embed data/get_started_mobile.zip
var getStartedMobileZip []byte
//go:embed data/empty_mobile.zip
var emptyMobileZip []byte
var (
log = logging.Logger("anytype-mw-builtinobjects")
archives = map[pb.RpcObjectImportUseCaseRequestUseCase][]byte{
pb.RpcObjectImportUseCaseRequest_GET_STARTED: getStartedZip,
pb.RpcObjectImportUseCaseRequest_GUIDE_ONLY: startGuideZip,
pb.RpcObjectImportUseCaseRequest_EMPTY: emptyZip,
pb.RpcObjectImportUseCaseRequest_GET_STARTED: getStartedZip,
pb.RpcObjectImportUseCaseRequest_GUIDE_ONLY: startGuideZip,
pb.RpcObjectImportUseCaseRequest_EMPTY: emptyZip,
pb.RpcObjectImportUseCaseRequest_GET_STARTED_MOBILE: getStartedMobileZip,
pb.RpcObjectImportUseCaseRequest_EMPTY_MOBILE: emptyMobileZip,
}
)

Binary file not shown.

Binary file not shown.

Binary file not shown.