diff --git a/core/migration/relationsfixer.go b/core/migration/relationsfixer.go index 40aa03fb5..2edc62ff8 100644 --- a/core/migration/relationsfixer.go +++ b/core/migration/relationsfixer.go @@ -20,10 +20,10 @@ func (readonlyRelationsFixer) Name() string { return "ReadonlyRelationsFixer" } -func (readonlyRelationsFixer) Run(ctx context.Context, store queryableStore, space doableViaContext) (toMigrate, migrated int, err error) { +func (readonlyRelationsFixer) Run(ctx context.Context, store storeWithCtx, space spaceWithCtx) (toMigrate, migrated int, err error) { spaceId := space.Id() - relations, err := listReadonlyTagAndStatusRelations(store, spaceId) + relations, err := listReadonlyTagAndStatusRelations(ctx, store, spaceId) toMigrate = len(relations) if err != nil { @@ -62,8 +62,8 @@ func (readonlyRelationsFixer) Run(ctx context.Context, store queryableStore, spa return } -func listReadonlyTagAndStatusRelations(store queryableStore, spaceId string) ([]database.Record, error) { - return store.Query(database.Query{Filters: []*model.BlockContentDataviewFilter{ +func listReadonlyTagAndStatusRelations(ctx context.Context, store storeWithCtx, spaceId string) ([]database.Record, error) { + return store.QueryWithContext(ctx, database.Query{Filters: []*model.BlockContentDataviewFilter{ { RelationKey: bundle.RelationKeyRelationFormat.String(), Condition: model.BlockContentDataviewFilter_In, diff --git a/core/migration/runner.go b/core/migration/runner.go index 6fce0e6f3..711b3f08a 100644 --- a/core/migration/runner.go +++ b/core/migration/runner.go @@ -8,6 +8,7 @@ import ( "github.com/hashicorp/go-multierror" "github.com/anyproto/anytype-heart/core/block/editor/smartblock" + "github.com/anyproto/anytype-heart/pkg/lib/database" "github.com/anyproto/anytype-heart/pkg/lib/localstore/objectstore" "github.com/anyproto/anytype-heart/pkg/lib/logging" "github.com/anyproto/anytype-heart/space/clientspace" @@ -20,13 +21,17 @@ const ( var log = logging.Logger(loggerName) -type doableViaContext interface { +type spaceWithCtx interface { DoCtx(ctx context.Context, objectId string, apply func(sb smartblock.SmartBlock) error) error Id() string } +type storeWithCtx interface { + QueryWithContext(ctx context.Context, q database.Query) (records []database.Record, err error) +} + type Migration interface { - Run(context.Context, queryableStore, doableViaContext) (toMigrate, migrated int, err error) + Run(context.Context, storeWithCtx, spaceWithCtx) (toMigrate, migrated int, err error) Name() string } @@ -40,26 +45,10 @@ func Run(ctx context.Context, store objectstore.ObjectStore, space clientspace.S } func run(ctx context.Context, store objectstore.ObjectStore, space clientspace.Space, migrations ...Migration) (mErr error) { - var ( - spaceId = space.Id() - finish = make(chan struct{}) - lockableStore = &storeWithLock{store: store} - ) - - go func() { - for { - select { - case <-ctx.Done(): - lockableStore.Lock() - return - case <-finish: - return - } - } - }() + spaceId := space.Id() for _, m := range migrations { - toMigrate, migrated, err := m.Run(ctx, lockableStore, space) + toMigrate, migrated, err := m.Run(ctx, store, space) if err != nil { fErr := fmt.Errorf(errFormat, m.Name(), spaceId, err, migrated, toMigrate) mErr = multierror.Append(mErr, fErr) @@ -72,6 +61,5 @@ func run(ctx context.Context, store objectstore.ObjectStore, space clientspace.S log.Debugf("migration '%s' in space '%s' is successful. %d out of %d objects were migrated", m.Name(), spaceId, migrated, toMigrate) } - finish <- struct{}{} return } diff --git a/core/migration/runner_test.go b/core/migration/runner_test.go index d8ce99bb0..9aacd361b 100644 --- a/core/migration/runner_test.go +++ b/core/migration/runner_test.go @@ -127,9 +127,9 @@ func (longStoreMigration) Name() string { return "long migration" } -func (longStoreMigration) Run(_ context.Context, store queryableStore, _ doableViaContext) (toMigrate, migrated int, err error) { +func (longStoreMigration) Run(ctx context.Context, store storeWithCtx, _ spaceWithCtx) (toMigrate, migrated int, err error) { for { - if _, err = store.Query(database.Query{}); err != nil { + if _, err = store.QueryWithContext(ctx, database.Query{}); err != nil { return 0, 0, err } } @@ -141,7 +141,7 @@ func (longSpaceMigration) Name() string { return "long migration" } -func (longSpaceMigration) Run(ctx context.Context, _ queryableStore, space doableViaContext) (toMigrate, migrated int, err error) { +func (longSpaceMigration) Run(ctx context.Context, _ storeWithCtx, space spaceWithCtx) (toMigrate, migrated int, err error) { for { if err = space.DoCtx(ctx, "", func(smartblock.SmartBlock) error { // do smth @@ -158,6 +158,6 @@ func (instantMigration) Name() string { return "instant migration" } -func (instantMigration) Run(_ context.Context, _ queryableStore, _ doableViaContext) (toMigrate, migrated int, err error) { +func (instantMigration) Run(_ context.Context, _ storeWithCtx, _ spaceWithCtx) (toMigrate, migrated int, err error) { return 0, 0, nil } diff --git a/core/migration/safestore.go b/core/migration/safestore.go deleted file mode 100644 index 339f19a1f..000000000 --- a/core/migration/safestore.go +++ /dev/null @@ -1,36 +0,0 @@ -package migration - -import ( - "context" - "sync" - - "github.com/anyproto/anytype-heart/pkg/lib/database" - "github.com/anyproto/anytype-heart/pkg/lib/localstore/objectstore" -) - -type queryableStore interface { - Query(q database.Query) (records []database.Record, err error) - Lock() -} - -type storeWithLock struct { - store objectstore.ObjectStore - locked bool - m sync.RWMutex -} - -func (s *storeWithLock) Query(q database.Query) (records []database.Record, err error) { - s.m.RLock() - if s.locked { - s.m.RUnlock() - return nil, context.Canceled - } - s.m.RUnlock() - return s.store.Query(q) -} - -func (s *storeWithLock) Lock() { - s.m.Lock() - s.locked = true - s.m.Unlock() -} diff --git a/core/migration/systemobjectreviser.go b/core/migration/systemobjectreviser.go index d34bb26ce..bc32bd0db 100644 --- a/core/migration/systemobjectreviser.go +++ b/core/migration/systemobjectreviser.go @@ -27,13 +27,13 @@ func (systemObjectReviser) Name() string { return "SystemObjectReviser" } -func (systemObjectReviser) Run(ctx context.Context, store queryableStore, space doableViaContext) (toMigrate, migrated int, err error) { - spaceObjects, err := listAllTypesAndRelations(store, space.Id()) +func (systemObjectReviser) Run(ctx context.Context, store storeWithCtx, space spaceWithCtx) (toMigrate, migrated int, err error) { + spaceObjects, err := listAllTypesAndRelations(ctx, store, space.Id()) if err != nil { return 0, 0, fmt.Errorf("failed to get relations and types from client space: %w", err) } - marketObjects, err := listAllTypesAndRelations(store, addr.AnytypeMarketplaceWorkspace) + marketObjects, err := listAllTypesAndRelations(ctx, store, addr.AnytypeMarketplaceWorkspace) if err != nil { return 0, 0, fmt.Errorf("failed to get relations from marketplace space: %w", err) } @@ -53,8 +53,8 @@ func (systemObjectReviser) Run(ctx context.Context, store queryableStore, space return } -func listAllTypesAndRelations(store queryableStore, spaceId string) (map[string]*types.Struct, error) { - records, err := store.Query(database.Query{ +func listAllTypesAndRelations(ctx context.Context, store storeWithCtx, spaceId string) (map[string]*types.Struct, error) { + records, err := store.QueryWithContext(ctx, database.Query{ Filters: []*model.BlockContentDataviewFilter{ { RelationKey: bundle.RelationKeyLayout.String(), @@ -80,7 +80,7 @@ func listAllTypesAndRelations(store queryableStore, spaceId string) (map[string] return details, nil } -func reviseSystemObject(ctx context.Context, space doableViaContext, localObject *types.Struct, marketObjects map[string]*types.Struct) (toRevise bool, err error) { +func reviseSystemObject(ctx context.Context, space spaceWithCtx, localObject *types.Struct, marketObjects map[string]*types.Struct) (toRevise bool, err error) { source := pbtypes.GetString(localObject, bundle.RelationKeySourceObject.String()) marketObject, found := marketObjects[source] if !found || !isSystemObject(localObject) || pbtypes.GetInt64(marketObject, revisionKey) <= pbtypes.GetInt64(localObject, revisionKey) { diff --git a/pkg/lib/localstore/objectstore/objects.go b/pkg/lib/localstore/objectstore/objects.go index 6dd8d2f51..cc2f33a5d 100644 --- a/pkg/lib/localstore/objectstore/objects.go +++ b/pkg/lib/localstore/objectstore/objects.go @@ -113,6 +113,7 @@ type ObjectStore interface { SubscribeForAll(callback func(rec database.Record)) Query(q database.Query) (records []database.Record, err error) + QueryWithContext(ctx context.Context, q database.Query) (records []database.Record, err error) QueryRaw(f *database.Filters, limit int, offset int) (records []database.Record, err error) QueryByID(ids []string) (records []database.Record, err error) QueryByIDAndSubscribeForChanges(ids []string, subscription database.Subscription) (records []database.Record, close func(), err error) diff --git a/pkg/lib/localstore/objectstore/queries.go b/pkg/lib/localstore/objectstore/queries.go index 4916ef684..10093e591 100644 --- a/pkg/lib/localstore/objectstore/queries.go +++ b/pkg/lib/localstore/objectstore/queries.go @@ -1,6 +1,7 @@ package objectstore import ( + "context" "fmt" "sort" @@ -23,6 +24,32 @@ func (s *dsObjectStore) Query(q database.Query) ([]database.Record, error) { return recs, err } +func (s *dsObjectStore) QueryWithContext(ctx context.Context, q database.Query) (records []database.Record, err error) { + if err = ctx.Err(); err != nil { + return + } + + recordsCh := make(chan []database.Record, 1) + errCh := make(chan error, 1) + go func() { + recs, err := s.Query(q) + if err != nil { + errCh <- err + } else { + recordsCh <- recs + } + }() + + select { + case <-ctx.Done(): + return nil, ctx.Err() + case records = <-recordsCh: + return records, nil + case err = <-errCh: + return nil, err + } +} + func (s *dsObjectStore) QueryRaw(filters *database.Filters, limit int, offset int) ([]database.Record, error) { if filters == nil || filters.FilterObj == nil { return nil, fmt.Errorf("filter cannot be nil or unitialized") diff --git a/space/clientspace/space.go b/space/clientspace/space.go index 0bbca7669..7e43e3762 100644 --- a/space/clientspace/space.go +++ b/space/clientspace/space.go @@ -69,10 +69,6 @@ type bundledObjectsInstaller interface { BundledObjectsIdsToInstall(ctx context.Context, spc Space, sourceObjectIds []string) (objectIds []string, err error) } -type migrationRunner interface { - RunMigrations(space Space) -} - var log = logger.NewNamed("client.space") var BundledObjectsPeerFindTimeout = time.Second * 30 @@ -83,7 +79,6 @@ type space struct { indexer spaceIndexer derivedIDs threads.DerivedSmartblockIds installer bundledObjectsInstaller - migrationRunner migrationRunner spaceCore spacecore.SpaceCoreService personalSpaceId string @@ -97,7 +92,6 @@ type space struct { type SpaceDeps struct { Indexer spaceIndexer Installer bundledObjectsInstaller - MigrationRunner migrationRunner CommonSpace commonspace.Space ObjectFactory objectcache.ObjectFactory AccountService accountservice.Service @@ -112,7 +106,6 @@ func BuildSpace(ctx context.Context, deps SpaceDeps) (Space, error) { sp := &space{ indexer: deps.Indexer, installer: deps.Installer, - migrationRunner: deps.MigrationRunner, common: deps.CommonSpace, personalSpaceId: deps.PersonalSpaceId, spaceCore: deps.SpaceCore, @@ -138,7 +131,6 @@ func BuildSpace(ctx context.Context, deps SpaceDeps) (Space, error) { if err = sp.InstallBundledObjects(ctx); err != nil { return nil, fmt.Errorf("install bundled objects: %w", err) } - sp.migrationRunner.RunMigrations(sp) } go sp.mandatoryObjectsLoad(deps.LoadCtx, deps.DisableRemoteLoad) return sp, nil @@ -166,7 +158,6 @@ func (s *space) mandatoryObjectsLoad(ctx context.Context, disableRemoteLoad bool if s.loadMandatoryObjectsErr != nil { return } - s.migrationRunner.RunMigrations(s) err := s.migrationProfileObject(ctx) if err != nil { log.Error("failed to migrate profile object", zap.Error(err)) diff --git a/space/internal/components/builder/builder.go b/space/internal/components/builder/builder.go index 07f175344..964b03cb8 100644 --- a/space/internal/components/builder/builder.go +++ b/space/internal/components/builder/builder.go @@ -31,7 +31,6 @@ func New() SpaceBuilder { type spaceBuilder struct { indexer dependencies2.SpaceIndexer installer dependencies2.BundledObjectsInstaller - migrationRunner dependencies2.SpaceMigrationRunner spaceCore spacecore.SpaceCoreService techSpace techspace.TechSpace accountService accountservice.Service @@ -49,7 +48,6 @@ func (b *spaceBuilder) Init(a *app.App) (err error) { b.status = app.MustComponent[spacestatus.SpaceStatus](a) b.indexer = app.MustComponent[dependencies2.SpaceIndexer](a) b.installer = app.MustComponent[dependencies2.BundledObjectsInstaller](a) - b.migrationRunner = app.MustComponent[dependencies2.SpaceMigrationRunner](a) b.spaceCore = app.MustComponent[spacecore.SpaceCoreService](a) b.techSpace = app.MustComponent[techspace.TechSpace](a) b.accountService = app.MustComponent[accountservice.Service](a) @@ -93,7 +91,6 @@ func (b *spaceBuilder) BuildSpace(ctx context.Context, disableRemoteLoad bool) ( deps := clientspace.SpaceDeps{ Indexer: b.indexer, Installer: b.installer, - MigrationRunner: b.migrationRunner, CommonSpace: coreSpace, ObjectFactory: b.objectFactory, AccountService: b.accountService, diff --git a/space/internal/components/dependencies/migration.go b/space/internal/components/dependencies/migration.go deleted file mode 100644 index ef9fbdeac..000000000 --- a/space/internal/components/dependencies/migration.go +++ /dev/null @@ -1,9 +0,0 @@ -package dependencies - -import ( - "github.com/anyproto/anytype-heart/space/clientspace" -) - -type SpaceMigrationRunner interface { - RunMigrations(space clientspace.Space) -}