1
0
Fork 0
mirror of https://github.com/anyproto/anytype-heart.git synced 2025-06-09 17:44:59 +09:00

GO-4780 Merge branch 'GO-4146-new-spacestore' into go-4780-change-orderid-to-afterorderid-in-subscriptiongo

This commit is contained in:
Sergey 2025-02-04 16:16:21 +01:00
commit 9011e363b8
No known key found for this signature in database
GPG key ID: 3B6BEF79160221C6
48 changed files with 1735 additions and 2230 deletions

View file

@ -38,9 +38,6 @@ packages:
dir: "{{.InterfaceDir}}"
outpkg: "{{.PackageName}}"
inpackage: true
github.com/anyproto/anytype-heart/core/block/editor/lastused:
interfaces:
ObjectUsageUpdater:
github.com/anyproto/anytype-heart/core/block/import/common:
interfaces:
Converter:

View file

@ -46,7 +46,6 @@ import (
"github.com/anyproto/anytype-heart/core/block/detailservice"
"github.com/anyproto/anytype-heart/core/block/editor"
"github.com/anyproto/anytype-heart/core/block/editor/converter"
"github.com/anyproto/anytype-heart/core/block/editor/lastused"
"github.com/anyproto/anytype-heart/core/block/export"
importer "github.com/anyproto/anytype-heart/core/block/import"
"github.com/anyproto/anytype-heart/core/block/object/idderiver/idderiverimpl"
@ -83,7 +82,6 @@ import (
paymentscache "github.com/anyproto/anytype-heart/core/payments/cache"
"github.com/anyproto/anytype-heart/core/peerstatus"
"github.com/anyproto/anytype-heart/core/publish"
"github.com/anyproto/anytype-heart/core/recordsbatcher"
"github.com/anyproto/anytype-heart/core/session"
"github.com/anyproto/anytype-heart/core/spaceview"
"github.com/anyproto/anytype-heart/core/subscription"
@ -294,7 +292,6 @@ func Bootstrap(a *app.App, components ...app.Component) {
Register(acl.New()).
Register(builtintemplate.New()).
Register(converter.NewLayoutConverter()).
Register(recordsbatcher.New()).
Register(configfetcher.New()).
Register(process.New()).
Register(core.NewTempDirService()).
@ -336,7 +333,6 @@ func Bootstrap(a *app.App, components ...app.Component) {
Register(payments.New()).
Register(paymentscache.New()).
Register(peerstatus.New()).
Register(lastused.New()).
Register(spaceview.New()).
Register(api.New())
}

View file

@ -8,7 +8,7 @@ import (
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/app/ocache"
"github.com/cheggaaa/mb"
"github.com/cheggaaa/mb/v3"
"github.com/samber/lo"
"github.com/anyproto/anytype-heart/core/block/editor/smartblock"
@ -56,10 +56,12 @@ type watcher struct {
resolver idresolver.Resolver
spaceService space.Service
infoBatch *mb.MB
infoBatch *mb.MB[spaceindex.LinksUpdateInfo]
lock sync.Mutex
accumulatedBacklinks map[string]*backLinksUpdate
aggregationInterval time.Duration
cancelCtx context.CancelFunc
ctx context.Context
}
func New() UpdateWatcher {
@ -75,22 +77,26 @@ func (w *watcher) Init(a *app.App) error {
w.store = app.MustComponent[objectstore.ObjectStore](a)
w.resolver = app.MustComponent[idresolver.Resolver](a)
w.spaceService = app.MustComponent[space.Service](a)
w.infoBatch = mb.New(0)
w.infoBatch = mb.New[spaceindex.LinksUpdateInfo](0)
w.accumulatedBacklinks = make(map[string]*backLinksUpdate)
w.aggregationInterval = defaultAggregationInterval
return nil
}
func (w *watcher) Close(context.Context) error {
if w.cancelCtx != nil {
w.cancelCtx()
}
if err := w.infoBatch.Close(); err != nil {
log.Errorf("failed to close message batch: %v", err)
}
return nil
}
func (w *watcher) Run(context.Context) error {
func (w *watcher) Run(ctx context.Context) error {
w.ctx, w.cancelCtx = context.WithCancel(context.Background())
w.updater.SubscribeLinksUpdate(func(info spaceindex.LinksUpdateInfo) {
if err := w.infoBatch.Add(info); err != nil {
if err := w.infoBatch.Add(w.ctx, info); err != nil {
log.With("objectId", info.LinksFromId).Errorf("failed to add backlinks update info to message batch: %v", err)
}
})
@ -165,17 +171,16 @@ func (w *watcher) backlinksUpdateHandler() {
}()
for {
msgs := w.infoBatch.Wait()
msgs, err := w.infoBatch.Wait(w.ctx)
if err != nil {
return
}
if len(msgs) == 0 {
return
}
w.lock.Lock()
for _, msg := range msgs {
info, ok := msg.(spaceindex.LinksUpdateInfo)
if !ok {
continue
}
for _, info := range msgs {
info = cleanSelfLinks(info)
applyUpdates(w.accumulatedBacklinks, info)
}
@ -210,7 +215,7 @@ func (w *watcher) updateBackLinksInObject(id string, backlinksUpdate *backLinksU
log.With("objectId", id).Errorf("failed to resolve space id for object: %v", err)
return
}
spc, err := w.spaceService.Get(context.Background(), spaceId)
spc, err := w.spaceService.Get(w.ctx, spaceId)
if err != nil {
log.With("objectId", id, "spaceId", spaceId).Errorf("failed to get space: %v", err)
return

View file

@ -5,7 +5,7 @@ import (
"time"
"github.com/anyproto/any-sync/app/ocache"
"github.com/cheggaaa/mb"
"github.com/cheggaaa/mb/v3"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
@ -56,7 +56,7 @@ func newFixture(t *testing.T, aggregationInterval time.Duration) *fixture {
spaceService: spaceSvc,
aggregationInterval: aggregationInterval,
infoBatch: mb.New(0),
infoBatch: mb.New[spaceindex.LinksUpdateInfo](0),
accumulatedBacklinks: make(map[string]*backLinksUpdate),
}

View file

@ -417,54 +417,6 @@ func (_c *MockService_SetDetails_Call) RunAndReturn(run func(session.Context, st
return _c
}
// SetDetailsAndUpdateLastUsed provides a mock function with given fields: ctx, objectId, details
func (_m *MockService) SetDetailsAndUpdateLastUsed(ctx session.Context, objectId string, details []domain.Detail) error {
ret := _m.Called(ctx, objectId, details)
if len(ret) == 0 {
panic("no return value specified for SetDetailsAndUpdateLastUsed")
}
var r0 error
if rf, ok := ret.Get(0).(func(session.Context, string, []domain.Detail) error); ok {
r0 = rf(ctx, objectId, details)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockService_SetDetailsAndUpdateLastUsed_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetDetailsAndUpdateLastUsed'
type MockService_SetDetailsAndUpdateLastUsed_Call struct {
*mock.Call
}
// SetDetailsAndUpdateLastUsed is a helper method to define mock.On call
// - ctx session.Context
// - objectId string
// - details []domain.Detail
func (_e *MockService_Expecter) SetDetailsAndUpdateLastUsed(ctx interface{}, objectId interface{}, details interface{}) *MockService_SetDetailsAndUpdateLastUsed_Call {
return &MockService_SetDetailsAndUpdateLastUsed_Call{Call: _e.mock.On("SetDetailsAndUpdateLastUsed", ctx, objectId, details)}
}
func (_c *MockService_SetDetailsAndUpdateLastUsed_Call) Run(run func(ctx session.Context, objectId string, details []domain.Detail)) *MockService_SetDetailsAndUpdateLastUsed_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(session.Context), args[1].(string), args[2].([]domain.Detail))
})
return _c
}
func (_c *MockService_SetDetailsAndUpdateLastUsed_Call) Return(_a0 error) *MockService_SetDetailsAndUpdateLastUsed_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockService_SetDetailsAndUpdateLastUsed_Call) RunAndReturn(run func(session.Context, string, []domain.Detail) error) *MockService_SetDetailsAndUpdateLastUsed_Call {
_c.Call.Return(run)
return _c
}
// SetDetailsList provides a mock function with given fields: ctx, objectIds, details
func (_m *MockService) SetDetailsList(ctx session.Context, objectIds []string, details []domain.Detail) error {
ret := _m.Called(ctx, objectIds, details)

View file

@ -31,7 +31,6 @@ type Service interface {
app.Component
SetDetails(ctx session.Context, objectId string, details []domain.Detail) error
SetDetailsAndUpdateLastUsed(ctx session.Context, objectId string, details []domain.Detail) error
SetDetailsList(ctx session.Context, objectIds []string, details []domain.Detail) error
ModifyDetails(ctx session.Context, objectId string, modifier func(current *domain.Details) (*domain.Details, error)) error
ModifyDetailsList(req *pb.RpcObjectListModifyDetailValuesRequest) error
@ -81,23 +80,10 @@ func (s *service) SetDetails(ctx session.Context, objectId string, details []dom
})
}
func (s *service) SetDetailsAndUpdateLastUsed(ctx session.Context, objectId string, details []domain.Detail) (err error) {
return cache.Do(s.objectGetter, objectId, func(b basic.DetailsSettable) error {
return b.SetDetailsAndUpdateLastUsed(ctx, details, true)
})
}
func (s *service) SetDetailsList(ctx session.Context, objectIds []string, details []domain.Detail) (err error) {
var (
resultError error
anySucceed bool
)
for i, objectId := range objectIds {
setDetailsFunc := s.SetDetails
if i == 0 {
setDetailsFunc = s.SetDetailsAndUpdateLastUsed
}
err := setDetailsFunc(ctx, objectId, details)
func (s *service) SetDetailsList(ctx session.Context, objectIds []string, details []domain.Detail) (resultError error) {
var anySucceed bool
for _, objectId := range objectIds {
err := s.SetDetails(ctx, objectId, details)
if err != nil {
resultError = errors.Join(resultError, err)
} else {
@ -120,20 +106,10 @@ func (s *service) ModifyDetails(ctx session.Context, objectId string, modifier f
})
}
func (s *service) ModifyDetailsAndUpdateLastUsed(ctx session.Context, objectId string, modifier func(current *domain.Details) (*domain.Details, error)) (err error) {
return cache.Do(s.objectGetter, objectId, func(du basic.DetailsUpdatable) error {
return du.UpdateDetailsAndLastUsed(ctx, modifier)
})
}
func (s *service) ModifyDetailsList(req *pb.RpcObjectListModifyDetailValuesRequest) (resultError error) {
var anySucceed bool
for i, objectId := range req.ObjectIds {
modifyDetailsFunc := s.ModifyDetails
if i == 0 {
modifyDetailsFunc = s.ModifyDetailsAndUpdateLastUsed
}
err := modifyDetailsFunc(nil, objectId, func(current *domain.Details) (*domain.Details, error) {
for _, objectId := range req.ObjectIds {
err := s.ModifyDetails(nil, objectId, func(current *domain.Details) (*domain.Details, error) {
for _, op := range req.Operations {
if !pbtypes.IsNullValue(op.Set) {
// Set operation has higher priority than Add and Remove, because it modifies full value

View file

@ -79,7 +79,7 @@ func TestService_SetDetailsList(t *testing.T) {
{Key: bundle.RelationKeyLinkedProjects, Value: domain.StringList([]string{"important", "urgent"})},
}
t.Run("lastUsed is updated once", func(t *testing.T) {
t.Run("no error", func(t *testing.T) {
// given
fx := newFixture(t)
objects := map[string]*smarttest.SmartTest{
@ -98,16 +98,6 @@ func TestService_SetDetailsList(t *testing.T) {
// then
assert.NoError(t, err)
require.Len(t, objects["obj1"].Results.LastUsedUpdates, 3)
assert.Equal(t, []string{
bundle.RelationKeyAssignee.String(),
bundle.RelationKeyDone.String(),
bundle.RelationKeyLinkedProjects.String(),
}, objects["obj1"].Results.LastUsedUpdates)
// lastUsed should be updated only during the work under 1st object
assert.Len(t, objects["obj2"].Results.LastUsedUpdates, 0)
assert.Len(t, objects["obj3"].Results.LastUsedUpdates, 0)
assert.Equal(t, "Mark Twain", objects["obj1"].NewState().Details().GetString(bundle.RelationKeyAssignee))
assert.True(t, objects["obj2"].NewState().Details().GetBool(bundle.RelationKeyDone))
@ -153,7 +143,7 @@ func TestService_ModifyDetailsList(t *testing.T) {
{RelationKey: bundle.RelationKeyDone.String(), Set: domain.Bool(true).ToProto()},
}
t.Run("lastUsed is updated once", func(t *testing.T) {
t.Run("no error", func(t *testing.T) {
fx := newFixture(t)
objects := map[string]*smarttest.SmartTest{
"obj1": smarttest.New("obj1"),
@ -174,11 +164,6 @@ func TestService_ModifyDetailsList(t *testing.T) {
// then
assert.NoError(t, err)
require.Len(t, objects["obj1"].Results.LastUsedUpdates, 3)
// lastUsed should be updated only during the work under 1st object
assert.Len(t, objects["obj2"].Results.LastUsedUpdates, 0)
assert.Len(t, objects["obj3"].Results.LastUsedUpdates, 0)
})
t.Run("some updates failed", func(t *testing.T) {
@ -306,7 +291,7 @@ func TestService_SetWorkspaceDashboardId(t *testing.T) {
assert.Equal(t, wsObjectId, objectId)
ws := &editor.Workspaces{
SmartBlock: sb,
AllOperations: basic.NewBasic(sb, fx.store.SpaceIndex(spaceId), nil, nil, nil),
AllOperations: basic.NewBasic(sb, fx.store.SpaceIndex(spaceId), nil, nil),
}
return ws, nil
})
@ -329,7 +314,7 @@ func TestService_SetWorkspaceDashboardId(t *testing.T) {
assert.Equal(t, wsObjectId, objectId)
ws := &editor.Workspaces{
SmartBlock: sb,
AllOperations: basic.NewBasic(sb, fx.store.SpaceIndex(spaceId), nil, nil, nil),
AllOperations: basic.NewBasic(sb, fx.store.SpaceIndex(spaceId), nil, nil),
}
return ws, nil
})

View file

@ -18,7 +18,6 @@ import (
"github.com/anyproto/anytype-heart/core/block/editor/anystoredebug"
"github.com/anyproto/anytype-heart/core/block/editor/basic"
"github.com/anyproto/anytype-heart/core/block/editor/converter"
"github.com/anyproto/anytype-heart/core/block/editor/lastused"
"github.com/anyproto/anytype-heart/core/block/editor/smartblock"
"github.com/anyproto/anytype-heart/core/block/editor/state"
"github.com/anyproto/anytype-heart/core/block/editor/storestate"
@ -88,23 +87,18 @@ func (a *accountObject) SetDetails(ctx session.Context, details []domain.Detail,
return a.bs.SetDetails(ctx, details, showEvent)
}
func (a *accountObject) SetDetailsAndUpdateLastUsed(ctx session.Context, details []domain.Detail, showEvent bool) (err error) {
return a.bs.SetDetailsAndUpdateLastUsed(ctx, details, showEvent)
}
func New(
sb smartblock.SmartBlock,
keys *accountdata.AccountKeys,
spaceObjects spaceindex.Store,
layoutConverter converter.LayoutConverter,
fileObjectService fileobject.Service,
lastUsedUpdater lastused.ObjectUsageUpdater,
crdtDb anystore.DB,
cfg *config.Config) AccountObject {
return &accountObject{
crdtDb: crdtDb,
keys: keys,
bs: basic.NewBasic(sb, spaceObjects, layoutConverter, fileObjectService, lastUsedUpdater),
bs: basic.NewBasic(sb, spaceObjects, layoutConverter, fileObjectService),
SmartBlock: sb,
cfg: cfg,
relMapper: newRelationsMapper(map[string]KeyType{

View file

@ -55,7 +55,7 @@ func newFixture(t *testing.T, isNewAccount bool, prepareDb func(db anystore.DB))
indexStore := objectstore.NewStoreFixture(t).SpaceIndex("spaceId")
keys, err := accountdata.NewRandom()
require.NoError(t, err)
object := New(sb, keys, indexStore, nil, nil, nil, db, cfg)
object := New(sb, keys, indexStore, nil, nil, db, cfg)
fx := &fixture{
storeFx: objectstore.NewStoreFixture(t),
db: db,

View file

@ -6,7 +6,6 @@ import (
"github.com/samber/lo"
"github.com/anyproto/anytype-heart/core/block/editor/converter"
"github.com/anyproto/anytype-heart/core/block/editor/lastused"
"github.com/anyproto/anytype-heart/core/block/editor/smartblock"
"github.com/anyproto/anytype-heart/core/block/editor/state"
"github.com/anyproto/anytype-heart/core/block/editor/table"
@ -64,12 +63,10 @@ type CommonOperations interface {
type DetailsSettable interface {
SetDetails(ctx session.Context, details []domain.Detail, showEvent bool) (err error)
SetDetailsAndUpdateLastUsed(ctx session.Context, details []domain.Detail, showEvent bool) (err error)
}
type DetailsUpdatable interface {
UpdateDetails(ctx session.Context, update func(current *domain.Details) (*domain.Details, error)) (err error)
UpdateDetailsAndLastUsed(ctx session.Context, update func(current *domain.Details) (*domain.Details, error)) (err error)
}
type Restrictionable interface {
@ -105,14 +102,12 @@ func NewBasic(
objectStore spaceindex.Store,
layoutConverter converter.LayoutConverter,
fileObjectService fileobject.Service,
lastUsedUpdater lastused.ObjectUsageUpdater,
) AllOperations {
return &basic{
SmartBlock: sb,
objectStore: objectStore,
layoutConverter: layoutConverter,
fileObjectService: fileObjectService,
lastUsedUpdater: lastUsedUpdater,
}
}
@ -122,7 +117,6 @@ type basic struct {
objectStore spaceindex.Store
layoutConverter converter.LayoutConverter
fileObjectService fileobject.Service
lastUsedUpdater lastused.ObjectUsageUpdater
}
func (bs *basic) CreateBlock(s *state.State, req pb.RpcBlockCreateRequest) (id string, err error) {

View file

@ -45,7 +45,7 @@ func TestBasic_Create(t *testing.T) {
t.Run("generic", func(t *testing.T) {
sb := smarttest.New("test")
sb.AddBlock(simple.New(&model.Block{Id: "test"}))
b := NewBasic(sb, nil, converter.NewLayoutConverter(), nil, nil)
b := NewBasic(sb, nil, converter.NewLayoutConverter(), nil)
st := sb.NewState()
id, err := b.CreateBlock(st, pb.RpcBlockCreateRequest{
Block: &model.Block{Content: &model.BlockContentOfText{Text: &model.BlockContentText{Text: "ll"}}},
@ -59,7 +59,7 @@ func TestBasic_Create(t *testing.T) {
sb := smarttest.New("test")
sb.AddBlock(simple.New(&model.Block{Id: "test"}))
require.NoError(t, smartblock.ObjectApplyTemplate(sb, sb.NewState(), template.WithTitle))
b := NewBasic(sb, nil, converter.NewLayoutConverter(), nil, nil)
b := NewBasic(sb, nil, converter.NewLayoutConverter(), nil)
s := sb.NewState()
id, err := b.CreateBlock(s, pb.RpcBlockCreateRequest{
TargetId: template.TitleBlockId,
@ -80,7 +80,7 @@ func TestBasic_Create(t *testing.T) {
}
sb.AddBlock(simple.New(&model.Block{Id: "test"}))
require.NoError(t, smartblock.ObjectApplyTemplate(sb, sb.NewState(), template.WithTitle))
b := NewBasic(sb, nil, converter.NewLayoutConverter(), nil, nil)
b := NewBasic(sb, nil, converter.NewLayoutConverter(), nil)
_, err := b.CreateBlock(sb.NewState(), pb.RpcBlockCreateRequest{})
assert.ErrorIs(t, err, restriction.ErrRestricted)
})
@ -94,7 +94,7 @@ func TestBasic_Duplicate(t *testing.T) {
AddBlock(simple.New(&model.Block{Id: "3"}))
st := sb.NewState()
newIds, err := NewBasic(sb, nil, converter.NewLayoutConverter(), nil, nil).Duplicate(st, st, "", 0, []string{"2"})
newIds, err := NewBasic(sb, nil, converter.NewLayoutConverter(), nil).Duplicate(st, st, "", 0, []string{"2"})
require.NoError(t, err)
err = sb.Apply(st)
@ -172,7 +172,7 @@ func TestBasic_Duplicate(t *testing.T) {
ts.SetDetail(bundle.RelationKeySpaceId, domain.String(tc.spaceIds[1]))
// when
newIds, err := NewBasic(source, nil, nil, tc.fos(), nil).Duplicate(ss, ts, "target", model.Block_Inner, []string{"1", "f1"})
newIds, err := NewBasic(source, nil, nil, tc.fos()).Duplicate(ss, ts, "target", model.Block_Inner, []string{"1", "f1"})
require.NoError(t, err)
require.NoError(t, target.Apply(ts))
@ -206,7 +206,7 @@ func TestBasic_Unlink(t *testing.T) {
AddBlock(simple.New(&model.Block{Id: "2", ChildrenIds: []string{"3"}})).
AddBlock(simple.New(&model.Block{Id: "3"}))
b := NewBasic(sb, nil, converter.NewLayoutConverter(), nil, nil)
b := NewBasic(sb, nil, converter.NewLayoutConverter(), nil)
err := b.Unlink(nil, "2")
require.NoError(t, err)
@ -220,7 +220,7 @@ func TestBasic_Unlink(t *testing.T) {
AddBlock(simple.New(&model.Block{Id: "2", ChildrenIds: []string{"3"}})).
AddBlock(simple.New(&model.Block{Id: "3"}))
b := NewBasic(sb, nil, converter.NewLayoutConverter(), nil, nil)
b := NewBasic(sb, nil, converter.NewLayoutConverter(), nil)
err := b.Unlink(nil, "2", "3")
require.NoError(t, err)
@ -237,7 +237,7 @@ func TestBasic_Move(t *testing.T) {
AddBlock(simple.New(&model.Block{Id: "3"})).
AddBlock(simple.New(&model.Block{Id: "4"}))
b := NewBasic(sb, nil, converter.NewLayoutConverter(), nil, nil)
b := NewBasic(sb, nil, converter.NewLayoutConverter(), nil)
st := sb.NewState()
err := b.Move(st, st, "4", model.Block_Inner, []string{"3"})
@ -251,7 +251,7 @@ func TestBasic_Move(t *testing.T) {
sb := smarttest.New("test")
sb.AddBlock(simple.New(&model.Block{Id: "test"}))
require.NoError(t, smartblock.ObjectApplyTemplate(sb, sb.NewState(), template.WithTitle))
b := NewBasic(sb, nil, converter.NewLayoutConverter(), nil, nil)
b := NewBasic(sb, nil, converter.NewLayoutConverter(), nil)
s := sb.NewState()
id1, err := b.CreateBlock(s, pb.RpcBlockCreateRequest{
TargetId: template.HeaderLayoutId,
@ -300,7 +300,7 @@ func TestBasic_Move(t *testing.T) {
},
),
)
basic := NewBasic(testDoc, nil, converter.NewLayoutConverter(), nil, nil)
basic := NewBasic(testDoc, nil, converter.NewLayoutConverter(), nil)
state := testDoc.NewState()
// when
@ -316,7 +316,7 @@ func TestBasic_Move(t *testing.T) {
AddBlock(newTextBlock("1", "", nil)).
AddBlock(newTextBlock("2", "one", nil))
b := NewBasic(sb, nil, converter.NewLayoutConverter(), nil, nil)
b := NewBasic(sb, nil, converter.NewLayoutConverter(), nil)
st := sb.NewState()
err := b.Move(st, st, "1", model.Block_InnerFirst, []string{"2"})
require.NoError(t, err)
@ -336,7 +336,7 @@ func TestBasic_Move(t *testing.T) {
AddBlock(firstBlock).
AddBlock(secondBlock)
b := NewBasic(sb, nil, converter.NewLayoutConverter(), nil, nil)
b := NewBasic(sb, nil, converter.NewLayoutConverter(), nil)
st := sb.NewState()
err := b.Move(st, st, "1", model.Block_InnerFirst, []string{"2"})
require.NoError(t, err)
@ -350,7 +350,7 @@ func TestBasic_Move(t *testing.T) {
AddBlock(newTextBlock("1", "", nil)).
AddBlock(newTextBlock("2", "one", nil))
b := NewBasic(sb, nil, converter.NewLayoutConverter(), nil, nil)
b := NewBasic(sb, nil, converter.NewLayoutConverter(), nil)
st := sb.NewState()
err := b.Move(st, nil, "1", model.Block_Top, []string{"2"})
require.NoError(t, err)
@ -379,7 +379,7 @@ func TestBasic_MoveTableBlocks(t *testing.T) {
t.Run("moving non-root table block '"+block+"' leads to error", func(t *testing.T) {
// given
sb := getSB()
b := NewBasic(sb, nil, converter.NewLayoutConverter(), nil, nil)
b := NewBasic(sb, nil, converter.NewLayoutConverter(), nil)
st := sb.NewState()
// when
@ -394,7 +394,7 @@ func TestBasic_MoveTableBlocks(t *testing.T) {
t.Run("no error on moving root table block", func(t *testing.T) {
// given
sb := getSB()
b := NewBasic(sb, nil, converter.NewLayoutConverter(), nil, nil)
b := NewBasic(sb, nil, converter.NewLayoutConverter(), nil)
st := sb.NewState()
// when
@ -408,7 +408,7 @@ func TestBasic_MoveTableBlocks(t *testing.T) {
t.Run("no error on moving one row between another", func(t *testing.T) {
// given
sb := getSB()
b := NewBasic(sb, nil, converter.NewLayoutConverter(), nil, nil)
b := NewBasic(sb, nil, converter.NewLayoutConverter(), nil)
st := sb.NewState()
// when
@ -422,7 +422,7 @@ func TestBasic_MoveTableBlocks(t *testing.T) {
t.Run("moving rows with incorrect position leads to error", func(t *testing.T) {
// given
sb := getSB()
b := NewBasic(sb, nil, converter.NewLayoutConverter(), nil, nil)
b := NewBasic(sb, nil, converter.NewLayoutConverter(), nil)
st := sb.NewState()
// when
@ -435,7 +435,7 @@ func TestBasic_MoveTableBlocks(t *testing.T) {
t.Run("moving rows and some other blocks between another leads to error", func(t *testing.T) {
// given
sb := getSB()
b := NewBasic(sb, nil, converter.NewLayoutConverter(), nil, nil)
b := NewBasic(sb, nil, converter.NewLayoutConverter(), nil)
st := sb.NewState()
// when
@ -448,7 +448,7 @@ func TestBasic_MoveTableBlocks(t *testing.T) {
t.Run("moving the row between itself leads to error", func(t *testing.T) {
// given
sb := getSB()
b := NewBasic(sb, nil, converter.NewLayoutConverter(), nil, nil)
b := NewBasic(sb, nil, converter.NewLayoutConverter(), nil)
st := sb.NewState()
// when
@ -461,7 +461,7 @@ func TestBasic_MoveTableBlocks(t *testing.T) {
t.Run("moving table block from invalid table leads to error", func(t *testing.T) {
// given
sb := getSB()
b := NewBasic(sb, nil, converter.NewLayoutConverter(), nil, nil)
b := NewBasic(sb, nil, converter.NewLayoutConverter(), nil)
st := sb.NewState()
st.Unlink("columns")
@ -477,7 +477,7 @@ func TestBasic_MoveTableBlocks(t *testing.T) {
t.Run("moving a block to '"+block+"' block leads to moving it under the table", func(t *testing.T) {
// given
sb := getSB()
b := NewBasic(sb, nil, converter.NewLayoutConverter(), nil, nil)
b := NewBasic(sb, nil, converter.NewLayoutConverter(), nil)
st := sb.NewState()
// when
@ -492,7 +492,7 @@ func TestBasic_MoveTableBlocks(t *testing.T) {
t.Run("moving a block to the invalid table leads to moving it under the table", func(t *testing.T) {
// given
sb := getSB()
b := NewBasic(sb, nil, converter.NewLayoutConverter(), nil, nil)
b := NewBasic(sb, nil, converter.NewLayoutConverter(), nil)
st := sb.NewState()
st.Unlink("columns")
@ -516,7 +516,7 @@ func TestBasic_MoveToAnotherObject(t *testing.T) {
sb2 := smarttest.New("test2")
sb2.AddBlock(simple.New(&model.Block{Id: "test2", ChildrenIds: []string{}}))
b := NewBasic(sb1, nil, converter.NewLayoutConverter(), nil, nil)
b := NewBasic(sb1, nil, converter.NewLayoutConverter(), nil)
srcState := sb1.NewState()
destState := sb2.NewState()
@ -551,7 +551,7 @@ func TestBasic_Replace(t *testing.T) {
sb := smarttest.New("test")
sb.AddBlock(simple.New(&model.Block{Id: "test", ChildrenIds: []string{"2"}})).
AddBlock(simple.New(&model.Block{Id: "2"}))
b := NewBasic(sb, nil, converter.NewLayoutConverter(), nil, nil)
b := NewBasic(sb, nil, converter.NewLayoutConverter(), nil)
newId, err := b.Replace(nil, "2", &model.Block{Content: &model.BlockContentOfText{Text: &model.BlockContentText{Text: "l"}}})
require.NoError(t, err)
require.NotEmpty(t, newId)
@ -561,7 +561,7 @@ func TestBasic_SetFields(t *testing.T) {
sb := smarttest.New("test")
sb.AddBlock(simple.New(&model.Block{Id: "test", ChildrenIds: []string{"2"}})).
AddBlock(simple.New(&model.Block{Id: "2"}))
b := NewBasic(sb, nil, converter.NewLayoutConverter(), nil, nil)
b := NewBasic(sb, nil, converter.NewLayoutConverter(), nil)
fields := &types.Struct{
Fields: map[string]*types.Value{
@ -580,7 +580,7 @@ func TestBasic_Update(t *testing.T) {
sb := smarttest.New("test")
sb.AddBlock(simple.New(&model.Block{Id: "test", ChildrenIds: []string{"2"}})).
AddBlock(simple.New(&model.Block{Id: "2"}))
b := NewBasic(sb, nil, converter.NewLayoutConverter(), nil, nil)
b := NewBasic(sb, nil, converter.NewLayoutConverter(), nil)
err := b.Update(nil, func(b simple.Block) error {
b.Model().BackgroundColor = "test"
@ -594,7 +594,7 @@ func TestBasic_SetDivStyle(t *testing.T) {
sb := smarttest.New("test")
sb.AddBlock(simple.New(&model.Block{Id: "test", ChildrenIds: []string{"2"}})).
AddBlock(simple.New(&model.Block{Id: "2", Content: &model.BlockContentOfDiv{Div: &model.BlockContentDiv{}}}))
b := NewBasic(sb, nil, converter.NewLayoutConverter(), nil, nil)
b := NewBasic(sb, nil, converter.NewLayoutConverter(), nil)
err := b.SetDivStyle(nil, model.BlockContentDiv_Dots, "2")
require.NoError(t, err)
@ -614,7 +614,7 @@ func TestBasic_SetRelationKey(t *testing.T) {
t.Run("correct", func(t *testing.T) {
sb := smarttest.New("test")
fillSb(sb)
b := NewBasic(sb, nil, converter.NewLayoutConverter(), nil, nil)
b := NewBasic(sb, nil, converter.NewLayoutConverter(), nil)
err := b.SetRelationKey(nil, pb.RpcBlockRelationSetKeyRequest{
BlockId: "2",
Key: "testRelKey",
@ -636,7 +636,7 @@ func TestBasic_SetRelationKey(t *testing.T) {
t.Run("not relation block", func(t *testing.T) {
sb := smarttest.New("test")
fillSb(sb)
b := NewBasic(sb, nil, converter.NewLayoutConverter(), nil, nil)
b := NewBasic(sb, nil, converter.NewLayoutConverter(), nil)
require.Error(t, b.SetRelationKey(nil, pb.RpcBlockRelationSetKeyRequest{
BlockId: "1",
Key: "key",
@ -645,7 +645,7 @@ func TestBasic_SetRelationKey(t *testing.T) {
t.Run("relation not found", func(t *testing.T) {
sb := smarttest.New("test")
fillSb(sb)
b := NewBasic(sb, nil, converter.NewLayoutConverter(), nil, nil)
b := NewBasic(sb, nil, converter.NewLayoutConverter(), nil)
require.Error(t, b.SetRelationKey(nil, pb.RpcBlockRelationSetKeyRequest{
BlockId: "2",
Key: "not exists",
@ -661,7 +661,7 @@ func TestBasic_FeaturedRelationAdd(t *testing.T) {
s.AddBundledRelationLinks(bundle.RelationKeyDescription)
require.NoError(t, sb.Apply(s))
b := NewBasic(sb, nil, converter.NewLayoutConverter(), nil, nil)
b := NewBasic(sb, nil, converter.NewLayoutConverter(), nil)
newRel := []string{bundle.RelationKeyDescription.String(), bundle.RelationKeyName.String()}
require.NoError(t, b.FeaturedRelationAdd(nil, newRel...))
@ -677,7 +677,7 @@ func TestBasic_FeaturedRelationRemove(t *testing.T) {
template.WithDescription(s)
require.NoError(t, sb.Apply(s))
b := NewBasic(sb, nil, converter.NewLayoutConverter(), nil, nil)
b := NewBasic(sb, nil, converter.NewLayoutConverter(), nil)
require.NoError(t, b.FeaturedRelationRemove(nil, bundle.RelationKeyDescription.String()))
res := sb.NewState()
@ -714,7 +714,7 @@ func TestBasic_ReplaceLink(t *testing.T) {
}
require.NoError(t, sb.Apply(s))
b := NewBasic(sb, nil, converter.NewLayoutConverter(), nil, nil)
b := NewBasic(sb, nil, converter.NewLayoutConverter(), nil)
require.NoError(t, b.ReplaceLink(oldId, newId))
res := sb.NewState()

View file

@ -4,7 +4,6 @@ import (
"errors"
"fmt"
"strings"
"time"
"github.com/anyproto/anytype-heart/core/block/editor/smartblock"
"github.com/anyproto/anytype-heart/core/block/editor/state"
@ -28,19 +27,6 @@ func (bs *basic) SetDetails(ctx session.Context, details []domain.Detail, showEv
return err
}
func (bs *basic) SetDetailsAndUpdateLastUsed(ctx session.Context, details []domain.Detail, showEvent bool) (err error) {
var keys []domain.RelationKey
keys, err = bs.setDetails(ctx, details, showEvent)
if err != nil {
return err
}
ts := time.Now().Unix()
for _, key := range keys {
bs.lastUsedUpdater.UpdateLastUsedDate(bs.SpaceID(), key, ts)
}
return nil
}
func (bs *basic) setDetails(ctx session.Context, details []domain.Detail, showEvent bool) (updatedKeys []domain.RelationKey, err error) {
s := bs.NewStateCtx(ctx)
@ -67,23 +53,6 @@ func (bs *basic) UpdateDetails(ctx session.Context, update func(current *domain.
return err
}
func (bs *basic) UpdateDetailsAndLastUsed(ctx session.Context, update func(current *domain.Details) (*domain.Details, error)) error {
oldDetails, newDetails, err := bs.updateDetails(ctx, update)
if err != nil {
return err
}
diff, _ := domain.StructDiff(oldDetails, newDetails)
if diff.Len() == 0 {
return nil
}
ts := time.Now().Unix()
for _, key := range diff.Keys() {
bs.lastUsedUpdater.UpdateLastUsedDate(bs.SpaceID(), key, ts)
}
return nil
}
func (bs *basic) updateDetails(ctx session.Context, update func(current *domain.Details) (*domain.Details, error)) (oldDetails *domain.Details, newDetails *domain.Details, err error) {
if update == nil {
return nil, nil, fmt.Errorf("update function is nil")
@ -180,7 +149,7 @@ func (bs *basic) validateDetailFormat(spaceID string, key domain.RelationKey, v
}
return nil
case model.RelationFormat_status:
vals, ok := v.TryStringList()
vals, ok := v.TryWrapToStringList()
if !ok {
return fmt.Errorf("incorrect type: %v instead of string list", v)
}
@ -190,7 +159,7 @@ func (bs *basic) validateDetailFormat(spaceID string, key domain.RelationKey, v
return bs.validateOptions(r, vals)
case model.RelationFormat_tag:
vals, ok := v.TryStringList()
vals, ok := v.TryWrapToStringList()
if !ok {
return fmt.Errorf("incorrect type: %v instead of string list", v)
}
@ -378,10 +347,6 @@ func (bs *basic) SetObjectTypesInState(s *state.State, objectTypeKeys []domain.T
s.SetObjectTypeKeys(objectTypeKeys)
removeInternalFlags(s)
if bs.CombinedDetails().GetInt64(bundle.RelationKeyOrigin) == int64(model.ObjectOrigin_none) {
bs.lastUsedUpdater.UpdateLastUsedDate(bs.SpaceID(), objectTypeKeys[0], time.Now().Unix())
}
toLayout, err := bs.getLayoutForType(objectTypeKeys[0])
if err != nil {
return fmt.Errorf("get layout for type %s: %w", objectTypeKeys[0], err)

View file

@ -4,10 +4,8 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/anyproto/anytype-heart/core/block/editor/converter"
"github.com/anyproto/anytype-heart/core/block/editor/lastused/mock_lastused"
"github.com/anyproto/anytype-heart/core/block/editor/smartblock/smarttest"
"github.com/anyproto/anytype-heart/core/block/restriction"
"github.com/anyproto/anytype-heart/core/domain"
@ -18,10 +16,9 @@ import (
)
type basicFixture struct {
sb *smarttest.SmartTest
store *spaceindex.StoreFixture
lastUsed *mock_lastused.MockObjectUsageUpdater
basic CommonOperations
sb *smarttest.SmartTest
store *spaceindex.StoreFixture
basic CommonOperations
}
var (
@ -35,15 +32,13 @@ func newBasicFixture(t *testing.T) *basicFixture {
sb.SetSpaceId(spaceId)
store := spaceindex.NewStoreFixture(t)
lastUsed := mock_lastused.NewMockObjectUsageUpdater(t)
b := NewBasic(sb, store, converter.NewLayoutConverter(), nil, lastUsed)
b := NewBasic(sb, store, converter.NewLayoutConverter(), nil)
return &basicFixture{
sb: sb,
store: store,
lastUsed: lastUsed,
basic: b,
sb: sb,
store: store,
basic: b,
}
}
@ -149,7 +144,6 @@ func TestBasic_SetObjectTypesInState(t *testing.T) {
// given
f := newBasicFixture(t)
f.lastUsed.EXPECT().UpdateLastUsedDate(mock.Anything, bundle.TypeKeyTask, mock.Anything).Return().Once()
f.store.AddObjects(t, []objectstore.TestObject{{
bundle.RelationKeySpaceId: domain.String(spaceId),
bundle.RelationKeyId: domain.String("ot-task"),

View file

@ -319,7 +319,7 @@ func TestExtractObjects(t *testing.T) {
ObjectTypeUniqueKey: domain.MustUniqueKey(coresb.SmartBlockTypeObjectType, tc.typeKey).Marshal(),
}
ctx := session.NewContext()
linkIds, err := NewBasic(sb, fixture.store, converter.NewLayoutConverter(), nil, nil).ExtractBlocksToObjects(ctx, creator, ts, req)
linkIds, err := NewBasic(sb, fixture.store, converter.NewLayoutConverter(), nil).ExtractBlocksToObjects(ctx, creator, ts, req)
assert.NoError(t, err)
gotBlockIds := []string{}
@ -374,7 +374,7 @@ func TestExtractObjects(t *testing.T) {
}},
}
ctx := session.NewContext()
_, err := NewBasic(sb, fixture.store, converter.NewLayoutConverter(), nil, nil).ExtractBlocksToObjects(ctx, creator, ts, req)
_, err := NewBasic(sb, fixture.store, converter.NewLayoutConverter(), nil).ExtractBlocksToObjects(ctx, creator, ts, req)
assert.NoError(t, err)
var block *model.Block
for _, block = range sb.Blocks() {
@ -407,7 +407,7 @@ func TestExtractObjects(t *testing.T) {
}},
}
ctx := session.NewContext()
_, err := NewBasic(sb, fixture.store, converter.NewLayoutConverter(), nil, nil).ExtractBlocksToObjects(ctx, creator, ts, req)
_, err := NewBasic(sb, fixture.store, converter.NewLayoutConverter(), nil).ExtractBlocksToObjects(ctx, creator, ts, req)
assert.NoError(t, err)
var addedBlocks []*model.Block
for _, message := range sb.Results.Events {

View file

@ -35,7 +35,7 @@ type Dashboard struct {
func NewDashboard(sb smartblock.SmartBlock, objectStore spaceindex.Store, layoutConverter converter.LayoutConverter) *Dashboard {
return &Dashboard{
SmartBlock: sb,
AllOperations: basic.NewBasic(sb, objectStore, layoutConverter, nil, nil),
AllOperations: basic.NewBasic(sb, objectStore, layoutConverter, nil),
Collection: collection.NewCollection(sb, objectStore),
objectStore: objectStore,
}

View file

@ -14,7 +14,6 @@ import (
"github.com/anyproto/anytype-heart/core/block/editor/chatobject"
"github.com/anyproto/anytype-heart/core/block/editor/converter"
"github.com/anyproto/anytype-heart/core/block/editor/file"
"github.com/anyproto/anytype-heart/core/block/editor/lastused"
"github.com/anyproto/anytype-heart/core/block/editor/smartblock"
"github.com/anyproto/anytype-heart/core/block/migration"
"github.com/anyproto/anytype-heart/core/block/object/idresolver"
@ -74,7 +73,6 @@ type ObjectFactory struct {
fileReconciler reconciler.Reconciler
objectDeleter ObjectDeleter
deviceService deviceService
lastUsedUpdater lastused.ObjectUsageUpdater
spaceIdResolver idresolver.Resolver
}
@ -107,7 +105,6 @@ func (f *ObjectFactory) Init(a *app.App) (err error) {
f.objectDeleter = app.MustComponent[ObjectDeleter](a)
f.fileReconciler = app.MustComponent[reconciler.Reconciler](a)
f.deviceService = app.MustComponent[deviceService](a)
f.lastUsedUpdater = app.MustComponent[lastused.ObjectUsageUpdater](a)
f.spaceIdResolver = app.MustComponent[idresolver.Resolver](a)
return nil
}
@ -215,7 +212,7 @@ func (f *ObjectFactory) New(space smartblock.Space, sbType coresb.SmartBlockType
case coresb.SmartBlockTypeChatDerivedObject:
return chatobject.New(sb, f.accountService, f.eventSender, f.objectStore.GetCrdtDb(space.Id()), spaceIndex), nil
case coresb.SmartBlockTypeAccountObject:
return accountobject.New(sb, f.accountService.Keys(), spaceIndex, f.layoutConverter, f.fileObjectService, f.lastUsedUpdater, f.objectStore.GetCrdtDb(space.Id()), f.config), nil
return accountobject.New(sb, f.accountService.Keys(), spaceIndex, f.layoutConverter, f.fileObjectService, f.objectStore.GetCrdtDb(space.Id()), f.config), nil
default:
return nil, fmt.Errorf("unexpected smartblock type: %v", sbType)
}

View file

@ -28,7 +28,7 @@ var fileRequiredRelations = append(pageRequiredRelations, []domain.RelationKey{
func (f *ObjectFactory) newFile(spaceId string, sb smartblock.SmartBlock) *File {
store := f.objectStore.SpaceIndex(spaceId)
basicComponent := basic.NewBasic(sb, store, f.layoutConverter, f.fileObjectService, f.lastUsedUpdater)
basicComponent := basic.NewBasic(sb, store, f.layoutConverter, f.fileObjectService)
return &File{
SmartBlock: sb,
ChangeReceiver: sb.(source.ChangeReceiver),

View file

@ -1,191 +1,15 @@
package lastused
import (
"context"
"fmt"
"strings"
"sync"
"time"
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/app/logger"
"github.com/cheggaaa/mb/v3"
"go.uber.org/zap"
"github.com/anyproto/anytype-heart/core/block/editor/smartblock"
"github.com/anyproto/anytype-heart/core/domain"
"github.com/anyproto/anytype-heart/pkg/lib/bundle"
coresb "github.com/anyproto/anytype-heart/pkg/lib/core/smartblock"
"github.com/anyproto/anytype-heart/pkg/lib/localstore/addr"
"github.com/anyproto/anytype-heart/pkg/lib/localstore/objectstore"
"github.com/anyproto/anytype-heart/space"
"github.com/anyproto/anytype-heart/space/clientspace"
)
const (
CName = "object-usage-updater"
maxInstallationTime = 5 * time.Minute
updateInterval = 5 * time.Second
)
type Key interface {
URL() string
String() string
}
type message struct {
spaceId string
key Key
time int64
}
var log = logger.NewNamed("update-last-used-date")
type ObjectUsageUpdater interface {
app.ComponentRunnable
UpdateLastUsedDate(spaceId string, key Key, timeStamp int64)
}
func New() ObjectUsageUpdater {
return &updater{}
}
type updater struct {
store objectstore.ObjectStore
spaceService space.Service
ctx context.Context
cancel context.CancelFunc
msgBatch *mb.MB[message]
}
func (u *updater) Name() string {
return CName
}
func (u *updater) Init(a *app.App) error {
u.store = app.MustComponent[objectstore.ObjectStore](a)
u.spaceService = app.MustComponent[space.Service](a)
u.msgBatch = mb.New[message](0)
return nil
}
func (u *updater) Run(context.Context) error {
u.ctx, u.cancel = context.WithCancel(context.Background())
go u.lastUsedUpdateHandler()
return nil
}
func (u *updater) Close(context.Context) error {
if u.cancel != nil {
u.cancel()
}
if err := u.msgBatch.Close(); err != nil {
log.Error("failed to close message batch", zap.Error(err))
}
return nil
}
func (u *updater) UpdateLastUsedDate(spaceId string, key Key, ts int64) {
if err := u.msgBatch.Add(u.ctx, message{spaceId: spaceId, key: key, time: ts}); err != nil {
log.Error("failed to add last used date info to message batch", zap.Error(err), zap.String("key", key.String()))
}
}
func (u *updater) lastUsedUpdateHandler() {
var (
accumulator = make(map[string]map[Key]int64)
lock sync.Mutex
)
go func() {
for {
select {
case <-u.ctx.Done():
return
case <-time.After(updateInterval):
lock.Lock()
if len(accumulator) == 0 {
lock.Unlock()
continue
}
for spaceId, keys := range accumulator {
log.Debug("updating lastUsedDate for objects in space", zap.Int("objects num", len(keys)), zap.String("spaceId", spaceId))
u.updateLastUsedDateForKeysInSpace(spaceId, keys)
}
accumulator = make(map[string]map[Key]int64)
lock.Unlock()
}
}
}()
for {
msgs, err := u.msgBatch.Wait(u.ctx)
if err != nil {
return
}
lock.Lock()
for _, msg := range msgs {
if keys := accumulator[msg.spaceId]; keys != nil {
keys[msg.key] = msg.time
} else {
keys = map[Key]int64{
msg.key: msg.time,
}
accumulator[msg.spaceId] = keys
}
}
lock.Unlock()
}
}
func (u *updater) updateLastUsedDateForKeysInSpace(spaceId string, keys map[Key]int64) {
spc, err := u.spaceService.Get(u.ctx, spaceId)
if err != nil {
log.Error("failed to get space", zap.String("spaceId", spaceId), zap.Error(err))
return
}
for key, timeStamp := range keys {
if err = u.updateLastUsedDate(spc, key, timeStamp); err != nil {
log.Error("failed to update last used date", zap.String("spaceId", spaceId), zap.String("key", key.String()), zap.Error(err))
}
}
}
func (u *updater) updateLastUsedDate(spc clientspace.Space, key Key, ts int64) error {
uk, err := domain.UnmarshalUniqueKey(key.URL())
if err != nil {
return fmt.Errorf("failed to unmarshall key: %w", err)
}
if uk.SmartblockType() != coresb.SmartBlockTypeObjectType && uk.SmartblockType() != coresb.SmartBlockTypeRelation {
return fmt.Errorf("cannot update lastUsedDate for object with invalid smartBlock type. Only object types and relations are expected")
}
details, err := u.store.SpaceIndex(spc.Id()).GetObjectByUniqueKey(uk)
if err != nil {
return fmt.Errorf("failed to get details: %w", err)
}
id := details.GetString(bundle.RelationKeyId)
if id == "" {
return fmt.Errorf("failed to get id from details: %w", err)
}
if err = spc.DoCtx(u.ctx, id, func(sb smartblock.SmartBlock) error {
st := sb.NewState()
st.SetLocalDetail(bundle.RelationKeyLastUsedDate, domain.Int64(ts))
return sb.Apply(st)
}); err != nil {
return fmt.Errorf("failed to set lastUsedDate to object: %w", err)
}
return nil
}
const maxInstallationTime = 5 * time.Minute
func SetLastUsedDateForInitialObjectType(id string, details *domain.Details) {
if !strings.HasPrefix(id, addr.BundledObjectTypeURLPrefix) || details == nil {

View file

@ -1,22 +1,13 @@
package lastused
import (
"context"
"math/rand"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/anyproto/anytype-heart/core/block/editor/smartblock"
"github.com/anyproto/anytype-heart/core/block/editor/smartblock/smarttest"
"github.com/anyproto/anytype-heart/core/domain"
"github.com/anyproto/anytype-heart/pkg/lib/bundle"
"github.com/anyproto/anytype-heart/pkg/lib/localstore/objectstore"
"github.com/anyproto/anytype-heart/space/clientspace"
"github.com/anyproto/anytype-heart/space/clientspace/mock_clientspace"
)
func TestSetLastUsedDateForInitialType(t *testing.T) {
@ -56,82 +47,3 @@ func TestSetLastUsedDateForInitialType(t *testing.T) {
assert.True(t, isLastUsedDateGreater(detailMap[bundle.TypeKeyCollection.BundledURL()], detailMap[bundle.TypeKeyDiaryEntry.BundledURL()]))
})
}
func TestUpdateLastUsedDate(t *testing.T) {
const spaceId = "space"
ts := time.Now().Unix()
isLastUsedDateRecent := func(details *domain.Details, deltaSeconds int64) bool {
return details.GetInt64(bundle.RelationKeyLastUsedDate)+deltaSeconds > time.Now().Unix()
}
store := objectstore.NewStoreFixture(t)
store.AddObjects(t, spaceId, []objectstore.TestObject{
{
bundle.RelationKeyId: domain.String(bundle.RelationKeyCamera.URL()),
bundle.RelationKeySpaceId: domain.String(spaceId),
bundle.RelationKeyUniqueKey: domain.String(bundle.RelationKeyCamera.URL()),
},
{
bundle.RelationKeyId: domain.String(bundle.TypeKeyDiaryEntry.URL()),
bundle.RelationKeySpaceId: domain.String(spaceId),
bundle.RelationKeyUniqueKey: domain.String(bundle.TypeKeyDiaryEntry.URL()),
},
{
bundle.RelationKeyId: domain.String("rel-custom"),
bundle.RelationKeySpaceId: domain.String(spaceId),
bundle.RelationKeyUniqueKey: domain.String("rel-custom"),
},
{
bundle.RelationKeyId: domain.String("opt-done"),
bundle.RelationKeySpaceId: domain.String(spaceId),
bundle.RelationKeyUniqueKey: domain.String("opt-done"),
},
})
u := updater{store: store}
getSpace := func() clientspace.Space {
spc := mock_clientspace.NewMockSpace(t)
spc.EXPECT().Id().Return(spaceId)
spc.EXPECT().DoCtx(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(_ context.Context, id string, apply func(smartblock.SmartBlock) error) error {
sb := smarttest.New(id)
err := apply(sb)
require.NoError(t, err)
assert.True(t, isLastUsedDateRecent(sb.LocalDetails(), 5))
return nil
})
return spc
}
for _, tc := range []struct {
name string
key Key
getSpace func() clientspace.Space
isErrorExpected bool
}{
{"built-in relation", bundle.RelationKeyCamera, getSpace, false},
{"built-in type", bundle.TypeKeyDiaryEntry, getSpace, false},
{"custom relation", domain.RelationKey("custom"), getSpace, false},
{"option", domain.TypeKey("opt-done"), func() clientspace.Space {
spc := mock_clientspace.NewMockSpace(t)
return spc
}, true},
{"type that is not in store", bundle.TypeKeyAudio, func() clientspace.Space {
spc := mock_clientspace.NewMockSpace(t)
spc.EXPECT().Id().Return(spaceId)
return spc
}, true},
} {
t.Run("update lastUsedDate of "+tc.name, func(t *testing.T) {
err := u.updateLastUsedDate(tc.getSpace(), tc.key, ts)
if tc.isErrorExpected {
assert.Error(t, err)
} else {
assert.NoError(t, err)
}
})
}
}

View file

@ -1,258 +0,0 @@
// Code generated by mockery. DO NOT EDIT.
package mock_lastused
import (
context "context"
app "github.com/anyproto/any-sync/app"
lastused "github.com/anyproto/anytype-heart/core/block/editor/lastused"
mock "github.com/stretchr/testify/mock"
)
// MockObjectUsageUpdater is an autogenerated mock type for the ObjectUsageUpdater type
type MockObjectUsageUpdater struct {
mock.Mock
}
type MockObjectUsageUpdater_Expecter struct {
mock *mock.Mock
}
func (_m *MockObjectUsageUpdater) EXPECT() *MockObjectUsageUpdater_Expecter {
return &MockObjectUsageUpdater_Expecter{mock: &_m.Mock}
}
// Close provides a mock function with given fields: ctx
func (_m *MockObjectUsageUpdater) Close(ctx context.Context) error {
ret := _m.Called(ctx)
if len(ret) == 0 {
panic("no return value specified for Close")
}
var r0 error
if rf, ok := ret.Get(0).(func(context.Context) error); ok {
r0 = rf(ctx)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockObjectUsageUpdater_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close'
type MockObjectUsageUpdater_Close_Call struct {
*mock.Call
}
// Close is a helper method to define mock.On call
// - ctx context.Context
func (_e *MockObjectUsageUpdater_Expecter) Close(ctx interface{}) *MockObjectUsageUpdater_Close_Call {
return &MockObjectUsageUpdater_Close_Call{Call: _e.mock.On("Close", ctx)}
}
func (_c *MockObjectUsageUpdater_Close_Call) Run(run func(ctx context.Context)) *MockObjectUsageUpdater_Close_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context))
})
return _c
}
func (_c *MockObjectUsageUpdater_Close_Call) Return(err error) *MockObjectUsageUpdater_Close_Call {
_c.Call.Return(err)
return _c
}
func (_c *MockObjectUsageUpdater_Close_Call) RunAndReturn(run func(context.Context) error) *MockObjectUsageUpdater_Close_Call {
_c.Call.Return(run)
return _c
}
// Init provides a mock function with given fields: a
func (_m *MockObjectUsageUpdater) Init(a *app.App) error {
ret := _m.Called(a)
if len(ret) == 0 {
panic("no return value specified for Init")
}
var r0 error
if rf, ok := ret.Get(0).(func(*app.App) error); ok {
r0 = rf(a)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockObjectUsageUpdater_Init_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Init'
type MockObjectUsageUpdater_Init_Call struct {
*mock.Call
}
// Init is a helper method to define mock.On call
// - a *app.App
func (_e *MockObjectUsageUpdater_Expecter) Init(a interface{}) *MockObjectUsageUpdater_Init_Call {
return &MockObjectUsageUpdater_Init_Call{Call: _e.mock.On("Init", a)}
}
func (_c *MockObjectUsageUpdater_Init_Call) Run(run func(a *app.App)) *MockObjectUsageUpdater_Init_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(*app.App))
})
return _c
}
func (_c *MockObjectUsageUpdater_Init_Call) Return(err error) *MockObjectUsageUpdater_Init_Call {
_c.Call.Return(err)
return _c
}
func (_c *MockObjectUsageUpdater_Init_Call) RunAndReturn(run func(*app.App) error) *MockObjectUsageUpdater_Init_Call {
_c.Call.Return(run)
return _c
}
// Name provides a mock function with given fields:
func (_m *MockObjectUsageUpdater) Name() string {
ret := _m.Called()
if len(ret) == 0 {
panic("no return value specified for Name")
}
var r0 string
if rf, ok := ret.Get(0).(func() string); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(string)
}
return r0
}
// MockObjectUsageUpdater_Name_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Name'
type MockObjectUsageUpdater_Name_Call struct {
*mock.Call
}
// Name is a helper method to define mock.On call
func (_e *MockObjectUsageUpdater_Expecter) Name() *MockObjectUsageUpdater_Name_Call {
return &MockObjectUsageUpdater_Name_Call{Call: _e.mock.On("Name")}
}
func (_c *MockObjectUsageUpdater_Name_Call) Run(run func()) *MockObjectUsageUpdater_Name_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockObjectUsageUpdater_Name_Call) Return(name string) *MockObjectUsageUpdater_Name_Call {
_c.Call.Return(name)
return _c
}
func (_c *MockObjectUsageUpdater_Name_Call) RunAndReturn(run func() string) *MockObjectUsageUpdater_Name_Call {
_c.Call.Return(run)
return _c
}
// Run provides a mock function with given fields: ctx
func (_m *MockObjectUsageUpdater) Run(ctx context.Context) error {
ret := _m.Called(ctx)
if len(ret) == 0 {
panic("no return value specified for Run")
}
var r0 error
if rf, ok := ret.Get(0).(func(context.Context) error); ok {
r0 = rf(ctx)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockObjectUsageUpdater_Run_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Run'
type MockObjectUsageUpdater_Run_Call struct {
*mock.Call
}
// Run is a helper method to define mock.On call
// - ctx context.Context
func (_e *MockObjectUsageUpdater_Expecter) Run(ctx interface{}) *MockObjectUsageUpdater_Run_Call {
return &MockObjectUsageUpdater_Run_Call{Call: _e.mock.On("Run", ctx)}
}
func (_c *MockObjectUsageUpdater_Run_Call) Run(run func(ctx context.Context)) *MockObjectUsageUpdater_Run_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context))
})
return _c
}
func (_c *MockObjectUsageUpdater_Run_Call) Return(err error) *MockObjectUsageUpdater_Run_Call {
_c.Call.Return(err)
return _c
}
func (_c *MockObjectUsageUpdater_Run_Call) RunAndReturn(run func(context.Context) error) *MockObjectUsageUpdater_Run_Call {
_c.Call.Return(run)
return _c
}
// UpdateLastUsedDate provides a mock function with given fields: spaceId, key, timeStamp
func (_m *MockObjectUsageUpdater) UpdateLastUsedDate(spaceId string, key lastused.Key, timeStamp int64) {
_m.Called(spaceId, key, timeStamp)
}
// MockObjectUsageUpdater_UpdateLastUsedDate_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateLastUsedDate'
type MockObjectUsageUpdater_UpdateLastUsedDate_Call struct {
*mock.Call
}
// UpdateLastUsedDate is a helper method to define mock.On call
// - spaceId string
// - key lastused.Key
// - timeStamp int64
func (_e *MockObjectUsageUpdater_Expecter) UpdateLastUsedDate(spaceId interface{}, key interface{}, timeStamp interface{}) *MockObjectUsageUpdater_UpdateLastUsedDate_Call {
return &MockObjectUsageUpdater_UpdateLastUsedDate_Call{Call: _e.mock.On("UpdateLastUsedDate", spaceId, key, timeStamp)}
}
func (_c *MockObjectUsageUpdater_UpdateLastUsedDate_Call) Run(run func(spaceId string, key lastused.Key, timeStamp int64)) *MockObjectUsageUpdater_UpdateLastUsedDate_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(string), args[1].(lastused.Key), args[2].(int64))
})
return _c
}
func (_c *MockObjectUsageUpdater_UpdateLastUsedDate_Call) Return() *MockObjectUsageUpdater_UpdateLastUsedDate_Call {
_c.Call.Return()
return _c
}
func (_c *MockObjectUsageUpdater_UpdateLastUsedDate_Call) RunAndReturn(run func(string, lastused.Key, int64)) *MockObjectUsageUpdater_UpdateLastUsedDate_Call {
_c.Call.Return(run)
return _c
}
// NewMockObjectUsageUpdater creates a new instance of MockObjectUsageUpdater. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewMockObjectUsageUpdater(t interface {
mock.TestingT
Cleanup(func())
}) *MockObjectUsageUpdater {
mock := &MockObjectUsageUpdater{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View file

@ -59,7 +59,7 @@ func (f *ObjectFactory) newPage(spaceId string, sb smartblock.SmartBlock) *Page
return &Page{
SmartBlock: sb,
ChangeReceiver: sb.(source.ChangeReceiver),
AllOperations: basic.NewBasic(sb, store, f.layoutConverter, f.fileObjectService, f.lastUsedUpdater),
AllOperations: basic.NewBasic(sb, store, f.layoutConverter, f.fileObjectService),
IHistory: basic.NewHistory(sb),
Text: stext.NewText(
sb,

View file

@ -30,7 +30,7 @@ type participant struct {
}
func (f *ObjectFactory) newParticipant(spaceId string, sb smartblock.SmartBlock, spaceIndex spaceindex.Store) *participant {
basicComponent := basic.NewBasic(sb, spaceIndex, f.layoutConverter, nil, f.lastUsedUpdater)
basicComponent := basic.NewBasic(sb, spaceIndex, f.layoutConverter, nil)
return &participant{
SmartBlock: sb,
DetailsUpdatable: basicComponent,

View file

@ -120,7 +120,7 @@ func TestParticipant_Init(t *testing.T) {
bundle.RelationKeyName: domain.String("test"),
}})
basicComponent := basic.NewBasic(sb, store, nil, nil, nil)
basicComponent := basic.NewBasic(sb, store, nil, nil)
p := &participant{
SmartBlock: sb,
DetailsUpdatable: basicComponent,
@ -147,7 +147,7 @@ func TestParticipant_Init(t *testing.T) {
sb := smarttest.New("root")
store := newStoreFixture(t)
basicComponent := basic.NewBasic(sb, store, nil, nil, nil)
basicComponent := basic.NewBasic(sb, store, nil, nil)
p := &participant{
SmartBlock: sb,
DetailsUpdatable: basicComponent,
@ -196,7 +196,7 @@ func newStoreFixture(t *testing.T) *spaceindex.StoreFixture {
func newParticipantTest(t *testing.T) (*participant, error) {
sb := smarttest.New("root")
store := newStoreFixture(t)
basicComponent := basic.NewBasic(sb, store, nil, nil, nil)
basicComponent := basic.NewBasic(sb, store, nil, nil)
p := &participant{
SmartBlock: sb,
DetailsUpdatable: basicComponent,

View file

@ -40,7 +40,7 @@ func (f *ObjectFactory) newProfile(spaceId string, sb smartblock.SmartBlock) *Pr
fileComponent := file.NewFile(sb, f.fileBlockService, f.picker, f.processService, f.fileUploaderService)
return &Profile{
SmartBlock: sb,
AllOperations: basic.NewBasic(sb, store, f.layoutConverter, f.fileObjectService, f.lastUsedUpdater),
AllOperations: basic.NewBasic(sb, store, f.layoutConverter, f.fileObjectService),
IHistory: basic.NewHistory(sb),
Text: stext.NewText(
sb,

View file

@ -262,13 +262,6 @@ func (st *SmartTest) SetDetails(ctx session.Context, details []domain.Detail, sh
return
}
func (st *SmartTest) SetDetailsAndUpdateLastUsed(ctx session.Context, details []domain.Detail, showEvent bool) (err error) {
for _, detail := range details {
st.Results.LastUsedUpdates = append(st.Results.LastUsedUpdates, string(detail.Key))
}
return st.SetDetails(ctx, details, showEvent)
}
func (st *SmartTest) UpdateDetails(ctx session.Context, update func(current *domain.Details) (*domain.Details, error)) (err error) {
details := st.Doc.(*state.State).CombinedDetails()
if details == nil {
@ -282,31 +275,6 @@ func (st *SmartTest) UpdateDetails(ctx session.Context, update func(current *dom
return nil
}
func (st *SmartTest) UpdateDetailsAndLastUsed(ctx session.Context, update func(current *domain.Details) (*domain.Details, error)) (err error) {
details := st.Doc.(*state.State).CombinedDetails()
if details == nil {
details = domain.NewDetails()
}
oldDetails := details.Copy()
newDetails, err := update(details)
if err != nil {
return err
}
diff, _ := domain.StructDiff(oldDetails, newDetails)
if diff == nil {
return nil
}
st.Doc.(*state.State).SetDetails(newDetails)
for k, _ := range diff.Iterate() {
st.Results.LastUsedUpdates = append(st.Results.LastUsedUpdates, string(k))
}
return nil
}
func (st *SmartTest) Init(ctx *smartblock.InitContext) (err error) {
if ctx.State == nil {
ctx.State = st.NewState()
@ -453,6 +421,4 @@ func (st *SmartTest) Update(ctx session.Context, apply func(b simple.Block) erro
type Results struct {
Events [][]simple.EventMessage
Applies [][]*model.Block
LastUsedUpdates []string
}

View file

@ -29,7 +29,7 @@ func NewWidgetObject(
objectStore spaceindex.Store,
layoutConverter converter.LayoutConverter,
) *WidgetObject {
bs := basic.NewBasic(sb, objectStore, layoutConverter, nil, nil)
bs := basic.NewBasic(sb, objectStore, layoutConverter, nil)
return &WidgetObject{
SmartBlock: sb,
Movable: bs,

View file

@ -36,7 +36,7 @@ type Workspaces struct {
func (f *ObjectFactory) newWorkspace(sb smartblock.SmartBlock, store spaceindex.Store) *Workspaces {
w := &Workspaces{
SmartBlock: sb,
AllOperations: basic.NewBasic(sb, store, f.layoutConverter, f.fileObjectService, f.lastUsedUpdater),
AllOperations: basic.NewBasic(sb, store, f.layoutConverter, f.fileObjectService),
IHistory: basic.NewHistory(sb),
Text: stext.NewText(
sb,

View file

@ -8,7 +8,6 @@ import (
"github.com/anyproto/any-sync/app"
"github.com/pkg/errors"
"github.com/anyproto/anytype-heart/core/block/editor/lastused"
"github.com/anyproto/anytype-heart/core/block/editor/state"
"github.com/anyproto/anytype-heart/core/block/restriction"
"github.com/anyproto/anytype-heart/core/block/source"
@ -65,7 +64,6 @@ type service struct {
bookmarkService bookmarkService
spaceService space.Service
templateService templateService
lastUsedUpdater lastused.ObjectUsageUpdater
archiver objectArchiver
}
@ -79,7 +77,6 @@ func (s *service) Init(a *app.App) (err error) {
s.collectionService = app.MustComponent[collectionService](a)
s.spaceService = app.MustComponent[space.Service](a)
s.templateService = app.MustComponent[templateService](a)
s.lastUsedUpdater = app.MustComponent[lastused.ObjectUsageUpdater](a)
s.archiver = app.MustComponent[objectArchiver](a)
return nil
}

View file

@ -9,7 +9,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/anyproto/anytype-heart/core/block/editor/lastused/mock_lastused"
"github.com/anyproto/anytype-heart/core/block/editor/smartblock/smarttest"
"github.com/anyproto/anytype-heart/core/block/editor/state"
"github.com/anyproto/anytype-heart/core/domain"
@ -26,7 +25,6 @@ type fixture struct {
spaceService *mock_space.MockService
spc *mock_clientspace.MockSpace
templateService *testTemplateService
lastUsedService *mock_lastused.MockObjectUsageUpdater
service Service
}
@ -35,19 +33,16 @@ func newFixture(t *testing.T) *fixture {
spc := mock_clientspace.NewMockSpace(t)
templateSvc := &testTemplateService{}
lastUsedSvc := mock_lastused.NewMockObjectUsageUpdater(t)
s := &service{
spaceService: spaceService,
templateService: templateSvc,
lastUsedUpdater: lastUsedSvc,
}
return &fixture{
spaceService: spaceService,
spc: spc,
templateService: templateSvc,
lastUsedService: lastUsedSvc,
service: s,
}
}
@ -77,7 +72,6 @@ func TestService_CreateObject(t *testing.T) {
f.spaceService.EXPECT().Get(mock.Anything, mock.Anything).Return(f.spc, nil)
f.spc.EXPECT().CreateTreeObject(mock.Anything, mock.Anything).Return(sb, nil)
f.spc.EXPECT().Id().Return(spaceId)
f.lastUsedService.EXPECT().UpdateLastUsedDate(spaceId, bundle.TypeKeyTemplate, mock.Anything).Return()
// when
id, _, err := f.service.CreateObject(context.Background(), spaceId, CreateObjectRequest{

View file

@ -2,7 +2,6 @@ package objectcreator
import (
"context"
"strings"
"time"
"github.com/anyproto/any-sync/commonspace/object/tree/treestorage"
@ -15,8 +14,6 @@ import (
"github.com/anyproto/anytype-heart/metrics"
"github.com/anyproto/anytype-heart/pkg/lib/bundle"
coresb "github.com/anyproto/anytype-heart/pkg/lib/core/smartblock"
"github.com/anyproto/anytype-heart/pkg/lib/localstore/addr"
"github.com/anyproto/anytype-heart/pkg/lib/pb/model"
"github.com/anyproto/anytype-heart/space/clientspace"
)
@ -100,8 +97,6 @@ func (s *service) CreateSmartBlockFromStateInSpaceWithOptions(
sb.Unlock()
id = sb.Id()
s.updateLastUsedDate(spc.Id(), sbType, newDetails, objectTypeKeys[0])
ev.SmartblockCreateMs = time.Since(startTime).Milliseconds() - ev.SetDetailsMs - ev.WorkspaceCreateMs - ev.GetWorkspaceBlockWaitMs
ev.SmartblockType = int(sbType)
ev.ObjectId = id
@ -109,24 +104,6 @@ func (s *service) CreateSmartBlockFromStateInSpaceWithOptions(
return id, newDetails, nil
}
func (s *service) updateLastUsedDate(spaceId string, sbType coresb.SmartBlockType, details *domain.Details, typeKey domain.TypeKey) {
if details.GetInt64(bundle.RelationKeyLastUsedDate) != 0 {
return
}
uk := details.GetString(bundle.RelationKeyUniqueKey)
ts := time.Now().Unix()
switch sbType {
case coresb.SmartBlockTypeObjectType:
s.lastUsedUpdater.UpdateLastUsedDate(spaceId, domain.TypeKey(strings.TrimPrefix(uk, addr.ObjectTypeKeyToIdPrefix)), ts)
case coresb.SmartBlockTypeRelation:
s.lastUsedUpdater.UpdateLastUsedDate(spaceId, domain.RelationKey(strings.TrimPrefix(uk, addr.RelationKeyToIdPrefix)), ts)
default:
if details.GetInt64(bundle.RelationKeyOrigin) == int64(model.ObjectOrigin_none) {
s.lastUsedUpdater.UpdateLastUsedDate(spaceId, typeKey, ts)
}
}
}
func objectTypeKeysToSmartBlockType(typeKeys []domain.TypeKey) coresb.SmartBlockType {
// TODO Add validation for types that user can't create

View file

@ -1,11 +1,12 @@
package process
import (
"context"
"errors"
"sync"
"sync/atomic"
"github.com/cheggaaa/mb"
"github.com/cheggaaa/mb/v3"
"github.com/globalsign/mgo/bson"
"github.com/anyproto/anytype-heart/pb"
@ -49,7 +50,7 @@ func (s *service) NewQueue(info pb.ModelProcess, workers int, noProgress bool, n
id: info.Id,
info: info,
state: pb.ModelProcess_None,
msgs: mb.New(0),
msgs: mb.New[Task](0),
done: make(chan struct{}),
cancel: make(chan struct{}),
s: s,
@ -66,7 +67,7 @@ type queue struct {
id string
info pb.ModelProcess
state pb.ModelProcessState
msgs *mb.MB
msgs *mb.MB[Task]
wg *sync.WaitGroup
done, cancel chan struct{}
pTotal, pDone int64
@ -106,7 +107,7 @@ func (p *queue) Add(ts ...Task) (err error) {
return
}
for _, t := range ts {
if err = p.msgs.Add(t); err != nil {
if err = p.msgs.Add(context.Background(), t); err != nil {
return ErrQueueDone
}
atomic.AddInt64(&p.pTotal, 1)
@ -123,7 +124,7 @@ func (p *queue) Wait(ts ...Task) (err error) {
p.m.Unlock()
var done = make(chan struct{}, len(ts))
for _, t := range ts {
if err = p.msgs.Add(taskFunction(t, done)); err != nil {
if err = p.msgs.Add(context.Background(), taskFunction(t, done)); err != nil {
return ErrQueueDone
}
atomic.AddInt64(&p.pTotal, 1)
@ -267,15 +268,15 @@ func (p *queue) checkRunning(checkStarted bool) (err error) {
func (p *queue) worker() {
defer p.wg.Done()
for {
msgs := p.msgs.WaitMax(1)
msgs, err := p.msgs.NewCond().WithMax(1).Wait(context.Background())
if err != nil {
log.Errorf("failed wait: %v", err)
return
}
if len(msgs) == 0 {
return
}
if f, ok := msgs[0].(func()); ok {
f()
} else if t, ok := msgs[0].(Task); ok {
t()
}
msgs[0]()
atomic.AddInt64(&p.pDone, 1)
}
}

View file

@ -25,7 +25,7 @@ func (mw *Middleware) ObjectSetDetails(cctx context.Context, req *pb.RpcObjectSe
return m
}
err := mustService[detailservice.Service](mw).SetDetailsAndUpdateLastUsed(ctx, req.ContextId, requestDetailsListToDomain(req.GetDetails()))
err := mustService[detailservice.Service](mw).SetDetails(ctx, req.ContextId, requestDetailsListToDomain(req.GetDetails()))
if err != nil {
return response(pb.RpcObjectSetDetailsResponseError_UNKNOWN_ERROR, err)
}

View file

@ -290,7 +290,11 @@ func (v Value) TryWrapToStringList() ([]string, bool) {
}
s, ok := v.TryString()
if ok {
return []string{s}, true
if s == "" {
return []string{}, true
} else {
return []string{s}, true
}
}
return nil, false
}

View file

@ -244,3 +244,43 @@ func TestValue_Empty(t *testing.T) {
assert.Equal(t, tc.want, tc.value.IsEmpty())
}
}
func TestTryWrapToStringList(t *testing.T) {
for i, tc := range []struct {
in Value
want []string
wantOk bool
}{
{
in: String(""),
want: []string{},
wantOk: true,
},
{
in: String("foo"),
want: []string{"foo"},
wantOk: true,
},
{
in: StringList([]string{"foo", "bar"}),
want: []string{"foo", "bar"},
wantOk: true,
},
{
in: Float64(123.456),
want: nil,
wantOk: false,
},
{
in: Invalid(),
want: nil,
wantOk: false,
},
} {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
got, gotOk := tc.in.TryWrapToStringList()
assert.Equal(t, tc.want, got)
assert.Equal(t, tc.wantOk, gotOk)
})
}
}

View file

@ -37,7 +37,14 @@ var relationsWhiteList = append(slices.Clone(derivedObjectsWhiteList), bundle.Re
var relationOptionWhiteList = append(slices.Clone(derivedObjectsWhiteList), bundle.RelationKeyRelationOptionColor.String())
var fileRelationsWhiteList = append(slices.Clone(documentRelationsWhiteList), bundle.RelationKeyFileId.String(), bundle.RelationKeyFileExt.String())
var fileRelationsWhiteList = append(
slices.Clone(documentRelationsWhiteList),
bundle.RelationKeyFileId.String(),
bundle.RelationKeyFileExt.String(),
bundle.RelationKeyFileMimeType.String(),
bundle.RelationKeySizeInBytes.String(),
bundle.RelationKeySource.String(),
)
var publishingRelationsWhiteList = map[model.ObjectTypeLayout][]string{
model.ObjectType_basic: documentRelationsWhiteList,

View file

@ -17,6 +17,7 @@ import (
"github.com/anyproto/anytype-publish-server/publishclient/publishapi"
"github.com/gogo/protobuf/jsonpb"
"github.com/gogo/protobuf/proto"
"github.com/gogo/protobuf/types"
"go.uber.org/zap"
"golang.org/x/exp/slices"
@ -25,9 +26,11 @@ import (
"github.com/anyproto/anytype-heart/core/inviteservice"
"github.com/anyproto/anytype-heart/pb"
"github.com/anyproto/anytype-heart/pkg/lib/bundle"
"github.com/anyproto/anytype-heart/pkg/lib/localstore/objectstore"
"github.com/anyproto/anytype-heart/pkg/lib/pb/model"
"github.com/anyproto/anytype-heart/space"
"github.com/anyproto/anytype-heart/space/clientspace"
"github.com/anyproto/anytype-heart/util/pbtypes"
)
var (
@ -88,6 +91,7 @@ type service struct {
publishClientService publishclient.Client
identityService identity.Service
inviteService inviteservice.InviteService
objectStore objectstore.ObjectStore
}
func New() Service {
@ -100,6 +104,7 @@ func (s *service) Init(a *app.App) error {
s.publishClientService = app.MustComponent[publishclient.Client](a)
s.identityService = app.MustComponent[identity.Service](a)
s.inviteService = app.MustComponent[inviteservice.InviteService](a)
s.objectStore = app.MustComponent[objectstore.ObjectStore](a)
return nil
}
@ -189,8 +194,16 @@ func (s *service) publishToPublishServer(ctx context.Context, spaceId, pageId, u
return err
}
if err := s.publishToServer(ctx, spaceId, pageId, uri, version, tempPublishDir); err != nil {
return err
if localPublishDir := os.Getenv("ANYTYPE_LOCAL_PUBLISH_DIR"); localPublishDir != "" {
err := os.CopyFS(localPublishDir, os.DirFS(tempPublishDir))
if err != nil {
log.Error("publishing to local dir error", zap.Error(err))
return err
}
} else {
if err := s.publishToServer(ctx, spaceId, pageId, uri, version, tempPublishDir); err != nil {
return err
}
}
return nil
@ -284,11 +297,15 @@ func (s *service) processSnapshotFile(exportPath, dirName string, file fs.DirEnt
return err
}
details := snapshot.GetSnapshot().GetData().GetDetails()
if source := pbtypes.GetString(details, bundle.RelationKeySource.String()); source != "" {
source = filepath.ToSlash(source)
details.Fields[bundle.RelationKeySource.String()] = pbtypes.String(source)
}
jsonData, err := jsonM.MarshalToString(&snapshot)
if err != nil {
return err
}
fileNameKey := fmt.Sprintf("%s/%s", dirName, file.Name())
uberSnapshot.PbFiles[fileNameKey] = jsonData
return nil
@ -431,6 +448,7 @@ func (s *service) PublishList(ctx context.Context, spaceId string) ([]*pb.RpcPub
pbPublishes := make([]*pb.RpcPublishingPublishState, 0, len(publishes))
for _, publish := range publishes {
version := s.retrieveVersion(publish)
details := s.retrieveObjectDetails(publish)
pbPublishes = append(pbPublishes, &pb.RpcPublishingPublishState{
SpaceId: publish.SpaceId,
ObjectId: publish.ObjectId,
@ -440,11 +458,26 @@ func (s *service) PublishList(ctx context.Context, spaceId string) ([]*pb.RpcPub
Timestamp: publish.Timestamp,
Size_: publish.Size_,
JoinSpace: version.JoinSpace,
Details: details,
})
}
return pbPublishes, nil
}
func (s *service) retrieveObjectDetails(publish *publishapi.Publish) *types.Struct {
records, err := s.objectStore.SpaceIndex(publish.SpaceId).QueryByIds([]string{publish.ObjectId})
if err != nil {
log.Error("failed to extract object details", zap.Error(err))
return nil
}
if len(records) == 0 {
log.Error("details weren't found in store")
return nil
}
details := records[0].Details
return details.ToProto()
}
func (s *service) retrieveVersion(publish *publishapi.Publish) *Version {
version := &Version{}
err := json.Unmarshal([]byte(publish.Version), version)

View file

@ -18,6 +18,7 @@ import (
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
"github.com/anyproto/any-sync/consensus/consensusproto"
"github.com/anyproto/anytype-publish-server/publishclient/publishapi"
"github.com/gogo/protobuf/types"
"github.com/stretchr/testify/assert"
"github.com/anyproto/anytype-heart/core/anytype/account/mock_account"
@ -47,12 +48,14 @@ import (
"github.com/anyproto/anytype-heart/space/mock_space"
"github.com/anyproto/anytype-heart/space/spacecore/typeprovider/mock_typeprovider"
"github.com/anyproto/anytype-heart/tests/testutil"
"github.com/anyproto/anytype-heart/util/pbtypes"
)
const (
spaceId = "spaceId"
objectId = "objectId"
id = "identity"
spaceId = "spaceId"
objectId = "objectId"
id = "identity"
objectName = "test"
)
type mockPublishClient struct {
@ -453,41 +456,6 @@ func TestPublish(t *testing.T) {
})
}
func TestService_PublishList(t *testing.T) {
t.Run("success", func(t *testing.T) {
// given
publishClientService := &mockPublishClient{
t: t,
expectedResult: []*publishapi.Publish{
{
SpaceId: spaceId,
ObjectId: objectId,
Uri: "test",
Version: "{\"heads\":[\"heads\"],\"joinSpace\":true}",
},
},
}
svc := &service{
publishClientService: publishClientService,
}
// when
publishes, err := svc.PublishList(context.Background(), spaceId)
// then
expectedModel := &pb.RpcPublishingPublishState{
SpaceId: spaceId,
ObjectId: objectId,
Uri: "test",
Version: "{\"heads\":[\"heads\"],\"joinSpace\":true}",
JoinSpace: true,
}
assert.NoError(t, err)
assert.Len(t, publishes, 1)
assert.Equal(t, expectedModel, publishes[0])
})
}
func TestService_GetStatus(t *testing.T) {
t.Run("success", func(t *testing.T) {
// given
@ -522,6 +490,176 @@ func TestService_GetStatus(t *testing.T) {
})
}
func TestService_PublishingList(t *testing.T) {
t.Run("success", func(t *testing.T) {
// given
publishClientService := &mockPublishClient{
t: t,
expectedResult: []*publishapi.Publish{
{
SpaceId: spaceId,
ObjectId: objectId,
Uri: objectName,
Version: "{\"heads\":[\"heads\"],\"joinSpace\":true}",
},
},
}
svc := &service{
objectStore: objectstore.NewStoreFixture(t),
publishClientService: publishClientService,
}
// when
publishes, err := svc.PublishList(context.Background(), spaceId)
// then
expectedModel := &pb.RpcPublishingPublishState{
SpaceId: spaceId,
ObjectId: objectId,
Uri: objectName,
Version: "{\"heads\":[\"heads\"],\"joinSpace\":true}",
JoinSpace: true,
}
assert.NoError(t, err)
assert.Len(t, publishes, 1)
assert.Equal(t, expectedModel, publishes[0])
})
space1Id := "spaceId1"
object1Id := "objectId1"
name1 := "test1"
t.Run("extract from all spaces", func(t *testing.T) {
// given
publishClientService := &mockPublishClient{
t: t,
expectedResult: []*publishapi.Publish{
{
SpaceId: spaceId,
ObjectId: objectId,
Uri: objectName,
Version: "{\"heads\":[\"heads\"],\"joinSpace\":false}",
},
{
SpaceId: space1Id,
ObjectId: object1Id,
Uri: name1,
Version: "{\"heads\":[\"heads1\"],\"joinSpace\":false}",
},
},
}
storeFixture := objectstore.NewStoreFixture(t)
storeFixture.AddObjects(t, spaceId, []objectstore.TestObject{
{
bundle.RelationKeyId: domain.String(objectId),
bundle.RelationKeyName: domain.String(objectName),
},
})
storeFixture.AddObjects(t, space1Id, []objectstore.TestObject{
{
bundle.RelationKeyId: domain.String(object1Id),
bundle.RelationKeyName: domain.String(name1),
},
})
svc := &service{
publishClientService: publishClientService,
objectStore: storeFixture,
}
// when
publish, err := svc.PublishList(context.Background(), "")
// then
expectedModel := []*pb.RpcPublishingPublishState{
{
SpaceId: spaceId,
ObjectId: objectId,
Uri: objectName,
Version: "{\"heads\":[\"heads\"],\"joinSpace\":false}",
JoinSpace: false,
Details: &types.Struct{Fields: map[string]*types.Value{
bundle.RelationKeyId.String(): pbtypes.String(objectId),
bundle.RelationKeyName.String(): pbtypes.String(objectName),
}},
},
{
SpaceId: space1Id,
ObjectId: object1Id,
Uri: name1,
Version: "{\"heads\":[\"heads1\"],\"joinSpace\":false}",
JoinSpace: false,
Details: &types.Struct{Fields: map[string]*types.Value{
bundle.RelationKeyId.String(): pbtypes.String(object1Id),
bundle.RelationKeyName.String(): pbtypes.String(name1),
}},
},
}
assert.NoError(t, err)
assert.Equal(t, expectedModel, publish)
})
t.Run("details empty", func(t *testing.T) {
// given
publishClientService := &mockPublishClient{
t: t,
expectedResult: []*publishapi.Publish{
{
SpaceId: spaceId,
ObjectId: objectId,
Uri: objectName,
Version: "{\"heads\":[\"heads\"],\"joinSpace\":false}",
},
{
SpaceId: space1Id,
ObjectId: object1Id,
Uri: name1,
Version: "{\"heads\":[\"heads1\"],\"joinSpace\":false}",
},
},
}
storeFixture := objectstore.NewStoreFixture(t)
storeFixture.AddObjects(t, spaceId, []objectstore.TestObject{
{
bundle.RelationKeyId: domain.String(objectId),
bundle.RelationKeyName: domain.String(objectName),
},
})
svc := &service{
publishClientService: publishClientService,
objectStore: storeFixture,
}
// when
publish, err := svc.PublishList(context.Background(), "")
// then
expectedModel := []*pb.RpcPublishingPublishState{
{
SpaceId: spaceId,
ObjectId: objectId,
Uri: objectName,
Version: "{\"heads\":[\"heads\"],\"joinSpace\":false}",
JoinSpace: false,
Details: &types.Struct{Fields: map[string]*types.Value{
bundle.RelationKeyId.String(): pbtypes.String(objectId),
bundle.RelationKeyName.String(): pbtypes.String(objectName),
}},
},
{
SpaceId: space1Id,
ObjectId: object1Id,
Uri: name1,
Version: "{\"heads\":[\"heads1\"],\"joinSpace\":false}",
JoinSpace: false,
},
}
assert.NoError(t, err)
assert.Equal(t, expectedModel, publish)
})
}
func prepaeSpaceService(t *testing.T, isPersonal bool) (*mock_space.MockService, error) {
spaceService := mock_space.NewMockService(t)
space := mock_clientspace.NewMockSpace(t)

View file

@ -1,60 +0,0 @@
package recordsbatcher
import (
"time"
"github.com/anyproto/any-sync/app"
"github.com/cheggaaa/mb"
)
const CName = "recordsbatcher"
type recordsBatcher struct {
batcher *mb.MB
packDelay time.Duration // delay for better packing of msgs
}
func (r *recordsBatcher) Init(a *app.App) (err error) {
r.batcher = mb.New(0)
r.packDelay = time.Millisecond * 100
return nil
}
func (r *recordsBatcher) Name() (name string) {
return CName
}
func (r *recordsBatcher) Add(msgs ...interface{}) error {
return r.batcher.Add(msgs...)
}
func (r *recordsBatcher) Read(buffer []interface{}) int {
defer func() {
time.Sleep(r.packDelay)
}()
msgs := r.batcher.WaitMax(len(buffer))
if len(msgs) == 0 {
return 0
}
for i, msg := range msgs {
buffer[i] = msg
}
return len(msgs)
}
func (r *recordsBatcher) Close() (err error) {
return r.batcher.Close()
}
func New() RecordsBatcher {
return &recordsBatcher{batcher: mb.New(0)}
}
type RecordsBatcher interface {
// Read reads a batch into the buffer, returns number of records that were read. 0 means no more data will be available
Read(buffer []interface{}) int
Add(msgs ...interface{}) error
app.Component
}

View file

@ -1,11 +1,12 @@
package subscription
import (
"context"
"fmt"
"sync"
"github.com/anyproto/any-store/query"
"github.com/cheggaaa/mb"
"github.com/cheggaaa/mb/v3"
"github.com/anyproto/anytype-heart/core/domain"
"github.com/anyproto/anytype-heart/pkg/lib/bundle"
@ -29,7 +30,7 @@ type collectionObserver struct {
cache *cache
objectStore spaceindex.Store
collectionService CollectionService
recBatch *mb.MB
recBatch *mb.MB[database.Record]
recBatchMutex sync.Mutex
spaceSubscription *spaceSubscriptions
@ -114,7 +115,7 @@ func (c *collectionObserver) updateIds(ids []string) {
c.lock.Unlock()
entries := c.spaceSubscription.fetchEntriesLocked(append(removed, added...))
for _, e := range entries {
err := c.recBatch.Add(database.Record{
err := c.recBatch.Add(context.Background(), database.Record{
Details: e.data,
})
if err != nil {

View file

@ -1,9 +1,10 @@
package subscription
import (
"context"
"testing"
"github.com/cheggaaa/mb"
"github.com/cheggaaa/mb/v3"
"github.com/stretchr/testify/assert"
"github.com/anyproto/anytype-heart/core/domain"
@ -30,7 +31,7 @@ func Test_newCollectionObserver(t *testing.T) {
cache.Set(&entry{id: "id2", data: domain.NewDetailsFromMap(map[domain.RelationKey]domain.Value{
bundle.RelationKeyId: domain.String("id2"),
})})
batcher := mb.New(0)
batcher := mb.New[database.Record](0)
c := &spaceSubscriptions{
collectionService: collectionService,
objectStore: store,
@ -46,11 +47,12 @@ func Test_newCollectionObserver(t *testing.T) {
expectedIds := []string{"id1", "id2"}
ch <- expectedIds
close(observer.closeCh)
msgs := batcher.Wait()
msgs, err := batcher.NewCond().WithMin(2).Wait(context.Background())
assert.NoError(t, err)
var receivedIds []string
for _, msg := range msgs {
id := msg.(database.Record).Details.GetString(bundle.RelationKeyId)
id := msg.Details.GetString(bundle.RelationKeyId)
receivedIds = append(receivedIds, id)
}
assert.Equal(t, expectedIds, receivedIds)
@ -76,7 +78,7 @@ func Test_newCollectionObserver(t *testing.T) {
bundle.RelationKeySpaceId: domain.String(spaceId),
},
})
batcher := mb.New(0)
batcher := mb.New[database.Record](0)
c := &spaceSubscriptions{
collectionService: collectionService,
objectStore: store,
@ -92,11 +94,12 @@ func Test_newCollectionObserver(t *testing.T) {
expectedIds := []string{"id1", "id2"}
ch <- expectedIds
close(observer.closeCh)
msgs := batcher.Wait()
msgs, err := batcher.NewCond().WithMin(2).Wait(context.Background())
assert.NoError(t, err)
var receivedIds []string
for _, msg := range msgs {
id := msg.(database.Record).Details.GetString(bundle.RelationKeyId)
id := msg.Details.GetString(bundle.RelationKeyId)
receivedIds = append(receivedIds, id)
}
assert.Equal(t, expectedIds, receivedIds)

View file

@ -6,7 +6,7 @@ import (
"testing"
"time"
mb2 "github.com/cheggaaa/mb/v3"
"github.com/cheggaaa/mb/v3"
"github.com/stretchr/testify/require"
"github.com/anyproto/anytype-heart/core/domain"
@ -109,7 +109,7 @@ func TestInternalSubscriptionSingle(t *testing.T) {
require.NoError(t, err)
err = resp.Output.Add(context.Background(), event.NewMessage("", nil))
require.True(t, errors.Is(err, mb2.ErrClosed))
require.True(t, errors.Is(err, mb.ErrClosed))
})
t.Run("try to add after close", func(t *testing.T) {
@ -280,7 +280,7 @@ func TestInternalSubCustomQueue(t *testing.T) {
subId := "test"
fx := newFixtureWithRealObjectStore(t)
queue := mb2.New[*pb.EventMessage](0)
queue := mb.New[*pb.EventMessage](0)
resp, err := fx.Search(SubscribeRequest{
SpaceId: testSpaceId,

View file

@ -9,8 +9,7 @@ import (
"github.com/anyproto/any-store/anyenc"
"github.com/anyproto/any-sync/app"
"github.com/cheggaaa/mb"
mb2 "github.com/cheggaaa/mb/v3"
"github.com/cheggaaa/mb/v3"
"github.com/globalsign/mgo/bson"
"github.com/gogo/protobuf/types"
"golang.org/x/exp/slices"
@ -61,7 +60,7 @@ type SubscribeRequest struct {
// Internal indicates that subscription will send events into message queue instead of global client's event system
Internal bool
// InternalQueue is used when Internal flag is set to true. If it's nil, new queue will be created
InternalQueue *mb2.MB[*pb.EventMessage]
InternalQueue *mb.MB[*pb.EventMessage]
AsyncInit bool
}
@ -72,7 +71,7 @@ type SubscribeResponse struct {
Counters *pb.EventObjectSubscriptionCounters
// Used when Internal flag is set to true
Output *mb2.MB[*pb.EventMessage]
Output *mb.MB[*pb.EventMessage]
}
type Service interface {
@ -117,13 +116,13 @@ type service struct {
type internalSubOutput struct {
externallyManaged bool
queue *mb2.MB[*pb.EventMessage]
queue *mb.MB[*pb.EventMessage]
}
func newInternalSubOutput(queue *mb2.MB[*pb.EventMessage]) *internalSubOutput {
func newInternalSubOutput(queue *mb.MB[*pb.EventMessage]) *internalSubOutput {
if queue == nil {
return &internalSubOutput{
queue: mb2.New[*pb.EventMessage](0),
queue: mb.New[*pb.EventMessage](0),
}
}
return &internalSubOutput{
@ -276,7 +275,7 @@ func (s *service) getSpaceSubscriptions(spaceId string) (*spaceSubscriptions, er
subscriptionKeys: make([]string, 0, 20),
subscriptions: make(map[string]subscription, 20),
customOutput: map[string]*internalSubOutput{},
recBatch: mb.New(0),
recBatch: mb.New[database.Record](0),
objectStore: s.objectStore.SpaceIndex(spaceId),
kanban: s.kanban,
collectionService: s.collectionService,
@ -286,7 +285,7 @@ func (s *service) getSpaceSubscriptions(spaceId string) (*spaceSubscriptions, er
}
spaceSubs.ds = newDependencyService(spaceSubs)
spaceSubs.initDebugger()
err := spaceSubs.Run(context.Background())
err := spaceSubs.Run()
if err != nil {
return nil, fmt.Errorf("run space subscriptions: %w", err)
}
@ -300,7 +299,7 @@ type spaceSubscriptions struct {
subscriptions map[string]subscription
customOutput map[string]*internalSubOutput
recBatch *mb.MB
recBatch *mb.MB[database.Record]
// Deps
objectStore spaceindex.Store
@ -315,12 +314,19 @@ type spaceSubscriptions struct {
subDebugger *subDebugger
arenaPool *anyenc.ArenaPool
ctx context.Context
cancelCtx context.CancelFunc
}
func (s *spaceSubscriptions) Run(context.Context) (err error) {
func (s *spaceSubscriptions) Run() (err error) {
s.ctx, s.cancelCtx = context.WithCancel(context.Background())
var batchErr error
s.objectStore.SubscribeForAll(func(rec database.Record) {
s.recBatch.Add(rec)
batchErr = s.recBatch.Add(s.ctx, rec)
})
if batchErr != nil {
return batchErr
}
go s.recordsHandler()
return
}
@ -437,7 +443,7 @@ func (s *spaceSubscriptions) subscribeForQuery(req SubscribeRequest, f *database
}
}
var outputQueue *mb2.MB[*pb.EventMessage]
var outputQueue *mb.MB[*pb.EventMessage]
if req.Internal {
output := newInternalSubOutput(req.InternalQueue)
outputQueue = output.queue
@ -527,7 +533,7 @@ func (s *spaceSubscriptions) subscribeForCollection(req SubscribeRequest, f *dat
depRecords = sub.sortedSub.depSub.getActiveRecords()
}
var outputQueue *mb2.MB[*pb.EventMessage]
var outputQueue *mb.MB[*pb.EventMessage]
if req.Internal {
output := newInternalSubOutput(req.InternalQueue)
outputQueue = output.queue
@ -778,15 +784,18 @@ func (s *spaceSubscriptions) recordsHandler() {
}
}
for {
records := s.recBatch.Wait()
records, err := s.recBatch.Wait(s.ctx)
if err != nil {
return
}
if len(records) == 0 {
return
}
for _, rec := range records {
id := rec.(database.Record).Details.GetString(bundle.RelationKeyId)
id := rec.Details.GetString(bundle.RelationKeyId)
// nil previous version
nilIfExists(id)
entries = append(entries, newEntry(id, rec.(database.Record).Details))
entries = append(entries, newEntry(id, rec.Details))
}
// filter nil entries
filtered := entries[:0]
@ -851,7 +860,7 @@ func (s *spaceSubscriptions) onChangeWithinContext(entries []*entry, proc func(c
s.eventSender.Broadcast(&pb.Event{Messages: msgs})
} else {
err := s.customOutput[id].add(msgs...)
if err != nil && !errors.Is(err, mb2.ErrClosed) {
if err != nil && !errors.Is(err, mb.ErrClosed) {
log.With("subId", id, "error", err).Errorf("push to output")
}
}
@ -923,6 +932,9 @@ func (s *spaceSubscriptions) depIdsFromFilter(spaceId string, filters []database
func (s *spaceSubscriptions) Close(ctx context.Context) (err error) {
s.m.Lock()
defer s.m.Unlock()
if s.cancelCtx != nil {
s.cancelCtx()
}
s.recBatch.Close()
for subId, sub := range s.subscriptions {
sub.close()

View file

@ -18500,6 +18500,7 @@ Available undo/redo operations
| timestamp | [int64](#int64) | | |
| size | [int64](#int64) | | |
| joinSpace | [bool](#bool) | | |
| details | [google.protobuf.Struct](#google-protobuf-Struct) | | |

3
go.mod
View file

@ -9,7 +9,7 @@ require (
github.com/adrium/goheif v0.0.0-20230113233934-ca402e77a786
github.com/anyproto/any-store v0.1.6
github.com/anyproto/any-sync v0.6.0-alpha.11.0.20250131145459-0bf102738a87
github.com/anyproto/anytype-publish-server/publishclient v0.0.0-20241223184559-a5cacfe0950a
github.com/anyproto/anytype-publish-server/publishclient v0.0.0-20250131145601-de288583ff2a
github.com/anyproto/go-chash v0.1.0
github.com/anyproto/go-naturaldate/v2 v2.0.2-0.20230524105841-9829cfd13438
github.com/anyproto/go-slip10 v1.0.0
@ -19,7 +19,6 @@ require (
github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de
github.com/avast/retry-go/v4 v4.6.0
github.com/chai2010/webp v1.1.2-0.20240612091223-aa1b379218b7
github.com/cheggaaa/mb v1.0.3
github.com/cheggaaa/mb/v3 v3.0.2
github.com/dave/jennifer v1.7.1
github.com/davecgh/go-spew v1.1.1

6
go.sum
View file

@ -86,8 +86,8 @@ github.com/anyproto/any-store v0.1.6 h1:CmxIH2JhgHH2s7dnv1SNKAQ3spvX2amG/5RbrTZk
github.com/anyproto/any-store v0.1.6/go.mod h1:nbyRoJYOlvSWU1xDOrmgPP96UeoTf4eYZ9k+qqLK9k8=
github.com/anyproto/any-sync v0.6.0-alpha.11.0.20250131145459-0bf102738a87 h1:Vkkz4b2e/ARu2NO2UhP6OxBUtrfBslVvtF9uCcYraRg=
github.com/anyproto/any-sync v0.6.0-alpha.11.0.20250131145459-0bf102738a87/go.mod h1:JmjcChDSqgt6+G697YI3Yr57SZswxPJNX1arN//UYig=
github.com/anyproto/anytype-publish-server/publishclient v0.0.0-20241223184559-a5cacfe0950a h1:kSNyLHsZ40JxlRAglr85YXH2ik2LH3AH5Hd3JL42KGo=
github.com/anyproto/anytype-publish-server/publishclient v0.0.0-20241223184559-a5cacfe0950a/go.mod h1:4fkueCZcGniSMXkrwESO8zzERrh/L7WHimRNWecfGM0=
github.com/anyproto/anytype-publish-server/publishclient v0.0.0-20250131145601-de288583ff2a h1:ZZM+0OUCQMWSLSflpkf0ZMVo3V76qEDDIXPpQOClNs0=
github.com/anyproto/anytype-publish-server/publishclient v0.0.0-20250131145601-de288583ff2a/go.mod h1:4fkueCZcGniSMXkrwESO8zzERrh/L7WHimRNWecfGM0=
github.com/anyproto/badger/v4 v4.2.1-0.20240110160636-80743fa3d580 h1:Ba80IlCCxkZ9H1GF+7vFu/TSpPvbpDCxXJ5ogc4euYc=
github.com/anyproto/badger/v4 v4.2.1-0.20240110160636-80743fa3d580/go.mod h1:T/uWAYxrXdaXw64ihI++9RMbKTCpKd/yE9+saARew7k=
github.com/anyproto/go-chash v0.1.0 h1:I9meTPjXFRfXZHRJzjOHC/XF7Q5vzysKkiT/grsogXY=
@ -184,8 +184,6 @@ github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UF
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/chai2010/webp v1.1.2-0.20240612091223-aa1b379218b7 h1:EcFyQu4Hz/YC2lc3xWqn678e2FNfG0cgTr/EOA4ByWs=
github.com/chai2010/webp v1.1.2-0.20240612091223-aa1b379218b7/go.mod h1:0XVwvZWdjjdxpUEIf7b9g9VkHFnInUSYujwqTLEuldU=
github.com/cheggaaa/mb v1.0.3 h1:03ksWum+6kHclB+kjwKMaBtgl5gtNYUwNpxsHQciKe8=
github.com/cheggaaa/mb v1.0.3/go.mod h1:NUl0GBtFLlfg2o6iZwxzcG7Lslc2wV/ADTFbLXtVPE4=
github.com/cheggaaa/mb/v3 v3.0.2 h1:jd1Xx0zzihZlXL6HmnRXVCI1BHuXz/kY+VzX9WbvNDU=
github.com/cheggaaa/mb/v3 v3.0.2/go.mod h1:zCt2QeYukhd/g0bIdNqF+b/kKz1hnLFNDkP49qN5kqI=
github.com/chigopher/pathlib v0.19.1 h1:RoLlUJc0CqBGwq239cilyhxPNLXTK+HXoASGyGznx5A=

File diff suppressed because it is too large Load diff

View file

@ -1449,6 +1449,7 @@ message Rpc {
int64 timestamp = 6;
int64 size = 7;
bool joinSpace = 8;
google.protobuf.Struct details = 9;
}
message Create {

View file

@ -1,6 +1,7 @@
package logging
import (
"context"
"errors"
"expvar"
"fmt"
@ -10,7 +11,7 @@ import (
"time"
"github.com/anyproto/any-sync/app/logger"
"github.com/cheggaaa/mb"
"github.com/cheggaaa/mb/v3"
"go.uber.org/zap"
"gopkg.in/Graylog2/go-gelf.v2/gelf"
)
@ -29,7 +30,7 @@ var (
)
func registerGelfSink(config *logger.Config) {
gelfSinkWrapper.batch = mb.New(1000)
gelfSinkWrapper.batch = mb.New[gelf.Message](1000)
tlsWriter, err := gelf.NewTLSWriter(graylogHost, nil)
if err != nil {
fmt.Printf("failed to init gelf tls: %s", err)
@ -49,7 +50,7 @@ func registerGelfSink(config *logger.Config) {
type gelfSink struct {
sync.RWMutex
batch *mb.MB
batch *mb.MB[gelf.Message]
gelfWriter gelf.Writer
version string
account string
@ -66,17 +67,16 @@ func (gs *gelfSink) Run() {
continue
}
msgs := gs.batch.WaitMax(1)
msgs, err := gs.batch.NewCond().WithMax(1).Wait(context.Background())
if err != nil {
return
}
if len(msgs) == 0 {
return
}
for _, msg := range msgs {
msgCasted, ok := msg.(gelf.Message)
if !ok {
continue
}
err := gs.gelfWriter.WriteMessage(&msgCasted)
err := gs.gelfWriter.WriteMessage(&msg)
if err != nil {
if gs.lastErrorAt.IsZero() || gs.lastErrorAt.Add(printErrorThreshold).Before(time.Now()) {
fmt.Fprintf(os.Stderr, "failed to write to gelf: %v\n", err)