mirror of
https://github.com/anyproto/anytype-heart.git
synced 2025-06-09 17:44:59 +09:00
GO-2331 Introduce store.QueryWithCtx
This commit is contained in:
parent
fc0f81cc4c
commit
5804a170cd
10 changed files with 51 additions and 92 deletions
|
@ -20,10 +20,10 @@ func (readonlyRelationsFixer) Name() string {
|
|||
return "ReadonlyRelationsFixer"
|
||||
}
|
||||
|
||||
func (readonlyRelationsFixer) Run(ctx context.Context, store queryableStore, space doableViaContext) (toMigrate, migrated int, err error) {
|
||||
func (readonlyRelationsFixer) Run(ctx context.Context, store storeWithCtx, space spaceWithCtx) (toMigrate, migrated int, err error) {
|
||||
spaceId := space.Id()
|
||||
|
||||
relations, err := listReadonlyTagAndStatusRelations(store, spaceId)
|
||||
relations, err := listReadonlyTagAndStatusRelations(ctx, store, spaceId)
|
||||
toMigrate = len(relations)
|
||||
|
||||
if err != nil {
|
||||
|
@ -62,8 +62,8 @@ func (readonlyRelationsFixer) Run(ctx context.Context, store queryableStore, spa
|
|||
return
|
||||
}
|
||||
|
||||
func listReadonlyTagAndStatusRelations(store queryableStore, spaceId string) ([]database.Record, error) {
|
||||
return store.Query(database.Query{Filters: []*model.BlockContentDataviewFilter{
|
||||
func listReadonlyTagAndStatusRelations(ctx context.Context, store storeWithCtx, spaceId string) ([]database.Record, error) {
|
||||
return store.QueryWithContext(ctx, database.Query{Filters: []*model.BlockContentDataviewFilter{
|
||||
{
|
||||
RelationKey: bundle.RelationKeyRelationFormat.String(),
|
||||
Condition: model.BlockContentDataviewFilter_In,
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
"github.com/hashicorp/go-multierror"
|
||||
|
||||
"github.com/anyproto/anytype-heart/core/block/editor/smartblock"
|
||||
"github.com/anyproto/anytype-heart/pkg/lib/database"
|
||||
"github.com/anyproto/anytype-heart/pkg/lib/localstore/objectstore"
|
||||
"github.com/anyproto/anytype-heart/pkg/lib/logging"
|
||||
"github.com/anyproto/anytype-heart/space/clientspace"
|
||||
|
@ -20,13 +21,17 @@ const (
|
|||
|
||||
var log = logging.Logger(loggerName)
|
||||
|
||||
type doableViaContext interface {
|
||||
type spaceWithCtx interface {
|
||||
DoCtx(ctx context.Context, objectId string, apply func(sb smartblock.SmartBlock) error) error
|
||||
Id() string
|
||||
}
|
||||
|
||||
type storeWithCtx interface {
|
||||
QueryWithContext(ctx context.Context, q database.Query) (records []database.Record, err error)
|
||||
}
|
||||
|
||||
type Migration interface {
|
||||
Run(context.Context, queryableStore, doableViaContext) (toMigrate, migrated int, err error)
|
||||
Run(context.Context, storeWithCtx, spaceWithCtx) (toMigrate, migrated int, err error)
|
||||
Name() string
|
||||
}
|
||||
|
||||
|
@ -40,26 +45,10 @@ func Run(ctx context.Context, store objectstore.ObjectStore, space clientspace.S
|
|||
}
|
||||
|
||||
func run(ctx context.Context, store objectstore.ObjectStore, space clientspace.Space, migrations ...Migration) (mErr error) {
|
||||
var (
|
||||
spaceId = space.Id()
|
||||
finish = make(chan struct{})
|
||||
lockableStore = &storeWithLock{store: store}
|
||||
)
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
lockableStore.Lock()
|
||||
return
|
||||
case <-finish:
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
spaceId := space.Id()
|
||||
|
||||
for _, m := range migrations {
|
||||
toMigrate, migrated, err := m.Run(ctx, lockableStore, space)
|
||||
toMigrate, migrated, err := m.Run(ctx, store, space)
|
||||
if err != nil {
|
||||
fErr := fmt.Errorf(errFormat, m.Name(), spaceId, err, migrated, toMigrate)
|
||||
mErr = multierror.Append(mErr, fErr)
|
||||
|
@ -72,6 +61,5 @@ func run(ctx context.Context, store objectstore.ObjectStore, space clientspace.S
|
|||
log.Debugf("migration '%s' in space '%s' is successful. %d out of %d objects were migrated",
|
||||
m.Name(), spaceId, migrated, toMigrate)
|
||||
}
|
||||
finish <- struct{}{}
|
||||
return
|
||||
}
|
||||
|
|
|
@ -127,9 +127,9 @@ func (longStoreMigration) Name() string {
|
|||
return "long migration"
|
||||
}
|
||||
|
||||
func (longStoreMigration) Run(_ context.Context, store queryableStore, _ doableViaContext) (toMigrate, migrated int, err error) {
|
||||
func (longStoreMigration) Run(ctx context.Context, store storeWithCtx, _ spaceWithCtx) (toMigrate, migrated int, err error) {
|
||||
for {
|
||||
if _, err = store.Query(database.Query{}); err != nil {
|
||||
if _, err = store.QueryWithContext(ctx, database.Query{}); err != nil {
|
||||
return 0, 0, err
|
||||
}
|
||||
}
|
||||
|
@ -141,7 +141,7 @@ func (longSpaceMigration) Name() string {
|
|||
return "long migration"
|
||||
}
|
||||
|
||||
func (longSpaceMigration) Run(ctx context.Context, _ queryableStore, space doableViaContext) (toMigrate, migrated int, err error) {
|
||||
func (longSpaceMigration) Run(ctx context.Context, _ storeWithCtx, space spaceWithCtx) (toMigrate, migrated int, err error) {
|
||||
for {
|
||||
if err = space.DoCtx(ctx, "", func(smartblock.SmartBlock) error {
|
||||
// do smth
|
||||
|
@ -158,6 +158,6 @@ func (instantMigration) Name() string {
|
|||
return "instant migration"
|
||||
}
|
||||
|
||||
func (instantMigration) Run(_ context.Context, _ queryableStore, _ doableViaContext) (toMigrate, migrated int, err error) {
|
||||
func (instantMigration) Run(_ context.Context, _ storeWithCtx, _ spaceWithCtx) (toMigrate, migrated int, err error) {
|
||||
return 0, 0, nil
|
||||
}
|
||||
|
|
|
@ -1,36 +0,0 @@
|
|||
package migration
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"github.com/anyproto/anytype-heart/pkg/lib/database"
|
||||
"github.com/anyproto/anytype-heart/pkg/lib/localstore/objectstore"
|
||||
)
|
||||
|
||||
type queryableStore interface {
|
||||
Query(q database.Query) (records []database.Record, err error)
|
||||
Lock()
|
||||
}
|
||||
|
||||
type storeWithLock struct {
|
||||
store objectstore.ObjectStore
|
||||
locked bool
|
||||
m sync.RWMutex
|
||||
}
|
||||
|
||||
func (s *storeWithLock) Query(q database.Query) (records []database.Record, err error) {
|
||||
s.m.RLock()
|
||||
if s.locked {
|
||||
s.m.RUnlock()
|
||||
return nil, context.Canceled
|
||||
}
|
||||
s.m.RUnlock()
|
||||
return s.store.Query(q)
|
||||
}
|
||||
|
||||
func (s *storeWithLock) Lock() {
|
||||
s.m.Lock()
|
||||
s.locked = true
|
||||
s.m.Unlock()
|
||||
}
|
|
@ -27,13 +27,13 @@ func (systemObjectReviser) Name() string {
|
|||
return "SystemObjectReviser"
|
||||
}
|
||||
|
||||
func (systemObjectReviser) Run(ctx context.Context, store queryableStore, space doableViaContext) (toMigrate, migrated int, err error) {
|
||||
spaceObjects, err := listAllTypesAndRelations(store, space.Id())
|
||||
func (systemObjectReviser) Run(ctx context.Context, store storeWithCtx, space spaceWithCtx) (toMigrate, migrated int, err error) {
|
||||
spaceObjects, err := listAllTypesAndRelations(ctx, store, space.Id())
|
||||
if err != nil {
|
||||
return 0, 0, fmt.Errorf("failed to get relations and types from client space: %w", err)
|
||||
}
|
||||
|
||||
marketObjects, err := listAllTypesAndRelations(store, addr.AnytypeMarketplaceWorkspace)
|
||||
marketObjects, err := listAllTypesAndRelations(ctx, store, addr.AnytypeMarketplaceWorkspace)
|
||||
if err != nil {
|
||||
return 0, 0, fmt.Errorf("failed to get relations from marketplace space: %w", err)
|
||||
}
|
||||
|
@ -53,8 +53,8 @@ func (systemObjectReviser) Run(ctx context.Context, store queryableStore, space
|
|||
return
|
||||
}
|
||||
|
||||
func listAllTypesAndRelations(store queryableStore, spaceId string) (map[string]*types.Struct, error) {
|
||||
records, err := store.Query(database.Query{
|
||||
func listAllTypesAndRelations(ctx context.Context, store storeWithCtx, spaceId string) (map[string]*types.Struct, error) {
|
||||
records, err := store.QueryWithContext(ctx, database.Query{
|
||||
Filters: []*model.BlockContentDataviewFilter{
|
||||
{
|
||||
RelationKey: bundle.RelationKeyLayout.String(),
|
||||
|
@ -80,7 +80,7 @@ func listAllTypesAndRelations(store queryableStore, spaceId string) (map[string]
|
|||
return details, nil
|
||||
}
|
||||
|
||||
func reviseSystemObject(ctx context.Context, space doableViaContext, localObject *types.Struct, marketObjects map[string]*types.Struct) (toRevise bool, err error) {
|
||||
func reviseSystemObject(ctx context.Context, space spaceWithCtx, localObject *types.Struct, marketObjects map[string]*types.Struct) (toRevise bool, err error) {
|
||||
source := pbtypes.GetString(localObject, bundle.RelationKeySourceObject.String())
|
||||
marketObject, found := marketObjects[source]
|
||||
if !found || !isSystemObject(localObject) || pbtypes.GetInt64(marketObject, revisionKey) <= pbtypes.GetInt64(localObject, revisionKey) {
|
||||
|
|
|
@ -113,6 +113,7 @@ type ObjectStore interface {
|
|||
SubscribeForAll(callback func(rec database.Record))
|
||||
|
||||
Query(q database.Query) (records []database.Record, err error)
|
||||
QueryWithContext(ctx context.Context, q database.Query) (records []database.Record, err error)
|
||||
QueryRaw(f *database.Filters, limit int, offset int) (records []database.Record, err error)
|
||||
QueryByID(ids []string) (records []database.Record, err error)
|
||||
QueryByIDAndSubscribeForChanges(ids []string, subscription database.Subscription) (records []database.Record, close func(), err error)
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package objectstore
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sort"
|
||||
|
||||
|
@ -23,6 +24,32 @@ func (s *dsObjectStore) Query(q database.Query) ([]database.Record, error) {
|
|||
return recs, err
|
||||
}
|
||||
|
||||
func (s *dsObjectStore) QueryWithContext(ctx context.Context, q database.Query) (records []database.Record, err error) {
|
||||
if err = ctx.Err(); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
recordsCh := make(chan []database.Record, 1)
|
||||
errCh := make(chan error, 1)
|
||||
go func() {
|
||||
recs, err := s.Query(q)
|
||||
if err != nil {
|
||||
errCh <- err
|
||||
} else {
|
||||
recordsCh <- recs
|
||||
}
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
case records = <-recordsCh:
|
||||
return records, nil
|
||||
case err = <-errCh:
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
func (s *dsObjectStore) QueryRaw(filters *database.Filters, limit int, offset int) ([]database.Record, error) {
|
||||
if filters == nil || filters.FilterObj == nil {
|
||||
return nil, fmt.Errorf("filter cannot be nil or unitialized")
|
||||
|
|
|
@ -69,10 +69,6 @@ type bundledObjectsInstaller interface {
|
|||
BundledObjectsIdsToInstall(ctx context.Context, spc Space, sourceObjectIds []string) (objectIds []string, err error)
|
||||
}
|
||||
|
||||
type migrationRunner interface {
|
||||
RunMigrations(space Space)
|
||||
}
|
||||
|
||||
var log = logger.NewNamed("client.space")
|
||||
var BundledObjectsPeerFindTimeout = time.Second * 30
|
||||
|
||||
|
@ -83,7 +79,6 @@ type space struct {
|
|||
indexer spaceIndexer
|
||||
derivedIDs threads.DerivedSmartblockIds
|
||||
installer bundledObjectsInstaller
|
||||
migrationRunner migrationRunner
|
||||
spaceCore spacecore.SpaceCoreService
|
||||
personalSpaceId string
|
||||
|
||||
|
@ -97,7 +92,6 @@ type space struct {
|
|||
type SpaceDeps struct {
|
||||
Indexer spaceIndexer
|
||||
Installer bundledObjectsInstaller
|
||||
MigrationRunner migrationRunner
|
||||
CommonSpace commonspace.Space
|
||||
ObjectFactory objectcache.ObjectFactory
|
||||
AccountService accountservice.Service
|
||||
|
@ -112,7 +106,6 @@ func BuildSpace(ctx context.Context, deps SpaceDeps) (Space, error) {
|
|||
sp := &space{
|
||||
indexer: deps.Indexer,
|
||||
installer: deps.Installer,
|
||||
migrationRunner: deps.MigrationRunner,
|
||||
common: deps.CommonSpace,
|
||||
personalSpaceId: deps.PersonalSpaceId,
|
||||
spaceCore: deps.SpaceCore,
|
||||
|
@ -138,7 +131,6 @@ func BuildSpace(ctx context.Context, deps SpaceDeps) (Space, error) {
|
|||
if err = sp.InstallBundledObjects(ctx); err != nil {
|
||||
return nil, fmt.Errorf("install bundled objects: %w", err)
|
||||
}
|
||||
sp.migrationRunner.RunMigrations(sp)
|
||||
}
|
||||
go sp.mandatoryObjectsLoad(deps.LoadCtx, deps.DisableRemoteLoad)
|
||||
return sp, nil
|
||||
|
@ -166,7 +158,6 @@ func (s *space) mandatoryObjectsLoad(ctx context.Context, disableRemoteLoad bool
|
|||
if s.loadMandatoryObjectsErr != nil {
|
||||
return
|
||||
}
|
||||
s.migrationRunner.RunMigrations(s)
|
||||
err := s.migrationProfileObject(ctx)
|
||||
if err != nil {
|
||||
log.Error("failed to migrate profile object", zap.Error(err))
|
||||
|
|
|
@ -31,7 +31,6 @@ func New() SpaceBuilder {
|
|||
type spaceBuilder struct {
|
||||
indexer dependencies2.SpaceIndexer
|
||||
installer dependencies2.BundledObjectsInstaller
|
||||
migrationRunner dependencies2.SpaceMigrationRunner
|
||||
spaceCore spacecore.SpaceCoreService
|
||||
techSpace techspace.TechSpace
|
||||
accountService accountservice.Service
|
||||
|
@ -49,7 +48,6 @@ func (b *spaceBuilder) Init(a *app.App) (err error) {
|
|||
b.status = app.MustComponent[spacestatus.SpaceStatus](a)
|
||||
b.indexer = app.MustComponent[dependencies2.SpaceIndexer](a)
|
||||
b.installer = app.MustComponent[dependencies2.BundledObjectsInstaller](a)
|
||||
b.migrationRunner = app.MustComponent[dependencies2.SpaceMigrationRunner](a)
|
||||
b.spaceCore = app.MustComponent[spacecore.SpaceCoreService](a)
|
||||
b.techSpace = app.MustComponent[techspace.TechSpace](a)
|
||||
b.accountService = app.MustComponent[accountservice.Service](a)
|
||||
|
@ -93,7 +91,6 @@ func (b *spaceBuilder) BuildSpace(ctx context.Context, disableRemoteLoad bool) (
|
|||
deps := clientspace.SpaceDeps{
|
||||
Indexer: b.indexer,
|
||||
Installer: b.installer,
|
||||
MigrationRunner: b.migrationRunner,
|
||||
CommonSpace: coreSpace,
|
||||
ObjectFactory: b.objectFactory,
|
||||
AccountService: b.accountService,
|
||||
|
|
|
@ -1,9 +0,0 @@
|
|||
package dependencies
|
||||
|
||||
import (
|
||||
"github.com/anyproto/anytype-heart/space/clientspace"
|
||||
)
|
||||
|
||||
type SpaceMigrationRunner interface {
|
||||
RunMigrations(space clientspace.Space)
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue