mirror of
https://github.com/anyproto/anytype-heart.git
synced 2025-06-07 21:37:04 +09:00
Merge pull request #2425 from anyproto/go-4678-retries-on-backlinks-update
GO-4678 Propagate spaceId to backlinks watcher
This commit is contained in:
commit
1c4695390b
6 changed files with 180 additions and 65 deletions
|
@ -3,24 +3,25 @@ package backlinks
|
|||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/anyproto/any-sync/app"
|
||||
"github.com/anyproto/any-sync/app/logger"
|
||||
"github.com/anyproto/any-sync/app/ocache"
|
||||
"github.com/cheggaaa/mb/v3"
|
||||
"github.com/samber/lo"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/anyproto/anytype-heart/core/block/editor/smartblock"
|
||||
"github.com/anyproto/anytype-heart/core/block/editor/state"
|
||||
"github.com/anyproto/anytype-heart/core/block/object/idresolver"
|
||||
"github.com/anyproto/anytype-heart/core/block/source"
|
||||
"github.com/anyproto/anytype-heart/core/domain"
|
||||
"github.com/anyproto/anytype-heart/pb"
|
||||
"github.com/anyproto/anytype-heart/pkg/lib/bundle"
|
||||
"github.com/anyproto/anytype-heart/pkg/lib/localstore/objectstore"
|
||||
"github.com/anyproto/anytype-heart/pkg/lib/localstore/objectstore/spaceindex"
|
||||
"github.com/anyproto/anytype-heart/pkg/lib/logging"
|
||||
"github.com/anyproto/anytype-heart/pkg/lib/threads"
|
||||
"github.com/anyproto/anytype-heart/space"
|
||||
"github.com/anyproto/anytype-heart/util/dateutil"
|
||||
|
@ -33,7 +34,7 @@ const (
|
|||
defaultAggregationInterval = time.Second * 5
|
||||
)
|
||||
|
||||
var log = logging.Logger(CName)
|
||||
var log = logger.NewNamed(CName)
|
||||
|
||||
type backlinksUpdater interface {
|
||||
SubscribeLinksUpdate(callback func(info spaceindex.LinksUpdateInfo))
|
||||
|
@ -53,12 +54,11 @@ type UpdateWatcher interface {
|
|||
type watcher struct {
|
||||
updater backlinksUpdater
|
||||
store objectstore.ObjectStore
|
||||
resolver idresolver.Resolver
|
||||
spaceService space.Service
|
||||
|
||||
infoBatch *mb.MB[spaceindex.LinksUpdateInfo]
|
||||
lock sync.Mutex
|
||||
accumulatedBacklinks map[string]*backLinksUpdate
|
||||
accumulatedBacklinks map[domain.FullID]*backLinksUpdate
|
||||
aggregationInterval time.Duration
|
||||
cancelCtx context.CancelFunc
|
||||
ctx context.Context
|
||||
|
@ -75,10 +75,10 @@ func (w *watcher) Name() string {
|
|||
func (w *watcher) Init(a *app.App) error {
|
||||
w.updater = app.MustComponent[backlinksUpdater](a)
|
||||
w.store = app.MustComponent[objectstore.ObjectStore](a)
|
||||
w.resolver = app.MustComponent[idresolver.Resolver](a)
|
||||
w.spaceService = app.MustComponent[space.Service](a)
|
||||
|
||||
w.infoBatch = mb.New[spaceindex.LinksUpdateInfo](0)
|
||||
w.accumulatedBacklinks = make(map[string]*backLinksUpdate)
|
||||
w.accumulatedBacklinks = make(map[domain.FullID]*backLinksUpdate)
|
||||
w.aggregationInterval = defaultAggregationInterval
|
||||
return nil
|
||||
}
|
||||
|
@ -88,7 +88,7 @@ func (w *watcher) Close(context.Context) error {
|
|||
w.cancelCtx()
|
||||
}
|
||||
if err := w.infoBatch.Close(); err != nil {
|
||||
log.Errorf("failed to close message batch: %v", err)
|
||||
log.Error("failed to close message batch", zap.Error(err))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -97,7 +97,7 @@ func (w *watcher) Run(ctx context.Context) error {
|
|||
w.ctx, w.cancelCtx = context.WithCancel(context.Background())
|
||||
w.updater.SubscribeLinksUpdate(func(info spaceindex.LinksUpdateInfo) {
|
||||
if err := w.infoBatch.Add(w.ctx, info); err != nil {
|
||||
log.With("objectId", info.LinksFromId).Errorf("failed to add backlinks update info to message batch: %v", err)
|
||||
log.Error("failed to add backlinks update info to message batch", zap.String("objectId", info.LinksFromId.ObjectID), zap.Error(err))
|
||||
}
|
||||
})
|
||||
|
||||
|
@ -112,32 +112,48 @@ func (w *watcher) FlushUpdates() {
|
|||
w.updateAccumulatedBacklinks()
|
||||
}
|
||||
|
||||
func applyUpdates(m map[string]*backLinksUpdate, update spaceindex.LinksUpdateInfo) {
|
||||
if update.LinksFromId == "" {
|
||||
func applyUpdate(m map[domain.FullID]*backLinksUpdate, update spaceindex.LinksUpdateInfo, parseId func(string) (domain.FullID, error)) {
|
||||
if update.LinksFromId.ObjectID == "" {
|
||||
return
|
||||
}
|
||||
|
||||
for _, removed := range update.Removed {
|
||||
if _, ok := m[removed]; !ok {
|
||||
m[removed] = &backLinksUpdate{}
|
||||
fullId, err := parseId(removed)
|
||||
if err != nil {
|
||||
log.Error("failed to parse id", zap.String("objectId", removed), zap.Error(err))
|
||||
continue
|
||||
}
|
||||
if i := lo.IndexOf(m[removed].added, update.LinksFromId); i >= 0 {
|
||||
m[removed].added = append(m[removed].added[:i], m[removed].added[i+1:]...)
|
||||
if fullId.SpaceID == "" {
|
||||
fullId.SpaceID = update.LinksFromId.SpaceID
|
||||
}
|
||||
if !lo.Contains(m[removed].removed, update.LinksFromId) {
|
||||
m[removed].removed = append(m[removed].removed, update.LinksFromId)
|
||||
if _, ok := m[fullId]; !ok {
|
||||
m[fullId] = &backLinksUpdate{}
|
||||
}
|
||||
if i := lo.IndexOf(m[fullId].added, update.LinksFromId.ObjectID); i >= 0 {
|
||||
m[fullId].added = append(m[fullId].added[:i], m[fullId].added[i+1:]...)
|
||||
}
|
||||
if !lo.Contains(m[fullId].removed, update.LinksFromId.ObjectID) {
|
||||
m[fullId].removed = append(m[fullId].removed, update.LinksFromId.ObjectID)
|
||||
}
|
||||
}
|
||||
|
||||
for _, added := range update.Added {
|
||||
if _, ok := m[added]; !ok {
|
||||
m[added] = &backLinksUpdate{}
|
||||
fullId, err := parseId(added)
|
||||
if err != nil {
|
||||
log.Error("failed to parse id", zap.String("objectId", added), zap.Error(err))
|
||||
continue
|
||||
}
|
||||
if i := lo.IndexOf(m[added].removed, update.LinksFromId); i >= 0 {
|
||||
m[added].removed = append(m[added].removed[:i], m[added].removed[i+1:]...)
|
||||
if fullId.SpaceID == "" {
|
||||
fullId.SpaceID = update.LinksFromId.SpaceID
|
||||
}
|
||||
if !lo.Contains(m[added].added, update.LinksFromId) {
|
||||
m[added].added = append(m[added].added, update.LinksFromId)
|
||||
if _, ok := m[fullId]; !ok {
|
||||
m[fullId] = &backLinksUpdate{}
|
||||
}
|
||||
if i := lo.IndexOf(m[fullId].removed, update.LinksFromId.ObjectID); i >= 0 {
|
||||
m[fullId].removed = append(m[fullId].removed[:i], m[fullId].removed[i+1:]...)
|
||||
}
|
||||
if !lo.Contains(m[fullId].added, update.LinksFromId.ObjectID) {
|
||||
m[fullId].added = append(m[fullId].added, update.LinksFromId.ObjectID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -182,7 +198,7 @@ func (w *watcher) backlinksUpdateHandler() {
|
|||
w.lock.Lock()
|
||||
for _, info := range msgs {
|
||||
info = cleanSelfLinks(info)
|
||||
applyUpdates(w.accumulatedBacklinks, info)
|
||||
applyUpdate(w.accumulatedBacklinks, info, domain.ParseLongId)
|
||||
}
|
||||
lastReceivedUpdates = time.Now()
|
||||
w.lock.Unlock()
|
||||
|
@ -190,11 +206,13 @@ func (w *watcher) backlinksUpdateHandler() {
|
|||
}
|
||||
|
||||
func (w *watcher) updateAccumulatedBacklinks() {
|
||||
log.Debugf("updating backlinks for %d objects", len(w.accumulatedBacklinks))
|
||||
log.Debug("updating backlinks", zap.Int64("objects number", int64(len(w.accumulatedBacklinks))))
|
||||
for id, updates := range w.accumulatedBacklinks {
|
||||
w.updateBackLinksInObject(id, updates)
|
||||
if err := w.updateBackLinksInObject(id, updates); err != nil {
|
||||
log.Error("failed to update backlinks", zap.String("objectId", id.ObjectID), zap.Error(err))
|
||||
}
|
||||
}
|
||||
w.accumulatedBacklinks = make(map[string]*backLinksUpdate)
|
||||
w.accumulatedBacklinks = make(map[domain.FullID]*backLinksUpdate)
|
||||
}
|
||||
|
||||
func shouldIndexBacklinks(ids threads.DerivedSmartblockIds, id string) bool {
|
||||
|
@ -209,16 +227,10 @@ func shouldIndexBacklinks(ids threads.DerivedSmartblockIds, id string) bool {
|
|||
}
|
||||
}
|
||||
|
||||
func (w *watcher) updateBackLinksInObject(id string, backlinksUpdate *backLinksUpdate) {
|
||||
spaceId, err := w.resolver.ResolveSpaceID(id)
|
||||
func (w *watcher) updateBackLinksInObject(id domain.FullID, backlinksUpdate *backLinksUpdate) (err error) {
|
||||
spc, err := w.spaceService.Get(w.ctx, id.SpaceID)
|
||||
if err != nil {
|
||||
log.With("objectId", id).Errorf("failed to resolve space id for object: %v", err)
|
||||
return
|
||||
}
|
||||
spc, err := w.spaceService.Get(w.ctx, spaceId)
|
||||
if err != nil {
|
||||
log.With("objectId", id, "spaceId", spaceId).Errorf("failed to get space: %v", err)
|
||||
return
|
||||
return fmt.Errorf("failed to get space: %w", err)
|
||||
}
|
||||
spaceDerivedIds := spc.DerivedIDs()
|
||||
|
||||
|
@ -247,10 +259,10 @@ func (w *watcher) updateBackLinksInObject(id string, backlinksUpdate *backLinksU
|
|||
return current, true, nil
|
||||
}
|
||||
|
||||
if shouldIndexBacklinks(spaceDerivedIds, id) {
|
||||
if shouldIndexBacklinks(spaceDerivedIds, id.ObjectID) {
|
||||
// filter-out backlinks in system objects
|
||||
err = spc.DoLockedIfNotExists(id, func() error {
|
||||
return w.store.SpaceIndex(spaceId).ModifyObjectDetails(id, func(details *domain.Details) (*domain.Details, bool, error) {
|
||||
err = spc.DoLockedIfNotExists(id.ObjectID, func() error {
|
||||
return w.store.SpaceIndex(id.SpaceID).ModifyObjectDetails(id.ObjectID, func(details *domain.Details) (*domain.Details, bool, error) {
|
||||
return updateBacklinks(details, backlinksUpdate)
|
||||
})
|
||||
})
|
||||
|
@ -261,9 +273,9 @@ func (w *watcher) updateBackLinksInObject(id string, backlinksUpdate *backLinksU
|
|||
}
|
||||
|
||||
if !errors.Is(err, ocache.ErrExists) {
|
||||
log.With("objectId", id).Errorf("failed to update backlinks for not cached object: %v", err)
|
||||
log.Warn("failed to update backlinks for not cached object", zap.String("objectId", id.ObjectID), zap.Error(err))
|
||||
}
|
||||
if err = spc.Do(id, func(b smartblock.SmartBlock) error {
|
||||
if err = spc.Do(id.ObjectID, func(b smartblock.SmartBlock) error {
|
||||
if cr, ok := b.(source.ChangeReceiver); ok {
|
||||
return cr.StateAppend(func(d state.Doc) (s *state.State, changes []*pb.ChangeContent, err error) {
|
||||
return d.NewState(), nil, nil
|
||||
|
@ -272,19 +284,19 @@ func (w *watcher) updateBackLinksInObject(id string, backlinksUpdate *backLinksU
|
|||
// do no do apply, stateAppend send the event and run the index
|
||||
return nil
|
||||
}); err != nil {
|
||||
log.With("objectId", id).Errorf("failed to update backlinks: %v", err)
|
||||
return fmt.Errorf("failed to update backlinks: %w", err)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func hasSelfLinks(info spaceindex.LinksUpdateInfo) bool {
|
||||
for _, link := range info.Added {
|
||||
if link == info.LinksFromId {
|
||||
if link == info.LinksFromId.ObjectID {
|
||||
return true
|
||||
}
|
||||
}
|
||||
for _, link := range info.Removed {
|
||||
if link == info.LinksFromId {
|
||||
if link == info.LinksFromId.ObjectID {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
@ -302,12 +314,12 @@ func cleanSelfLinks(info spaceindex.LinksUpdateInfo) spaceindex.LinksUpdateInfo
|
|||
Removed: make([]string, 0, len(info.Removed)),
|
||||
}
|
||||
for _, link := range info.Added {
|
||||
if link != info.LinksFromId {
|
||||
if link != info.LinksFromId.ObjectID {
|
||||
infoFilter.Added = append(infoFilter.Added, link)
|
||||
}
|
||||
}
|
||||
for _, link := range info.Removed {
|
||||
if link != info.LinksFromId {
|
||||
if link != info.LinksFromId.ObjectID {
|
||||
infoFilter.Removed = append(infoFilter.Removed, link)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package backlinks
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
@ -10,7 +11,6 @@ import (
|
|||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/anyproto/anytype-heart/core/block/object/idresolver/mock_idresolver"
|
||||
"github.com/anyproto/anytype-heart/core/domain"
|
||||
"github.com/anyproto/anytype-heart/pkg/lib/bundle"
|
||||
"github.com/anyproto/anytype-heart/pkg/lib/localstore/objectstore"
|
||||
|
@ -37,7 +37,6 @@ func (u *testUpdater) start() {
|
|||
|
||||
type fixture struct {
|
||||
store *objectstore.StoreFixture
|
||||
resolver *mock_idresolver.MockResolver
|
||||
spaceService *mock_space.MockService
|
||||
updater *testUpdater
|
||||
*watcher
|
||||
|
@ -46,23 +45,20 @@ type fixture struct {
|
|||
func newFixture(t *testing.T, aggregationInterval time.Duration) *fixture {
|
||||
updater := &testUpdater{}
|
||||
store := objectstore.NewStoreFixture(t)
|
||||
resolver := mock_idresolver.NewMockResolver(t)
|
||||
spaceSvc := mock_space.NewMockService(t)
|
||||
|
||||
w := &watcher{
|
||||
updater: updater,
|
||||
store: store,
|
||||
resolver: resolver,
|
||||
spaceService: spaceSvc,
|
||||
|
||||
aggregationInterval: aggregationInterval,
|
||||
infoBatch: mb.New[spaceindex.LinksUpdateInfo](0),
|
||||
accumulatedBacklinks: make(map[string]*backLinksUpdate),
|
||||
accumulatedBacklinks: make(map[domain.FullID]*backLinksUpdate),
|
||||
}
|
||||
|
||||
return &fixture{
|
||||
store: store,
|
||||
resolver: resolver,
|
||||
spaceService: spaceSvc,
|
||||
updater: updater,
|
||||
watcher: w,
|
||||
|
@ -73,24 +69,24 @@ func TestWatcher_Run(t *testing.T) {
|
|||
t.Run("backlinks update asynchronously", func(t *testing.T) {
|
||||
// given
|
||||
interval := 500 * time.Millisecond
|
||||
fromId := domain.FullID{ObjectID: "obj1", SpaceID: spaceId}
|
||||
f := newFixture(t, interval)
|
||||
|
||||
f.resolver.EXPECT().ResolveSpaceID(mock.Anything).Return(spaceId, nil)
|
||||
f.updater.runFunc = func(callback func(info spaceindex.LinksUpdateInfo)) {
|
||||
callback(spaceindex.LinksUpdateInfo{
|
||||
LinksFromId: "obj1",
|
||||
LinksFromId: fromId,
|
||||
Added: []string{"obj2", "obj3"},
|
||||
Removed: nil,
|
||||
})
|
||||
time.Sleep(interval / 2)
|
||||
callback(spaceindex.LinksUpdateInfo{
|
||||
LinksFromId: "obj1",
|
||||
LinksFromId: fromId,
|
||||
Added: []string{"obj4", "obj5"},
|
||||
Removed: []string{"obj2"},
|
||||
})
|
||||
time.Sleep(interval / 2)
|
||||
callback(spaceindex.LinksUpdateInfo{
|
||||
LinksFromId: "obj1",
|
||||
LinksFromId: fromId,
|
||||
Added: []string{"obj6"},
|
||||
Removed: []string{"obj5"},
|
||||
})
|
||||
|
@ -129,7 +125,6 @@ func TestWatcher_updateAccumulatedBacklinks(t *testing.T) {
|
|||
t.Run("no errors", func(t *testing.T) {
|
||||
// given
|
||||
f := newFixture(t, time.Second)
|
||||
f.resolver.EXPECT().ResolveSpaceID(mock.Anything).Return(spaceId, nil)
|
||||
|
||||
f.store.AddObjects(t, spaceId, []spaceindex.TestObject{{
|
||||
bundle.RelationKeyId: domain.String("obj1"),
|
||||
|
@ -155,15 +150,15 @@ func TestWatcher_updateAccumulatedBacklinks(t *testing.T) {
|
|||
|
||||
spc.EXPECT().Do(mock.Anything, mock.Anything).Return(nil).Once()
|
||||
|
||||
f.watcher.accumulatedBacklinks = map[string]*backLinksUpdate{
|
||||
"obj1": {
|
||||
f.watcher.accumulatedBacklinks = map[domain.FullID]*backLinksUpdate{
|
||||
domain.FullID{ObjectID: "obj1", SpaceID: spaceId}: {
|
||||
added: []string{"obj2", "obj3"},
|
||||
removed: []string{"obj4", "obj5"},
|
||||
},
|
||||
"obj2": {
|
||||
domain.FullID{ObjectID: "obj2", SpaceID: spaceId}: {
|
||||
added: []string{"obj4", "obj5"},
|
||||
},
|
||||
"obj3": {
|
||||
domain.FullID{ObjectID: "obj3", SpaceID: spaceId}: {
|
||||
removed: []string{"obj1", "obj4"},
|
||||
},
|
||||
}
|
||||
|
@ -180,3 +175,98 @@ func TestWatcher_updateAccumulatedBacklinks(t *testing.T) {
|
|||
assert.Equal(t, []string{"obj2"}, details.GetStringList(bundle.RelationKeyBacklinks))
|
||||
})
|
||||
}
|
||||
|
||||
func TestApplyUpdate(t *testing.T) {
|
||||
t.Run("empty map", func(t *testing.T) {
|
||||
// given
|
||||
m := make(map[domain.FullID]*backLinksUpdate)
|
||||
|
||||
// when
|
||||
applyUpdate(m, spaceindex.LinksUpdateInfo{
|
||||
LinksFromId: domain.FullID{ObjectID: "obj1", SpaceID: spaceId},
|
||||
Added: []string{"obj2", "spc2/obj3"},
|
||||
Removed: []string{"spc1/obj4", "spc3/obj5"},
|
||||
}, parseId)
|
||||
|
||||
// then
|
||||
require.Len(t, m, 4)
|
||||
|
||||
update, ok := m[domain.FullID{ObjectID: "obj2", SpaceID: spaceId}]
|
||||
require.True(t, ok)
|
||||
assert.Equal(t, []string{"obj1"}, update.added)
|
||||
assert.Empty(t, update.removed)
|
||||
|
||||
update, ok = m[domain.FullID{ObjectID: "obj3", SpaceID: "spc2"}]
|
||||
require.True(t, ok)
|
||||
assert.Equal(t, []string{"obj1"}, update.added)
|
||||
assert.Empty(t, update.removed)
|
||||
|
||||
update, ok = m[domain.FullID{ObjectID: "obj4", SpaceID: "spc1"}]
|
||||
require.True(t, ok)
|
||||
assert.Equal(t, []string{"obj1"}, update.removed)
|
||||
assert.Empty(t, update.added)
|
||||
|
||||
update, ok = m[domain.FullID{ObjectID: "obj5", SpaceID: "spc3"}]
|
||||
require.True(t, ok)
|
||||
assert.Equal(t, []string{"obj1"}, update.removed)
|
||||
assert.Empty(t, update.added)
|
||||
})
|
||||
|
||||
t.Run("prefilled map", func(t *testing.T) {
|
||||
// given
|
||||
m := map[domain.FullID]*backLinksUpdate{
|
||||
domain.FullID{ObjectID: "obj2", SpaceID: spaceId}: {
|
||||
added: []string{"obj3"},
|
||||
},
|
||||
domain.FullID{ObjectID: "obj3", SpaceID: "spc2"}: {
|
||||
removed: []string{"obj1"},
|
||||
},
|
||||
domain.FullID{ObjectID: "obj5", SpaceID: "spc3"}: {
|
||||
added: []string{"obj1", "obj2"},
|
||||
removed: []string{"obj3"},
|
||||
},
|
||||
}
|
||||
|
||||
// when
|
||||
applyUpdate(m, spaceindex.LinksUpdateInfo{
|
||||
LinksFromId: domain.FullID{ObjectID: "obj1", SpaceID: spaceId},
|
||||
Added: []string{"obj2", "spc2/obj3"},
|
||||
Removed: []string{"spc1/obj4", "spc3/obj5"},
|
||||
}, parseId)
|
||||
|
||||
// then
|
||||
require.Len(t, m, 4)
|
||||
|
||||
update, ok := m[domain.FullID{ObjectID: "obj2", SpaceID: spaceId}]
|
||||
require.True(t, ok)
|
||||
assert.Equal(t, []string{"obj3", "obj1"}, update.added)
|
||||
assert.Empty(t, update.removed)
|
||||
|
||||
update, ok = m[domain.FullID{ObjectID: "obj3", SpaceID: "spc2"}]
|
||||
require.True(t, ok)
|
||||
assert.Equal(t, []string{"obj1"}, update.added)
|
||||
assert.Empty(t, update.removed)
|
||||
|
||||
update, ok = m[domain.FullID{ObjectID: "obj4", SpaceID: "spc1"}]
|
||||
require.True(t, ok)
|
||||
assert.Equal(t, []string{"obj1"}, update.removed)
|
||||
assert.Empty(t, update.added)
|
||||
|
||||
update, ok = m[domain.FullID{ObjectID: "obj5", SpaceID: "spc3"}]
|
||||
require.True(t, ok)
|
||||
assert.Equal(t, []string{"obj3", "obj1"}, update.removed)
|
||||
assert.Equal(t, []string{"obj2"}, update.added)
|
||||
})
|
||||
}
|
||||
|
||||
func parseId(id string) (domain.FullID, error) {
|
||||
parts := strings.Split(id, "/")
|
||||
switch len(parts) {
|
||||
case 0:
|
||||
return domain.FullID{}, domain.ErrParseLongId
|
||||
case 1:
|
||||
return domain.FullID{ObjectID: parts[0]}, nil
|
||||
default:
|
||||
return domain.FullID{ObjectID: parts[1], SpaceID: parts[0]}, nil
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package domain
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
)
|
||||
|
@ -16,6 +17,8 @@ func (i FullID) IsEmpty() bool {
|
|||
|
||||
const ParticipantPrefix = "_participant_"
|
||||
|
||||
var ErrParseLongId = errors.New("failed to parse object id")
|
||||
|
||||
func NewParticipantId(spaceId, identity string) string {
|
||||
// Replace dots with underscores to avoid issues on Desktop client
|
||||
spaceId = strings.Replace(spaceId, ".", "_", 1)
|
||||
|
@ -33,3 +36,11 @@ func ParseParticipantId(participantId string) (spaceId string, identity string,
|
|||
|
||||
return fmt.Sprintf("%s.%s", parts[2], parts[3]), parts[4], nil
|
||||
}
|
||||
|
||||
func ParseLongId(id string) (FullID, error) {
|
||||
if id == "" {
|
||||
return FullID{}, ErrParseLongId
|
||||
}
|
||||
// TODO: support spaceId in long ids
|
||||
return FullID{ObjectID: id}, nil
|
||||
}
|
||||
|
|
|
@ -175,7 +175,7 @@ func (s *dsObjectStore) Init() error {
|
|||
}
|
||||
|
||||
type LinksUpdateInfo struct {
|
||||
LinksFromId string
|
||||
LinksFromId domain.FullID
|
||||
Added, Removed []string
|
||||
}
|
||||
|
||||
|
|
|
@ -2,6 +2,8 @@ package spaceindex
|
|||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/anyproto/anytype-heart/core/domain"
|
||||
)
|
||||
|
||||
type SubscriptionManager struct {
|
||||
|
@ -15,7 +17,7 @@ func (s *SubscriptionManager) SubscribeLinksUpdate(callback func(info LinksUpdat
|
|||
s.lock.Unlock()
|
||||
}
|
||||
|
||||
func (s *SubscriptionManager) updateObjectLinks(fromId string, added []string, removed []string) {
|
||||
func (s *SubscriptionManager) updateObjectLinks(fromId domain.FullID, added []string, removed []string) {
|
||||
s.lock.RLock()
|
||||
defer s.lock.RUnlock()
|
||||
if s.onLinksUpdateCallback != nil && len(added)+len(removed) > 0 {
|
||||
|
|
|
@ -122,7 +122,7 @@ func (s *dsObjectStore) UpdateObjectLinks(ctx context.Context, id string, links
|
|||
return err
|
||||
}
|
||||
|
||||
s.subManager.updateObjectLinks(id, added, removed)
|
||||
s.subManager.updateObjectLinks(domain.FullID{SpaceID: s.SpaceId(), ObjectID: id}, added, removed)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue