1
0
Fork 0
mirror of https://github.com/anyproto/anytype-heart.git synced 2025-06-11 02:13:41 +09:00

Merge pull request #1716 from anyproto/go-4316-fix-dependency-subscription-for-cross-space-links-in-opened

GO-4316: Fix dependency subscription for cross space links
This commit is contained in:
Roman Khafizianov 2024-10-21 20:19:36 +02:00 committed by GitHub
commit 580bb517b8
Signed by: github
GPG key ID: B5690EEEBB952194
6 changed files with 299 additions and 30 deletions

View file

@ -16,6 +16,7 @@ import (
"github.com/anyproto/anytype-heart/core/block/editor/lastused"
"github.com/anyproto/anytype-heart/core/block/editor/smartblock"
"github.com/anyproto/anytype-heart/core/block/migration"
"github.com/anyproto/anytype-heart/core/block/object/idresolver"
"github.com/anyproto/anytype-heart/core/block/process"
"github.com/anyproto/anytype-heart/core/block/restriction"
"github.com/anyproto/anytype-heart/core/block/source"
@ -72,6 +73,7 @@ type ObjectFactory struct {
objectDeleter ObjectDeleter
deviceService deviceService
lastUsedUpdater lastused.ObjectUsageUpdater
spaceIdResolver idresolver.Resolver
}
func NewObjectFactory() *ObjectFactory {
@ -104,6 +106,7 @@ func (f *ObjectFactory) Init(a *app.App) (err error) {
f.fileReconciler = app.MustComponent[reconciler.Reconciler](a)
f.deviceService = app.MustComponent[deviceService](a)
f.lastUsedUpdater = app.MustComponent[lastused.ObjectUsageUpdater](a)
f.spaceIdResolver = app.MustComponent[idresolver.Resolver](a)
return nil
}
@ -160,8 +163,10 @@ func (f *ObjectFactory) produceSmartblock(space smartblock.Space) (smartblock.Sm
f.fileStore,
f.restrictionService,
store,
f.objectStore,
f.indexer,
f.eventSender,
f.spaceIdResolver,
), store
}

View file

@ -0,0 +1,184 @@
package smartblock
import (
"testing"
"github.com/gogo/protobuf/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/anyproto/anytype-heart/core/block/editor/state"
"github.com/anyproto/anytype-heart/pkg/lib/bundle"
"github.com/anyproto/anytype-heart/pkg/lib/localstore/objectstore"
"github.com/anyproto/anytype-heart/pkg/lib/pb/model"
bb "github.com/anyproto/anytype-heart/tests/blockbuilder"
"github.com/anyproto/anytype-heart/util/pbtypes"
)
func TestDependenciesSubscription(t *testing.T) {
t.Run("with existing dependencies", func(t *testing.T) {
mainObjId := "id"
fx := newFixture(mainObjId, t)
space1obj1 := "obj1"
space1obj2 := "obj2"
space2obj1 := "obj3"
fx.objectStore.AddObjects(t, testSpaceId, []objectstore.TestObject{
{
bundle.RelationKeyId: pbtypes.String(space1obj1),
bundle.RelationKeySpaceId: pbtypes.String(testSpaceId),
bundle.RelationKeyName: pbtypes.String("Object 1"),
},
{
bundle.RelationKeyId: pbtypes.String(space1obj2),
bundle.RelationKeySpaceId: pbtypes.String(testSpaceId),
bundle.RelationKeyName: pbtypes.String("Object 2"),
},
})
fx.objectStore.AddObjects(t, "space2", []objectstore.TestObject{
{
bundle.RelationKeyId: pbtypes.String(space2obj1),
bundle.RelationKeySpaceId: pbtypes.String("space2"),
bundle.RelationKeyName: pbtypes.String("Object 3"),
},
})
fx.spaceIdResolver.EXPECT().ResolveSpaceID(space1obj1).Return(testSpaceId, nil)
fx.spaceIdResolver.EXPECT().ResolveSpaceID(space1obj2).Return(testSpaceId, nil)
fx.spaceIdResolver.EXPECT().ResolveSpaceID(space2obj1).Return("space2", nil)
root := bb.Root(
bb.ID(mainObjId),
bb.Children(
bb.Link(space1obj1),
bb.Link(space1obj2),
bb.Link(space2obj1),
),
)
fx.Doc = state.NewDoc(mainObjId, root.BuildMap()).NewState()
objDetails := &types.Struct{
Fields: map[string]*types.Value{
bundle.RelationKeyId.String(): pbtypes.String(mainObjId),
bundle.RelationKeySpaceId.String(): pbtypes.String(testSpaceId),
bundle.RelationKeyName.String(): pbtypes.String("Main object"),
},
}
fx.Doc.(*state.State).SetDetails(objDetails)
details, err := fx.fetchMeta()
require.NoError(t, err)
require.NotEmpty(t, details)
wantDetails := []*model.ObjectViewDetailsSet{
{
Id: mainObjId,
Details: &types.Struct{
Fields: map[string]*types.Value{
bundle.RelationKeyId.String(): pbtypes.String(mainObjId),
bundle.RelationKeySpaceId.String(): pbtypes.String(testSpaceId),
bundle.RelationKeyName.String(): pbtypes.String("Main object"),
},
},
},
{
Id: space1obj1,
Details: &types.Struct{
Fields: map[string]*types.Value{
bundle.RelationKeyId.String(): pbtypes.String(space1obj1),
bundle.RelationKeySpaceId.String(): pbtypes.String(testSpaceId),
bundle.RelationKeyName.String(): pbtypes.String("Object 1"),
},
},
},
{
Id: space1obj2,
Details: &types.Struct{
Fields: map[string]*types.Value{
bundle.RelationKeyId.String(): pbtypes.String(space1obj2),
bundle.RelationKeySpaceId.String(): pbtypes.String(testSpaceId),
bundle.RelationKeyName.String(): pbtypes.String("Object 2"),
},
},
},
{
Id: space2obj1,
Details: &types.Struct{
Fields: map[string]*types.Value{
bundle.RelationKeyId.String(): pbtypes.String(space2obj1),
bundle.RelationKeySpaceId.String(): pbtypes.String("space2"),
bundle.RelationKeyName.String(): pbtypes.String("Object 3"),
},
},
},
}
assert.ElementsMatch(t, wantDetails, details)
fx.closeRecordsSub()
})
t.Run("with added dependencies", func(t *testing.T) {
mainObjId := "id"
fx := newFixture(mainObjId, t)
root := bb.Root(
bb.ID(mainObjId),
bb.Children(),
)
fx.Doc = state.NewDoc(mainObjId, root.BuildMap()).NewState()
details, err := fx.fetchMeta()
require.NoError(t, err)
require.Len(t, details, 1) // Only its own details
// Simulate changes in state
space1obj1 := "obj1"
space1obj2 := "obj2"
space2obj1 := "obj3"
fx.objectStore.AddObjects(t, testSpaceId, []objectstore.TestObject{
{
bundle.RelationKeyId: pbtypes.String(space1obj1),
bundle.RelationKeySpaceId: pbtypes.String(testSpaceId),
bundle.RelationKeyName: pbtypes.String("Object 1"),
},
{
bundle.RelationKeyId: pbtypes.String(space1obj2),
bundle.RelationKeySpaceId: pbtypes.String(testSpaceId),
bundle.RelationKeyName: pbtypes.String("Object 2"),
},
})
fx.objectStore.AddObjects(t, "space2", []objectstore.TestObject{
{
bundle.RelationKeyId: pbtypes.String(space2obj1),
bundle.RelationKeySpaceId: pbtypes.String("space2"),
bundle.RelationKeyName: pbtypes.String("Object 3"),
},
})
fx.spaceIdResolver.EXPECT().ResolveSpaceID(space1obj1).Return(testSpaceId, nil)
fx.spaceIdResolver.EXPECT().ResolveSpaceID(space1obj2).Return(testSpaceId, nil)
fx.spaceIdResolver.EXPECT().ResolveSpaceID(space2obj1).Return("space2", nil)
root = bb.Root(
bb.ID(mainObjId),
bb.Children(
bb.Link(space1obj1),
bb.Link(space1obj2),
bb.Link(space2obj1),
),
)
fx.Doc = state.NewDoc(mainObjId, root.BuildMap()).NewState()
fx.CheckSubscriptions()
assert.Contains(t, fx.smartBlock.lastDepDetails, space1obj1)
assert.Contains(t, fx.smartBlock.lastDepDetails, space1obj2)
assert.Contains(t, fx.smartBlock.lastDepDetails, space2obj1)
})
}

View file

@ -13,7 +13,7 @@ import (
)
func (sb *smartBlock) updateBackLinks(s *state.State) {
backLinks, err := sb.objectStore.GetInboundLinksById(sb.Id())
backLinks, err := sb.spaceIndex.GetInboundLinksById(sb.Id())
if err != nil {
log.With("objectID", sb.Id()).Errorf("failed to get inbound links from object store: %s", err)
return

View file

@ -20,6 +20,7 @@ import (
"github.com/anyproto/anytype-heart/core/block/editor/state"
"github.com/anyproto/anytype-heart/core/block/editor/template"
"github.com/anyproto/anytype-heart/core/block/object/idresolver"
"github.com/anyproto/anytype-heart/core/block/object/objectlink"
"github.com/anyproto/anytype-heart/core/block/restriction"
"github.com/anyproto/anytype-heart/core/block/simple"
@ -41,6 +42,7 @@ import (
"github.com/anyproto/anytype-heart/pkg/lib/logging"
"github.com/anyproto/anytype-heart/pkg/lib/pb/model"
"github.com/anyproto/anytype-heart/pkg/lib/threads"
"github.com/anyproto/anytype-heart/space/spacecore/storage/sqlitestorage"
"github.com/anyproto/anytype-heart/util/anonymize"
"github.com/anyproto/anytype-heart/util/internalflag"
"github.com/anyproto/anytype-heart/util/pbtypes"
@ -96,9 +98,11 @@ func New(
currentParticipantId string,
fileStore filestore.FileStore,
restrictionService restriction.Service,
objectStore spaceindex.Store,
spaceIndex spaceindex.Store,
objectStore objectstore.ObjectStore,
indexer Indexer,
eventSender event.Sender,
spaceIdResolver idresolver.Resolver,
) SmartBlock {
s := &smartBlock{
currentParticipantId: currentParticipantId,
@ -110,9 +114,12 @@ func New(
fileStore: fileStore,
restrictionService: restrictionService,
objectStore: objectStore,
spaceIndex: spaceIndex,
indexer: indexer,
eventSender: eventSender,
objectStore: objectStore,
spaceIdResolver: spaceIdResolver,
lastDepDetails: map[string]*pb.EventObjectDetailsSet{},
}
return s
}
@ -245,9 +252,11 @@ type smartBlock struct {
// Deps
fileStore filestore.FileStore
restrictionService restriction.Service
objectStore spaceindex.Store
spaceIndex spaceindex.Store
objectStore objectstore.ObjectStore
indexer Indexer
eventSender event.Sender
spaceIdResolver idresolver.Resolver
}
func (sb *smartBlock) SetLocker(locker Locker) {
@ -319,7 +328,6 @@ func (sb *smartBlock) Init(ctx *InitContext) (err error) {
}
sb.undo = undo.NewHistory(0)
sb.restrictions = sb.restrictionService.GetRestrictions(sb)
sb.lastDepDetails = map[string]*pb.EventObjectDetailsSet{}
if ctx.State != nil {
// need to store file keys in case we have some new files in the state
sb.storeFileKeys(ctx.State)
@ -355,7 +363,7 @@ func (sb *smartBlock) Init(ctx *InitContext) (err error) {
}
ctx.State.AddBundledRelationLinks(relKeys...)
if ctx.IsNewObject && ctx.State != nil {
source.NewSubObjectsAndProfileLinksMigration(sb.Type(), sb.space, sb.currentParticipantId, sb.objectStore).Migrate(ctx.State)
source.NewSubObjectsAndProfileLinksMigration(sb.Type(), sb.space, sb.currentParticipantId, sb.spaceIndex).Migrate(ctx.State)
}
if err = sb.injectLocalDetails(ctx.State); err != nil {
@ -447,19 +455,42 @@ func (sb *smartBlock) fetchMeta() (details []*model.ObjectViewDetailsSet, err er
sb.closeRecordsSub()
sb.closeRecordsSub = nil
}
depIds := sb.dependentSmartIds(sb.includeRelationObjectsAsDependents, true, true)
sb.setDependentIDs(depIds)
perSpace, err := sb.partitionIdsBySpace(sb.depIds)
if err != nil {
return nil, fmt.Errorf("partiton by space: %w", err)
}
recordsCh := make(chan *types.Struct, 10)
sb.recordsSub = database.NewSubscription(nil, recordsCh)
depIDs := sb.dependentSmartIds(sb.includeRelationObjectsAsDependents, true, true)
sb.setDependentIDs(depIDs)
var records []database.Record
records, sb.closeRecordsSub, err = sb.objectStore.QueryByIdsAndSubscribeForChanges(sb.depIds, sb.recordsSub)
if err != nil {
// datastore unavailable, cancel the subscription
sb.recordsSub.Close()
sb.closeRecordsSub = nil
return
closers := make([]func(), 0, len(perSpace))
for spaceId, perSpaceDepIds := range perSpace {
spaceIndex := sb.objectStore.SpaceIndex(spaceId)
recs, closeRecordsSub, err := spaceIndex.QueryByIdsAndSubscribeForChanges(perSpaceDepIds, sb.recordsSub)
if err != nil {
for _, closer := range closers {
closer()
}
// datastore unavailable, cancel the subscription
sb.recordsSub.Close()
sb.closeRecordsSub = nil
return nil, fmt.Errorf("subscribe: %w", err)
}
closers = append(closers, closeRecordsSub)
records = append(records, recs...)
}
sb.closeRecordsSub = func() {
for _, closer := range closers {
closer()
}
}
details = make([]*model.ObjectViewDetailsSet, 0, len(records)+1)
@ -480,6 +511,22 @@ func (sb *smartBlock) fetchMeta() (details []*model.ObjectViewDetailsSet, err er
return
}
func (sb *smartBlock) partitionIdsBySpace(ids []string) (map[string][]string, error) {
perSpace := map[string][]string{}
for _, id := range ids {
spaceId, err := sb.spaceIdResolver.ResolveSpaceID(id)
if errors.Is(err, sqlitestorage.ErrObjectNotFound) {
perSpace[sb.space.Id()] = append(perSpace[sb.space.Id()], id)
continue
}
if err != nil {
return nil, fmt.Errorf("resolve space id: %w", err)
}
perSpace[spaceId] = append(perSpace[spaceId], id)
}
return perSpace, nil
}
func (sb *smartBlock) Lock() {
sb.Locker.Lock()
}
@ -787,7 +834,7 @@ func (sb *smartBlock) Apply(s *state.State, flags ...ApplyFlag) (err error) {
}
func (sb *smartBlock) ResetToVersion(s *state.State) (err error) {
source.NewSubObjectsAndProfileLinksMigration(sb.Type(), sb.space, sb.currentParticipantId, sb.objectStore).Migrate(s)
source.NewSubObjectsAndProfileLinksMigration(sb.Type(), sb.space, sb.currentParticipantId, sb.spaceIndex).Migrate(s)
s.SetParent(sb.Doc.(*state.State))
sb.storeFileKeys(s)
sb.injectLocalDetails(s)
@ -808,13 +855,23 @@ func (sb *smartBlock) CheckSubscriptions() (changed bool) {
return true
}
newIDs := sb.recordsSub.Subscribe(sb.depIds)
records, err := sb.objectStore.QueryByIds(newIDs)
perSpace, err := sb.partitionIdsBySpace(newIDs)
if err != nil {
log.Errorf("queryById error: %v", err)
log.Errorf("partiton by space error: %v", err)
}
for _, rec := range records {
sb.onMetaChange(rec.Details)
for spaceId, ids := range perSpace {
spaceIndex := sb.objectStore.SpaceIndex(spaceId)
records, err := spaceIndex.QueryByIds(ids)
if err != nil {
log.Errorf("queryById error: %v", err)
}
for _, rec := range records {
sb.onMetaChange(rec.Details)
}
}
return true
}
@ -862,7 +919,7 @@ func (sb *smartBlock) AddRelationLinksToState(s *state.State, relationKeys ...st
}
// todo: filter-out existing relation links?
// in the most cases it should save as an objectstore query
relations, err := sb.objectStore.FetchRelationByKeys(relationKeys...)
relations, err := sb.spaceIndex.FetchRelationByKeys(relationKeys...)
if err != nil {
return
}
@ -902,7 +959,7 @@ func (sb *smartBlock) injectLocalDetails(s *state.State) error {
}
func (sb *smartBlock) getDetailsFromStore() (*types.Struct, error) {
storedDetails, err := sb.objectStore.GetDetails(sb.Id())
storedDetails, err := sb.spaceIndex.GetDetails(sb.Id())
if err != nil || storedDetails == nil {
return nil, err
}
@ -911,7 +968,7 @@ func (sb *smartBlock) getDetailsFromStore() (*types.Struct, error) {
func (sb *smartBlock) appendPendingDetails(details *types.Struct) (resultDetails *types.Struct, hasPendingLocalDetails bool) {
// Consume pending details
err := sb.objectStore.UpdatePendingLocalDetails(sb.Id(), func(pending *types.Struct) (*types.Struct, error) {
err := sb.spaceIndex.UpdatePendingLocalDetails(sb.Id(), func(pending *types.Struct) (*types.Struct, error) {
if len(pending.GetFields()) > 0 {
hasPendingLocalDetails = true
}
@ -1238,7 +1295,7 @@ func (sb *smartBlock) Relations(s *state.State) relationutils.Relations {
} else {
links = s.GetRelationLinks()
}
rels, _ := sb.objectStore.FetchRelationByLinks(links)
rels, _ := sb.spaceIndex.FetchRelationByLinks(links)
return rels
}

View file

@ -12,6 +12,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/anyproto/anytype-heart/core/block/editor/state"
"github.com/anyproto/anytype-heart/core/block/object/idresolver/mock_idresolver"
"github.com/anyproto/anytype-heart/core/block/restriction"
"github.com/anyproto/anytype-heart/core/block/restriction/mock_restriction"
"github.com/anyproto/anytype-heart/core/block/simple"
@ -25,6 +26,7 @@ import (
"github.com/anyproto/anytype-heart/pb"
"github.com/anyproto/anytype-heart/pkg/lib/bundle"
"github.com/anyproto/anytype-heart/pkg/lib/core/smartblock"
"github.com/anyproto/anytype-heart/pkg/lib/localstore/objectstore"
"github.com/anyproto/anytype-heart/pkg/lib/localstore/objectstore/spaceindex"
"github.com/anyproto/anytype-heart/pkg/lib/pb/model"
"github.com/anyproto/anytype-heart/util/internalflag"
@ -137,13 +139,14 @@ func TestSmartBlock_getDetailsFromStore(t *testing.T) {
},
}
fx.store.AddObjects(t, []spaceindex.TestObject{
{
err := fx.store.UpdateObjectDetails(context.Background(), id, &types.Struct{
Fields: map[string]*types.Value{
"id": pbtypes.String(id),
"number": pbtypes.Float64(2.18281828459045),
"🔥": pbtypes.StringList([]string{"Jeanne d'Arc", "Giordano Bruno", "Capocchio"}),
},
})
require.NoError(t, err)
// when
detailsFromStore, err := fx.getDetailsFromStore()
@ -455,17 +458,24 @@ func TestInjectDerivedDetails(t *testing.T) {
}
type fixture struct {
store *spaceindex.StoreFixture
objectStore *objectstore.StoreFixture
store spaceindex.Store
restrictionService *mock_restriction.MockService
indexer *MockIndexer
eventSender *mock_event.MockSender
source *sourceStub
spaceIdResolver *mock_idresolver.MockResolver
*smartBlock
}
const testSpaceId = "space1"
func newFixture(id string, t *testing.T) *fixture {
objectStore := spaceindex.NewStoreFixture(t)
objectStore := objectstore.NewStoreFixture(t)
spaceIndex := objectStore.SpaceIndex(testSpaceId)
spaceIdResolver := mock_idresolver.NewMockResolver(t)
indexer := NewMockIndexer(t)
@ -474,20 +484,23 @@ func newFixture(id string, t *testing.T) *fixture {
sender := mock_event.NewMockSender(t)
sb := New(nil, "", nil, restrictionService, objectStore, indexer, sender).(*smartBlock)
sb := New(nil, "", nil, restrictionService, spaceIndex, objectStore, indexer, sender, spaceIdResolver).(*smartBlock)
source := &sourceStub{
id: id,
spaceId: "space1",
sbType: smartblock.SmartBlockTypePage,
}
sb.source = source
return &fixture{
source: source,
smartBlock: sb,
store: objectStore,
store: spaceIndex,
restrictionService: restrictionService,
indexer: indexer,
eventSender: sender,
spaceIdResolver: spaceIdResolver,
objectStore: objectStore,
}
}

View file

@ -7,6 +7,7 @@ import (
"github.com/globalsign/mgo/bson"
"github.com/gogo/protobuf/types"
"github.com/anyproto/anytype-heart/core/block/simple"
"github.com/anyproto/anytype-heart/pkg/lib/pb/model"
"github.com/anyproto/anytype-heart/util/pbtypes"
)
@ -53,6 +54,15 @@ func (b *Block) Build() []*model.Block {
}, descendants...)
}
func (b *Block) BuildMap() map[string]simple.Block {
blocks := b.Build()
res := make(map[string]simple.Block, len(blocks))
for _, bl := range blocks {
res[bl.Id] = simple.New(bl)
}
return res
}
func mkBlock(b *model.Block, opts ...Option) *Block {
o := options{
// Init children for easier equality check in tests