1
0
Fork 0
mirror of https://github.com/anyproto/anytype-heart.git synced 2025-06-08 05:47:07 +09:00

GO-4678 Exclude space resolver from backlinks watcher

This commit is contained in:
kirillston 2025-05-21 18:32:56 +02:00
parent f69f86df96
commit 5b486bd59f
No known key found for this signature in database
GPG key ID: BE4BF014F0ECDFE8
6 changed files with 170 additions and 71 deletions

View file

@ -16,7 +16,6 @@ import (
"github.com/anyproto/anytype-heart/core/block/editor/smartblock"
"github.com/anyproto/anytype-heart/core/block/editor/state"
"github.com/anyproto/anytype-heart/core/block/object/idresolver"
"github.com/anyproto/anytype-heart/core/block/source"
"github.com/anyproto/anytype-heart/core/domain"
"github.com/anyproto/anytype-heart/pb"
@ -33,7 +32,6 @@ const (
CName = "backlinks-update-watcher"
defaultAggregationInterval = time.Second * 5
maxRetries = 10
)
var log = logger.NewNamed(CName)
@ -45,7 +43,6 @@ type backlinksUpdater interface {
type backLinksUpdate struct {
added []string
removed []string
retries uint8
}
type UpdateWatcher interface {
@ -57,12 +54,11 @@ type UpdateWatcher interface {
type watcher struct {
updater backlinksUpdater
store objectstore.ObjectStore
resolver idresolver.Resolver
spaceService space.Service
infoBatch *mb.MB[spaceindex.LinksUpdateInfo]
lock sync.Mutex
accumulatedBacklinks map[string]*backLinksUpdate
accumulatedBacklinks map[domain.FullID]*backLinksUpdate
aggregationInterval time.Duration
cancelCtx context.CancelFunc
ctx context.Context
@ -79,10 +75,10 @@ func (w *watcher) Name() string {
func (w *watcher) Init(a *app.App) error {
w.updater = app.MustComponent[backlinksUpdater](a)
w.store = app.MustComponent[objectstore.ObjectStore](a)
w.resolver = app.MustComponent[idresolver.Resolver](a)
w.spaceService = app.MustComponent[space.Service](a)
w.infoBatch = mb.New[spaceindex.LinksUpdateInfo](0)
w.accumulatedBacklinks = make(map[string]*backLinksUpdate)
w.accumulatedBacklinks = make(map[domain.FullID]*backLinksUpdate)
w.aggregationInterval = defaultAggregationInterval
return nil
}
@ -101,7 +97,7 @@ func (w *watcher) Run(ctx context.Context) error {
w.ctx, w.cancelCtx = context.WithCancel(context.Background())
w.updater.SubscribeLinksUpdate(func(info spaceindex.LinksUpdateInfo) {
if err := w.infoBatch.Add(w.ctx, info); err != nil {
log.Error("failed to add backlinks update info to message batch", zap.String("objectId", info.LinksFromId), zap.Error(err))
log.Error("failed to add backlinks update info to message batch", zap.String("objectId", info.LinksFromId.ObjectID), zap.Error(err))
}
})
@ -116,32 +112,48 @@ func (w *watcher) FlushUpdates() {
w.updateAccumulatedBacklinks()
}
func applyUpdates(m map[string]*backLinksUpdate, update spaceindex.LinksUpdateInfo) {
if update.LinksFromId == "" {
func applyUpdate(m map[domain.FullID]*backLinksUpdate, update spaceindex.LinksUpdateInfo, parseId func(string) (domain.FullID, error)) {
if update.LinksFromId.ObjectID == "" {
return
}
for _, removed := range update.Removed {
if _, ok := m[removed]; !ok {
m[removed] = &backLinksUpdate{}
fullId, err := parseId(removed)
if err != nil {
log.Error("failed to parse id", zap.String("objectId", removed), zap.Error(err))
continue
}
if i := lo.IndexOf(m[removed].added, update.LinksFromId); i >= 0 {
m[removed].added = append(m[removed].added[:i], m[removed].added[i+1:]...)
if fullId.SpaceID == "" {
fullId.SpaceID = update.LinksFromId.SpaceID
}
if !lo.Contains(m[removed].removed, update.LinksFromId) {
m[removed].removed = append(m[removed].removed, update.LinksFromId)
if _, ok := m[fullId]; !ok {
m[fullId] = &backLinksUpdate{}
}
if i := lo.IndexOf(m[fullId].added, update.LinksFromId.ObjectID); i >= 0 {
m[fullId].added = append(m[fullId].added[:i], m[fullId].added[i+1:]...)
}
if !lo.Contains(m[fullId].removed, update.LinksFromId.ObjectID) {
m[fullId].removed = append(m[fullId].removed, update.LinksFromId.ObjectID)
}
}
for _, added := range update.Added {
if _, ok := m[added]; !ok {
m[added] = &backLinksUpdate{}
fullId, err := parseId(added)
if err != nil {
log.Error("failed to parse id", zap.String("objectId", added), zap.Error(err))
continue
}
if i := lo.IndexOf(m[added].removed, update.LinksFromId); i >= 0 {
m[added].removed = append(m[added].removed[:i], m[added].removed[i+1:]...)
if fullId.SpaceID == "" {
fullId.SpaceID = update.LinksFromId.SpaceID
}
if !lo.Contains(m[added].added, update.LinksFromId) {
m[added].added = append(m[added].added, update.LinksFromId)
if _, ok := m[fullId]; !ok {
m[fullId] = &backLinksUpdate{}
}
if i := lo.IndexOf(m[fullId].removed, update.LinksFromId.ObjectID); i >= 0 {
m[fullId].removed = append(m[fullId].removed[:i], m[fullId].removed[i+1:]...)
}
if !lo.Contains(m[fullId].added, update.LinksFromId.ObjectID) {
m[fullId].added = append(m[fullId].added, update.LinksFromId.ObjectID)
}
}
}
@ -186,7 +198,7 @@ func (w *watcher) backlinksUpdateHandler() {
w.lock.Lock()
for _, info := range msgs {
info = cleanSelfLinks(info)
applyUpdates(w.accumulatedBacklinks, info)
applyUpdate(w.accumulatedBacklinks, info, domain.ParseLongId)
}
lastReceivedUpdates = time.Now()
w.lock.Unlock()
@ -195,21 +207,12 @@ func (w *watcher) backlinksUpdateHandler() {
func (w *watcher) updateAccumulatedBacklinks() {
log.Debug("updating backlinks", zap.Int64("objects number", int64(len(w.accumulatedBacklinks))))
updatesOnRetry := make(map[string]*backLinksUpdate)
for id, updates := range w.accumulatedBacklinks {
err := w.updateBackLinksInObject(id, updates)
if err == nil {
continue
if err := w.updateBackLinksInObject(id, updates); err != nil {
log.Error("failed to update backlinks", zap.String("objectId", id.ObjectID), zap.Error(err))
}
if updates.retries < maxRetries {
updates.retries++
updatesOnRetry[id] = updates
log.Debug("failed to update backlinks", zap.String("objectId", id), zap.Error(err))
continue
}
log.Error("failed to update backlinks", zap.String("objectId", id), zap.Error(err))
}
w.accumulatedBacklinks = updatesOnRetry
w.accumulatedBacklinks = make(map[domain.FullID]*backLinksUpdate)
}
func shouldIndexBacklinks(ids threads.DerivedSmartblockIds, id string) bool {
@ -224,15 +227,8 @@ func shouldIndexBacklinks(ids threads.DerivedSmartblockIds, id string) bool {
}
}
func (w *watcher) updateBackLinksInObject(id string, backlinksUpdate *backLinksUpdate) (err error) {
spaceId, err := w.resolver.ResolveSpaceID(id)
if err != nil {
if _, parseErr := dateutil.BuildDateObjectFromId(id); parseErr == nil {
return nil
}
return fmt.Errorf("failed to resolve space id: %w", err)
}
spc, err := w.spaceService.Get(w.ctx, spaceId)
func (w *watcher) updateBackLinksInObject(id domain.FullID, backlinksUpdate *backLinksUpdate) (err error) {
spc, err := w.spaceService.Get(w.ctx, id.SpaceID)
if err != nil {
return fmt.Errorf("failed to get space: %w", err)
}
@ -263,10 +259,10 @@ func (w *watcher) updateBackLinksInObject(id string, backlinksUpdate *backLinksU
return current, true, nil
}
if shouldIndexBacklinks(spaceDerivedIds, id) {
if shouldIndexBacklinks(spaceDerivedIds, id.ObjectID) {
// filter-out backlinks in system objects
err = spc.DoLockedIfNotExists(id, func() error {
return w.store.SpaceIndex(spaceId).ModifyObjectDetails(id, func(details *domain.Details) (*domain.Details, bool, error) {
err = spc.DoLockedIfNotExists(id.ObjectID, func() error {
return w.store.SpaceIndex(id.SpaceID).ModifyObjectDetails(id.ObjectID, func(details *domain.Details) (*domain.Details, bool, error) {
return updateBacklinks(details, backlinksUpdate)
})
})
@ -277,9 +273,9 @@ func (w *watcher) updateBackLinksInObject(id string, backlinksUpdate *backLinksU
}
if !errors.Is(err, ocache.ErrExists) {
log.Warn("failed to update backlinks for not cached object", zap.String("objectId", id), zap.Error(err))
log.Warn("failed to update backlinks for not cached object", zap.String("objectId", id.ObjectID), zap.Error(err))
}
if err = spc.Do(id, func(b smartblock.SmartBlock) error {
if err = spc.Do(id.ObjectID, func(b smartblock.SmartBlock) error {
if cr, ok := b.(source.ChangeReceiver); ok {
return cr.StateAppend(func(d state.Doc) (s *state.State, changes []*pb.ChangeContent, err error) {
return d.NewState(), nil, nil
@ -295,12 +291,12 @@ func (w *watcher) updateBackLinksInObject(id string, backlinksUpdate *backLinksU
func hasSelfLinks(info spaceindex.LinksUpdateInfo) bool {
for _, link := range info.Added {
if link == info.LinksFromId {
if link == info.LinksFromId.ObjectID {
return true
}
}
for _, link := range info.Removed {
if link == info.LinksFromId {
if link == info.LinksFromId.ObjectID {
return true
}
}
@ -318,12 +314,12 @@ func cleanSelfLinks(info spaceindex.LinksUpdateInfo) spaceindex.LinksUpdateInfo
Removed: make([]string, 0, len(info.Removed)),
}
for _, link := range info.Added {
if link != info.LinksFromId {
if link != info.LinksFromId.ObjectID {
infoFilter.Added = append(infoFilter.Added, link)
}
}
for _, link := range info.Removed {
if link != info.LinksFromId {
if link != info.LinksFromId.ObjectID {
infoFilter.Removed = append(infoFilter.Removed, link)
}
}

View file

@ -1,6 +1,7 @@
package backlinks
import (
"strings"
"testing"
"time"
@ -10,7 +11,6 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/anyproto/anytype-heart/core/block/object/idresolver/mock_idresolver"
"github.com/anyproto/anytype-heart/core/domain"
"github.com/anyproto/anytype-heart/pkg/lib/bundle"
"github.com/anyproto/anytype-heart/pkg/lib/localstore/objectstore"
@ -37,7 +37,6 @@ func (u *testUpdater) start() {
type fixture struct {
store *objectstore.StoreFixture
resolver *mock_idresolver.MockResolver
spaceService *mock_space.MockService
updater *testUpdater
*watcher
@ -46,23 +45,20 @@ type fixture struct {
func newFixture(t *testing.T, aggregationInterval time.Duration) *fixture {
updater := &testUpdater{}
store := objectstore.NewStoreFixture(t)
resolver := mock_idresolver.NewMockResolver(t)
spaceSvc := mock_space.NewMockService(t)
w := &watcher{
updater: updater,
store: store,
resolver: resolver,
spaceService: spaceSvc,
aggregationInterval: aggregationInterval,
infoBatch: mb.New[spaceindex.LinksUpdateInfo](0),
accumulatedBacklinks: make(map[string]*backLinksUpdate),
accumulatedBacklinks: make(map[domain.FullID]*backLinksUpdate),
}
return &fixture{
store: store,
resolver: resolver,
spaceService: spaceSvc,
updater: updater,
watcher: w,
@ -73,24 +69,24 @@ func TestWatcher_Run(t *testing.T) {
t.Run("backlinks update asynchronously", func(t *testing.T) {
// given
interval := 500 * time.Millisecond
fromId := domain.FullID{ObjectID: "obj1", SpaceID: spaceId}
f := newFixture(t, interval)
f.resolver.EXPECT().ResolveSpaceID(mock.Anything).Return(spaceId, nil)
f.updater.runFunc = func(callback func(info spaceindex.LinksUpdateInfo)) {
callback(spaceindex.LinksUpdateInfo{
LinksFromId: "obj1",
LinksFromId: fromId,
Added: []string{"obj2", "obj3"},
Removed: nil,
})
time.Sleep(interval / 2)
callback(spaceindex.LinksUpdateInfo{
LinksFromId: "obj1",
LinksFromId: fromId,
Added: []string{"obj4", "obj5"},
Removed: []string{"obj2"},
})
time.Sleep(interval / 2)
callback(spaceindex.LinksUpdateInfo{
LinksFromId: "obj1",
LinksFromId: fromId,
Added: []string{"obj6"},
Removed: []string{"obj5"},
})
@ -129,7 +125,6 @@ func TestWatcher_updateAccumulatedBacklinks(t *testing.T) {
t.Run("no errors", func(t *testing.T) {
// given
f := newFixture(t, time.Second)
f.resolver.EXPECT().ResolveSpaceID(mock.Anything).Return(spaceId, nil)
f.store.AddObjects(t, spaceId, []spaceindex.TestObject{{
bundle.RelationKeyId: domain.String("obj1"),
@ -155,15 +150,15 @@ func TestWatcher_updateAccumulatedBacklinks(t *testing.T) {
spc.EXPECT().Do(mock.Anything, mock.Anything).Return(nil).Once()
f.watcher.accumulatedBacklinks = map[string]*backLinksUpdate{
"obj1": {
f.watcher.accumulatedBacklinks = map[domain.FullID]*backLinksUpdate{
domain.FullID{ObjectID: "obj1", SpaceID: spaceId}: {
added: []string{"obj2", "obj3"},
removed: []string{"obj4", "obj5"},
},
"obj2": {
domain.FullID{ObjectID: "obj2", SpaceID: spaceId}: {
added: []string{"obj4", "obj5"},
},
"obj3": {
domain.FullID{ObjectID: "obj3", SpaceID: spaceId}: {
removed: []string{"obj1", "obj4"},
},
}
@ -180,3 +175,98 @@ func TestWatcher_updateAccumulatedBacklinks(t *testing.T) {
assert.Equal(t, []string{"obj2"}, details.GetStringList(bundle.RelationKeyBacklinks))
})
}
// TestApplyUpdate checks how applyUpdate folds a single LinksUpdateInfo into
// the accumulated per-target backlinks map, both when the map is empty and
// when it already holds entries for some of the targets. The local parseId
// helper is injected so that "<spaceId>/<objectId>" strings can be used as
// link targets.
func TestApplyUpdate(t *testing.T) {
t.Run("empty map", func(t *testing.T) {
// given
m := make(map[domain.FullID]*backLinksUpdate)
// when
// "obj2" carries no space prefix, so its entry is expected under the space
// of LinksFromId; the other targets name their space explicitly.
applyUpdate(m, spaceindex.LinksUpdateInfo{
LinksFromId: domain.FullID{ObjectID: "obj1", SpaceID: spaceId},
Added: []string{"obj2", "spc2/obj3"},
Removed: []string{"spc1/obj4", "spc3/obj5"},
}, parseId)
// then
// one entry per link target, each recording only the source object id
require.Len(t, m, 4)
update, ok := m[domain.FullID{ObjectID: "obj2", SpaceID: spaceId}]
require.True(t, ok)
assert.Equal(t, []string{"obj1"}, update.added)
assert.Empty(t, update.removed)
update, ok = m[domain.FullID{ObjectID: "obj3", SpaceID: "spc2"}]
require.True(t, ok)
assert.Equal(t, []string{"obj1"}, update.added)
assert.Empty(t, update.removed)
update, ok = m[domain.FullID{ObjectID: "obj4", SpaceID: "spc1"}]
require.True(t, ok)
assert.Equal(t, []string{"obj1"}, update.removed)
assert.Empty(t, update.added)
update, ok = m[domain.FullID{ObjectID: "obj5", SpaceID: "spc3"}]
require.True(t, ok)
assert.Equal(t, []string{"obj1"}, update.removed)
assert.Empty(t, update.added)
})
t.Run("prefilled map", func(t *testing.T) {
// given
// three of the four targets already have pending updates, some of which
// mention obj1 on the opposite side of what the new update will say
m := map[domain.FullID]*backLinksUpdate{
domain.FullID{ObjectID: "obj2", SpaceID: spaceId}: {
added: []string{"obj3"},
},
domain.FullID{ObjectID: "obj3", SpaceID: "spc2"}: {
removed: []string{"obj1"},
},
domain.FullID{ObjectID: "obj5", SpaceID: "spc3"}: {
added: []string{"obj1", "obj2"},
removed: []string{"obj3"},
},
}
// when
applyUpdate(m, spaceindex.LinksUpdateInfo{
LinksFromId: domain.FullID{ObjectID: "obj1", SpaceID: spaceId},
Added: []string{"obj2", "spc2/obj3"},
Removed: []string{"spc1/obj4", "spc3/obj5"},
}, parseId)
// then
// only obj4 is a new key, so three prefilled entries plus one
require.Len(t, m, 4)
// obj1 is appended after the already pending obj3
update, ok := m[domain.FullID{ObjectID: "obj2", SpaceID: spaceId}]
require.True(t, ok)
assert.Equal(t, []string{"obj3", "obj1"}, update.added)
assert.Empty(t, update.removed)
// the pending removal of obj1 is cancelled and replaced by an addition
update, ok = m[domain.FullID{ObjectID: "obj3", SpaceID: "spc2"}]
require.True(t, ok)
assert.Equal(t, []string{"obj1"}, update.added)
assert.Empty(t, update.removed)
update, ok = m[domain.FullID{ObjectID: "obj4", SpaceID: "spc1"}]
require.True(t, ok)
assert.Equal(t, []string{"obj1"}, update.removed)
assert.Empty(t, update.added)
// the pending addition of obj1 is cancelled and replaced by a removal;
// the unrelated pending entries (obj2 added, obj3 removed) stay in place
update, ok = m[domain.FullID{ObjectID: "obj5", SpaceID: "spc3"}]
require.True(t, ok)
assert.Equal(t, []string{"obj3", "obj1"}, update.removed)
assert.Equal(t, []string{"obj2"}, update.added)
})
}
// parseId is a test stand-in for domain.ParseLongId that understands the
// "<spaceId>/<objectId>" form used by the fixtures in this file.
//
// An id without a slash is returned with an empty SpaceID. For ids containing
// a slash the first segment becomes SpaceID and the second becomes ObjectID;
// any further segments are ignored. An empty id yields domain.ErrParseLongId.
func parseId(id string) (domain.FullID, error) {
	// strings.Split never returns an empty slice for a non-empty separator, so
	// a length check on its result cannot detect the empty id; reject it here.
	if id == "" {
		return domain.FullID{}, domain.ErrParseLongId
	}
	parts := strings.Split(id, "/")
	if len(parts) == 1 {
		return domain.FullID{ObjectID: parts[0]}, nil
	}
	return domain.FullID{ObjectID: parts[1], SpaceID: parts[0]}, nil
}

View file

@ -1,6 +1,7 @@
package domain
import (
"errors"
"fmt"
"strings"
)
@ -16,6 +17,8 @@ func (i FullID) IsEmpty() bool {
const ParticipantPrefix = "_participant_"
// ErrParseLongId is the sentinel error returned by ParseLongId when the given
// id cannot be parsed.
var ErrParseLongId = errors.New("failed to parse object id")
func NewParticipantId(spaceId, identity string) string {
// Replace dots with underscores to avoid issues on Desktop client
spaceId = strings.Replace(spaceId, ".", "_", 1)
@ -33,3 +36,11 @@ func ParseParticipantId(participantId string) (spaceId string, identity string,
return fmt.Sprintf("%s.%s", parts[2], parts[3]), parts[4], nil
}
// ParseLongId converts an object id string into a FullID.
//
// An empty id is rejected with ErrParseLongId. For any other input the whole
// string is stored as ObjectID and SpaceID is left empty, since extracting a
// space id from the string is not implemented yet.
func ParseLongId(id string) (FullID, error) {
	var parsed FullID
	if len(id) == 0 {
		return parsed, ErrParseLongId
	}
	// TODO: support spaceId in long ids
	parsed.ObjectID = id
	return parsed, nil
}

View file

@ -175,7 +175,7 @@ func (s *dsObjectStore) Init() error {
}
type LinksUpdateInfo struct {
LinksFromId string
LinksFromId domain.FullID
Added, Removed []string
}

View file

@ -2,6 +2,8 @@ package spaceindex
import (
"sync"
"github.com/anyproto/anytype-heart/core/domain"
)
type SubscriptionManager struct {
@ -15,7 +17,7 @@ func (s *SubscriptionManager) SubscribeLinksUpdate(callback func(info LinksUpdat
s.lock.Unlock()
}
func (s *SubscriptionManager) updateObjectLinks(fromId string, added []string, removed []string) {
func (s *SubscriptionManager) updateObjectLinks(fromId domain.FullID, added []string, removed []string) {
s.lock.RLock()
defer s.lock.RUnlock()
if s.onLinksUpdateCallback != nil && len(added)+len(removed) > 0 {

View file

@ -122,7 +122,7 @@ func (s *dsObjectStore) UpdateObjectLinks(ctx context.Context, id string, links
return err
}
s.subManager.updateObjectLinks(id, added, removed)
s.subManager.updateObjectLinks(domain.FullID{SpaceID: s.SpaceId(), ObjectID: id}, added, removed)
return nil
}