Mirror of https://github.com/anyproto/anytype-heart.git (synced 2025-06-09 17:44:59 +09:00)
Commit f6b71b8300 (parent 7d5f12bfce)
GO-501: Rearrange methods

2 changed files with 353 additions and 345 deletions. In this change, the full-text indexing helpers (ForceFTIndex, ftLoop, ftIndex, ftIndexDoc, ftInit) move into the new file core/indexer/full_text.go, and the remaining indexer methods are reordered.
core/indexer/full_text.go (new file, +133)

@@ -0,0 +1,133 @@
package indexer

import (
	"context"
	"time"

	"github.com/anytypeio/go-anytype-middleware/metrics"
	"github.com/anytypeio/go-anytype-middleware/pkg/lib/bundle"
	"github.com/anytypeio/go-anytype-middleware/pkg/lib/core/smartblock"
	"github.com/anytypeio/go-anytype-middleware/pkg/lib/localstore/ftsearch"
	"github.com/anytypeio/go-anytype-middleware/util/pbtypes"
	"github.com/anytypeio/go-anytype-middleware/util/slice"
)

func (i *indexer) ForceFTIndex() {
	select {
	case i.forceFt <- struct{}{}:
	default:
	}
}

func (i *indexer) ftLoop() {
	ticker := time.NewTicker(ftIndexInterval)
	i.ftIndex()
	var lastForceIndex time.Time
	i.mu.Lock()
	quit := i.quit
	i.mu.Unlock()
	for {
		select {
		case <-quit:
			return
		case <-ticker.C:
			i.ftIndex()
		case <-i.forceFt:
			if time.Since(lastForceIndex) > ftIndexForceMinInterval {
				i.ftIndex()
				lastForceIndex = time.Now()
			}
		}
	}
}

func (i *indexer) ftIndex() {
	if err := i.store.IndexForEach(i.ftIndexDoc); err != nil {
		log.Errorf("store.IndexForEach error: %v", err)
	}
}

func (i *indexer) ftIndexDoc(id string, _ time.Time) (err error) {
	st := time.Now()
	// ctx := context.WithValue(context.Background(), ocache.CacheTimeout, cacheTimeout)
	ctx := context.WithValue(context.Background(), metrics.CtxKeyRequest, "index_fulltext")

	info, err := i.doc.GetDocInfo(ctx, id)
	if err != nil {
		return
	}

	sbType, err := i.typeProvider.Type(info.Id)
	if err != nil {
		sbType = smartblock.SmartBlockTypePage
	}
	indexDetails, _ := sbType.Indexable()
	if !indexDetails {
		return nil
	}

	if err = i.store.UpdateObjectSnippet(id, info.State.Snippet()); err != nil {
		return
	}

	if len(info.FileHashes) > 0 {
		// todo: move file indexing to the main indexer as we have the full state there now
		existingIDs, err := i.store.HasIDs(info.FileHashes...)
		if err != nil {
			log.Errorf("failed to get existing file ids : %s", err.Error())
		}
		newIds := slice.Difference(info.FileHashes, existingIDs)
		for _, hash := range newIds {
			// file's hash is id
			err = i.reindexDoc(ctx, hash, false)
			if err != nil {
				log.With("id", hash).Errorf("failed to reindex file: %s", err.Error())
			}

			err = i.store.AddToIndexQueue(hash)
			if err != nil {
				log.With("id", hash).Error(err.Error())
			}
		}
	}

	if fts := i.store.FTSearch(); fts != nil {
		title := pbtypes.GetString(info.State.Details(), bundle.RelationKeyName.String())
		if info.State.ObjectType() == bundle.TypeKeyNote.String() || title == "" {
			title = info.State.Snippet()
		}
		ftDoc := ftsearch.SearchDoc{
			Id:    id,
			Title: title,
			Text:  info.State.SearchText(),
		}
		if err := fts.Index(ftDoc); err != nil {
			log.Errorf("can't ft index doc: %v", err)
		}
		log.Debugf("ft search indexed with title: '%s'", ftDoc.Title)
	}

	log.With("thread", id).Infof("ft index updated for a %v", time.Since(st))
	return
}

func (i *indexer) ftInit() error {
	if ft := i.store.FTSearch(); ft != nil {
		docCount, err := ft.DocCount()
		if err != nil {
			return err
		}
		if docCount == 0 {
			ids, err := i.store.ListIds()
			if err != nil {
				return err
			}
			for _, id := range ids {
				if err := i.store.AddToIndexQueue(id); err != nil {
					return err
				}
			}
		}
	}
	return nil
}
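The loop above depends on several things declared elsewhere in the package and not shown in this commit: the forceFt and quit channels, the mu mutex, the package log, and the ftIndexInterval/ftIndexForceMinInterval constants. As a rough, self-contained sketch of the same pattern — a ticker-driven worker with a non-blocking force trigger, a minimum interval between forced runs, and a quit channel — consider the following; every name and duration in it is illustrative rather than taken from the repository.

package main

import (
	"fmt"
	"time"
)

// worker mirrors the shape of the indexer's full-text loop: a periodic run,
// a rate-limited forced run, and a quit channel for shutdown.
type worker struct {
	force chan struct{}
	quit  chan struct{}
}

// Force requests an immediate run without blocking the caller; if a request
// is already pending, the extra signal is simply dropped.
func (w *worker) Force() {
	select {
	case w.force <- struct{}{}:
	default:
	}
}

func (w *worker) loop(run func()) {
	const (
		interval         = 2 * time.Second        // stand-in for ftIndexInterval (value not in this diff)
		forceMinInterval = 500 * time.Millisecond // stand-in for ftIndexForceMinInterval (value not in this diff)
	)
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	run() // one pass up front, as ftLoop does
	var lastForced time.Time
	for {
		select {
		case <-w.quit:
			return
		case <-ticker.C:
			run()
		case <-w.force:
			if time.Since(lastForced) > forceMinInterval {
				run()
				lastForced = time.Now()
			}
		}
	}
}

func main() {
	w := &worker{force: make(chan struct{}, 1), quit: make(chan struct{})}
	go w.loop(func() { fmt.Println("index pass at", time.Now().Format(time.StampMilli)) })
	w.Force()
	time.Sleep(3 * time.Second)
	close(w.quit)
}

One small, deliberate deviation: the sketch buffers the force channel so a single request can queue while a pass is running. If i.forceFt is unbuffered (its definition is not part of this diff), a force request that arrives while the loop is busy is dropped instead.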
Second changed file (220 additions, 345 deletions):
@@ -27,7 +27,6 @@ import (
	"github.com/anytypeio/go-anytype-middleware/pkg/lib/database"
	"github.com/anytypeio/go-anytype-middleware/pkg/lib/localstore/addr"
	"github.com/anytypeio/go-anytype-middleware/pkg/lib/localstore/filestore"
	"github.com/anytypeio/go-anytype-middleware/pkg/lib/localstore/ftsearch"
	"github.com/anytypeio/go-anytype-middleware/pkg/lib/localstore/objectstore"
	"github.com/anytypeio/go-anytype-middleware/pkg/lib/logging"
	"github.com/anytypeio/go-anytype-middleware/pkg/lib/pb/model"
@@ -169,39 +168,6 @@ func (i *indexer) Name() (name string) {
	return CName
}

func (i *indexer) saveLatestChecksums() error {
	// todo: add layout indexing when needed
	checksums := model.ObjectStoreChecksums{
		BundledObjectTypes: bundle.TypeChecksum,
		BundledRelations: bundle.RelationChecksum,
		BundledTemplates: i.btHash.Hash(),
		ObjectsForceReindexCounter: ForceThreadsObjectsReindexCounter,
		FilesForceReindexCounter: ForceFilesReindexCounter,

		IdxRebuildCounter: ForceIdxRebuildCounter,
		FulltextRebuild: ForceFulltextIndexCounter,
		BundledObjects: ForceBundledObjectsReindexCounter,
		FilestoreKeysForceReindexCounter: ForceFilestoreKeysReindexCounter,
	}
	return i.store.SaveChecksums(&checksums)
}

func (i *indexer) saveLatestCounters() error {
	// todo: add layout indexing when needed
	checksums := model.ObjectStoreChecksums{
		BundledObjectTypes: bundle.TypeChecksum,
		BundledRelations: bundle.RelationChecksum,
		BundledTemplates: i.btHash.Hash(),
		ObjectsForceReindexCounter: ForceThreadsObjectsReindexCounter,
		FilesForceReindexCounter: ForceFilesReindexCounter,
		IdxRebuildCounter: ForceIdxRebuildCounter,
		FulltextRebuild: ForceFulltextIndexCounter,
		BundledObjects: ForceBundledObjectsReindexCounter,
		FilestoreKeysForceReindexCounter: ForceFilestoreKeysReindexCounter,
	}
	return i.store.SaveChecksums(&checksums)
}

func (i *indexer) Run(context.Context) (err error) {
	if ftErr := i.ftInit(); ftErr != nil {
		log.Errorf("can't init ft: %v", ftErr)
@@ -215,13 +181,6 @@ func (i *indexer) Run(context.Context) (err error) {
	return
}

func (i *indexer) ForceFTIndex() {
	select {
	case i.forceFt <- struct{}{}:
	default:
	}
}

func (i *indexer) migrateRemoveNonindexableObjects() {
	ids, err := i.getIdsForTypes(
		smartblock.SmartblockTypeMarketplaceType, smartblock.SmartblockTypeMarketplaceRelation,
@@ -239,12 +198,101 @@ func (i *indexer) migrateRemoveNonindexableObjects() {
	}
}

func (i *indexer) Close(ctx context.Context) (err error) {
	i.mu.Lock()
	quit := i.quit
	i.mu.Unlock()
	if quit != nil {
		close(quit)
		i.mu.Lock()
		i.quit = nil
		i.mu.Unlock()
	}
	return nil
}

func (i *indexer) Index(ctx context.Context, info doc.DocInfo) error {
	startTime := time.Now()
	sbType, err := i.typeProvider.Type(info.Id)
	if err != nil {
		sbType = smartblock.SmartBlockTypePage
	}
	saveIndexedHash := func() {
		if headsHash := headsHash(info.Heads); headsHash != "" {
			err = i.store.SaveLastIndexedHeadsHash(info.Id, headsHash)
			if err != nil {
				log.With("thread", info.Id).Errorf("failed to save indexed heads hash: %v", err)
			}
		}
	}

	indexDetails, indexLinks := sbType.Indexable()
	if sbType != smartblock.SmartBlockTypeSubObject && sbType != smartblock.SmartBlockTypeWorkspace && sbType != smartblock.SmartBlockTypeBreadcrumbs {
		// avoid recursions
		log.With("migratedtype", sbType).Warn("migrating types")
		if pbtypes.GetString(info.State.CombinedDetails(), bundle.RelationKeyCreator.String()) != addr.AnytypeProfileId {
			i.migrateRelations(extractOldRelationsFromState(info.State))
			i.migrateObjectTypes(info.State.ObjectTypesToMigrate())
		}
	}
	if !indexDetails && !indexLinks {
		saveIndexedHash()
		return nil
	}

	details := info.State.CombinedDetails()
	details.Fields[bundle.RelationKeyLinks.String()] = pbtypes.StringList(info.Links)
	setCreator := pbtypes.GetString(info.State.LocalDetails(), bundle.RelationKeyCreator.String())
	if setCreator == "" {
		setCreator = i.anytype.ProfileID()
	}
	indexSetTime := time.Now()
	var hasError bool
	if indexLinks {
		if err = i.store.UpdateObjectLinks(info.Id, info.Links); err != nil {
			hasError = true
			log.With("thread", info.Id).Errorf("failed to save object links: %v", err)
		}
	}

	indexLinksTime := time.Now()
	if indexDetails {
		if err := i.store.UpdateObjectDetails(info.Id, details, false); err != nil {
			hasError = true
			log.With("thread", info.Id).Errorf("can't update object store: %v", err)
		}
		if err := i.store.AddToIndexQueue(info.Id); err != nil {
			log.With("thread", info.Id).Errorf("can't add id to index queue: %v", err)
		} else {
			log.With("thread", info.Id).Debugf("to index queue")
		}
	} else {
		_ = i.store.DeleteDetails(info.Id)
	}
	indexDetailsTime := time.Now()
	detailsCount := 0
	if details.GetFields() != nil {
		detailsCount = len(details.GetFields())
	}

	if !hasError {
		saveIndexedHash()
	}

	metrics.SharedClient.RecordEvent(metrics.IndexEvent{
		ObjectId: info.Id,
		IndexLinksTimeMs: indexLinksTime.Sub(indexSetTime).Milliseconds(),
		IndexDetailsTimeMs: indexDetailsTime.Sub(indexLinksTime).Milliseconds(),
		IndexSetRelationsTimeMs: indexSetTime.Sub(startTime).Milliseconds(),
		RelationsCount: len(info.State.PickRelationLinks()),
		DetailsCount: detailsCount,
	})

	return nil
}

func (i *indexer) reindexIfNeeded() error {
	var (
		err error
		checksums *model.ObjectStoreChecksums
	)
	checksums, err = i.store.GetChecksums()
	checksums, err := i.store.GetChecksums()
	if err != nil && err != ds.ErrNotFound {
		return err
	}
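Close and ftLoop coordinate through the quit channel guarded by i.mu: the loop snapshots the channel once before entering its select, while Close closes it at most once and then nils the field so a repeated Close is a no-op. A minimal, self-contained sketch of that handshake follows; all names are invented for the example and are not the repository's types.

package main

import (
	"fmt"
	"sync"
	"time"
)

// closer sketches the quit-channel handshake used by Close and ftLoop above:
// both sides read the channel under the mutex, and Close nils the field so
// closing twice is safe.
type closer struct {
	mu   sync.Mutex
	quit chan struct{}
}

func (c *closer) run() {
	c.mu.Lock()
	quit := c.quit // snapshot once; the field may be nilled later by Close
	c.mu.Unlock()
	for {
		select {
		case <-quit:
			fmt.Println("worker: stopping")
			return
		case <-time.After(100 * time.Millisecond):
			// periodic work would go here
		}
	}
}

func (c *closer) Close() error {
	c.mu.Lock()
	quit := c.quit
	c.mu.Unlock()
	if quit != nil {
		close(quit)
		c.mu.Lock()
		c.quit = nil // a second Close sees nil and does nothing
		c.mu.Unlock()
	}
	return nil
}

func main() {
	c := &closer{quit: make(chan struct{})}
	go c.run()
	time.Sleep(250 * time.Millisecond)
	_ = c.Close()
	_ = c.Close() // idempotent
	time.Sleep(50 * time.Millisecond)
}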
@@ -286,85 +334,10 @@ func (i *indexer) reindexIfNeeded() error {
	if checksums.IdxRebuildCounter != ForceIdxRebuildCounter {
		flags.enableAll()
	}
	return i.Reindex(context.WithValue(context.TODO(), metrics.CtxKeyRequest, "reindex_forced"), flags)
	return i.reindex(context.WithValue(context.TODO(), metrics.CtxKeyRequest, "reindex_forced"), flags)
}

func (i *indexer) reindexOutdatedThreads() (toReindex, success int, err error) {
	spc, err := i.spaceService.AccountSpace(context.Background())
	if err != nil {
		return
	}

	tids := spc.StoredIds()
	var idsToReindex []string
	for _, tid := range tids {
		logErr := func(err error) {
			log.With("tree", tid).Errorf("reindexOutdatedThreads failed to get tree to reindex: %s", err.Error())
		}

		lastHash, err := i.store.GetLastIndexedHeadsHash(tid)
		if err != nil {
			logErr(err)
			continue
		}
		info, err := spc.Storage().TreeStorage(tid)
		if err != nil {
			logErr(err)
			continue
		}
		heads, err := info.Heads()
		if err != nil {
			logErr(err)
			continue
		}

		hh := headsHash(heads)
		if lastHash != hh {
			log.With("tree", tid).Warnf("not equal indexed heads hash: %s!=%s (%d logs)", lastHash, hh, len(heads))
			idsToReindex = append(idsToReindex, tid)
		}
	}
	if len(idsToReindex) > 0 {
		for _, id := range idsToReindex {
			// TODO: we should reindex it I guess at start
			// if i.anytype.PredefinedBlocks().IsAccount(id) {
			// continue
			// }

			ctx := context.WithValue(context.Background(), metrics.CtxKeyRequest, "reindexOutdatedThreads")
			d, err := i.doc.GetDocInfo(ctx, id)
			if err != nil {
				continue
			}

			err = i.Index(ctx, d)
			if err == nil {
				success++
			} else {
				log.With("thread", id).Errorf("reindexOutdatedThreads failed to index doc: %s", err.Error())
			}
		}
	}
	return len(idsToReindex), success, nil
}

func (i *indexer) getIdsForTypes(sbt ...smartblock.SmartBlockType) ([]string, error) {
	var ids []string
	for _, t := range sbt {
		st, err := i.source.SourceTypeBySbType(t)
		if err != nil {
			return nil, err
		}
		idsT, err := st.ListIds()
		if err != nil {
			return nil, err
		}
		ids = append(ids, idsT...)
	}
	return ids, nil
}

func (i *indexer) Reindex(ctx context.Context, flags reindexFlags) (err error) {
func (i *indexer) reindex(ctx context.Context, flags reindexFlags) (err error) {
	if flags.any() {
		log.Infof("start store reindex (%s)", flags.String())
	}
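reindexOutdatedThreads decides what to reindex by comparing the stored last-indexed heads hash with a hash of the tree's current heads. The body of headsHash is cut off at the end of this diff, so the digest used below is purely illustrative; the sketch only demonstrates the comparison, with invented data standing in for the object store and tree storage.

package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"strings"
)

// hashHeads is an illustrative stand-in for the repo's headsHash, whose body
// is not shown in this diff: any stable digest over the ordered head ids
// would serve the comparison below.
func hashHeads(heads []string) string {
	sum := sha256.Sum256([]byte(strings.Join(heads, "/")))
	return hex.EncodeToString(sum[:])
}

func main() {
	// lastIndexed mimics store.GetLastIndexedHeadsHash: tree id -> hash at last index time.
	lastIndexed := map[string]string{
		"tree-a": hashHeads([]string{"h1", "h2"}),
		"tree-b": hashHeads([]string{"h3"}),
	}
	// current heads as the space storage would report them now.
	current := map[string][]string{
		"tree-a": {"h1", "h2"}, // unchanged -> skip
		"tree-b": {"h3", "h4"}, // new head  -> reindex
		"tree-c": {"h5"},       // never indexed -> reindex
	}

	var toReindex []string
	for tid, heads := range current {
		if lastIndexed[tid] == hashHeads(heads) {
			continue // heads hash matches what was indexed last time
		}
		toReindex = append(toReindex, tid)
	}
	fmt.Println("would reindex:", toReindex)
}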
@@ -659,66 +632,96 @@ func (i *indexer) Reindex(ctx context.Context, flags reindexFlags) (err error) {
	return i.saveLatestChecksums()
}

func extractOldRelationsFromState(s *state.State) []*model.Relation {
	var rels []*model.Relation
	if objRels := s.OldExtraRelations(); len(objRels) > 0 {
		rels = append(rels, s.OldExtraRelations()...)
	}
func (i *indexer) saveLatestChecksums() error {
	// todo: add layout indexing when needed
	checksums := model.ObjectStoreChecksums{
		BundledObjectTypes: bundle.TypeChecksum,
		BundledRelations: bundle.RelationChecksum,
		BundledTemplates: i.btHash.Hash(),
		ObjectsForceReindexCounter: ForceThreadsObjectsReindexCounter,
		FilesForceReindexCounter: ForceFilesReindexCounter,

	if dvBlock := s.Pick(template.DataviewBlockId); dvBlock != nil {
		rels = append(rels, dvBlock.Model().GetDataview().GetRelations()...)
		IdxRebuildCounter: ForceIdxRebuildCounter,
		FulltextRebuild: ForceFulltextIndexCounter,
		BundledObjects: ForceBundledObjectsReindexCounter,
		FilestoreKeysForceReindexCounter: ForceFilestoreKeysReindexCounter,
	}

	return rels
	return i.store.SaveChecksums(&checksums)
}

func (i *indexer) migrateRelations(rels []*model.Relation) {
	if len(rels) == 0 {
func (i *indexer) saveLatestCounters() error {
	// todo: add layout indexing when needed
	checksums := model.ObjectStoreChecksums{
		BundledObjectTypes: bundle.TypeChecksum,
		BundledRelations: bundle.RelationChecksum,
		BundledTemplates: i.btHash.Hash(),
		ObjectsForceReindexCounter: ForceThreadsObjectsReindexCounter,
		FilesForceReindexCounter: ForceFilesReindexCounter,
		IdxRebuildCounter: ForceIdxRebuildCounter,
		FulltextRebuild: ForceFulltextIndexCounter,
		BundledObjects: ForceBundledObjectsReindexCounter,
		FilestoreKeysForceReindexCounter: ForceFilestoreKeysReindexCounter,
	}
	return i.store.SaveChecksums(&checksums)
}

func (i *indexer) reindexOutdatedThreads() (toReindex, success int, err error) {
	spc, err := i.spaceService.AccountSpace(context.Background())
	if err != nil {
		return
	}
	i.relationMigratorMu.Lock()
	defer i.relationMigratorMu.Unlock()

	if i.relationBulkMigration != nil {
		i.relationBulkMigration.AddRelations(rels)
	} else {
		err := i.relationService.MigrateRelations(rels)
		if err != nil {
			log.Errorf("migrateRelations got error: %s", err.Error())
	tids := spc.StoredIds()
	var idsToReindex []string
	for _, tid := range tids {
		logErr := func(err error) {
			log.With("tree", tid).Errorf("reindexOutdatedThreads failed to get tree to reindex: %s", err.Error())
		}
	}
}

func (i *indexer) migrateObjectTypes(ots []string) {
	if len(ots) == 0 {
		return
	}

	var typesModels []*model.ObjectType // do not make
	for _, ot := range ots {
		t, err := bundle.GetTypeByUrl(ot)
		lastHash, err := i.store.GetLastIndexedHeadsHash(tid)
		if err != nil {
			logErr(err)
			continue
		}
		info, err := spc.Storage().TreeStorage(tid)
		if err != nil {
			logErr(err)
			continue
		}
		heads, err := info.Heads()
		if err != nil {
			logErr(err)
			continue
		}

		typesModels = append(typesModels, t)
	}

	if len(typesModels) == 0 {
		return
	}

	i.relationMigratorMu.Lock()
	defer i.relationMigratorMu.Unlock()

	if i.relationBulkMigration != nil {
		i.relationBulkMigration.AddObjectTypes(typesModels)
	} else {
		err := i.relationService.MigrateObjectTypes(typesModels)
		if err != nil {
			log.Errorf("migrateObjectTypes got error: %s", err.Error())
		hh := headsHash(heads)
		if lastHash != hh {
			log.With("tree", tid).Warnf("not equal indexed heads hash: %s!=%s (%d logs)", lastHash, hh, len(heads))
			idsToReindex = append(idsToReindex, tid)
		}
	}
	if len(idsToReindex) > 0 {
		for _, id := range idsToReindex {
			// TODO: we should reindex it I guess at start
			// if i.anytype.PredefinedBlocks().IsAccount(id) {
			// continue
			// }

			ctx := context.WithValue(context.Background(), metrics.CtxKeyRequest, "reindexOutdatedThreads")
			d, err := i.doc.GetDocInfo(ctx, id)
			if err != nil {
				continue
			}

			err = i.Index(ctx, d)
			if err == nil {
				success++
			} else {
				log.With("thread", id).Errorf("reindexOutdatedThreads failed to index doc: %s", err.Error())
			}
		}
	}
	return len(idsToReindex), success, nil
}

func (i *indexer) reindexDoc(ctx context.Context, id string, indexesWereRemoved bool) error {
@@ -807,210 +810,82 @@ func (i *indexer) reindexIdsIgnoreErr(ctx context.Context, indexRemoved bool, id
	return
}

func (i *indexer) Index(ctx context.Context, info doc.DocInfo) error {
	startTime := time.Now()
	sbType, err := i.typeProvider.Type(info.Id)
	if err != nil {
		sbType = smartblock.SmartBlockTypePage
	}
	saveIndexedHash := func() {
		if headsHash := headsHash(info.Heads); headsHash != "" {
			err = i.store.SaveLastIndexedHeadsHash(info.Id, headsHash)
			if err != nil {
				log.With("thread", info.Id).Errorf("failed to save indexed heads hash: %v", err)
			}
		}
func (i *indexer) migrateRelations(rels []*model.Relation) {
	if len(rels) == 0 {
		return
	}
	i.relationMigratorMu.Lock()
	defer i.relationMigratorMu.Unlock()

	indexDetails, indexLinks := sbType.Indexable()
	if sbType != smartblock.SmartBlockTypeSubObject && sbType != smartblock.SmartBlockTypeWorkspace && sbType != smartblock.SmartBlockTypeBreadcrumbs {
		// avoid recursions
		log.With("migratedtype", sbType).Warn("migrating types")
		if pbtypes.GetString(info.State.CombinedDetails(), bundle.RelationKeyCreator.String()) != addr.AnytypeProfileId {
			i.migrateRelations(extractOldRelationsFromState(info.State))
			i.migrateObjectTypes(info.State.ObjectTypesToMigrate())
		}
	}
	if !indexDetails && !indexLinks {
		saveIndexedHash()
		return nil
	}

	details := info.State.CombinedDetails()
	details.Fields[bundle.RelationKeyLinks.String()] = pbtypes.StringList(info.Links)
	setCreator := pbtypes.GetString(info.State.LocalDetails(), bundle.RelationKeyCreator.String())
	if setCreator == "" {
		setCreator = i.anytype.ProfileID()
	}
	indexSetTime := time.Now()
	var hasError bool
	if indexLinks {
		if err = i.store.UpdateObjectLinks(info.Id, info.Links); err != nil {
			hasError = true
			log.With("thread", info.Id).Errorf("failed to save object links: %v", err)
		}
	}

	indexLinksTime := time.Now()
	if indexDetails {
		if err := i.store.UpdateObjectDetails(info.Id, details, false); err != nil {
			hasError = true
			log.With("thread", info.Id).Errorf("can't update object store: %v", err)
		}
		if err := i.store.AddToIndexQueue(info.Id); err != nil {
			log.With("thread", info.Id).Errorf("can't add id to index queue: %v", err)
		} else {
			log.With("thread", info.Id).Debugf("to index queue")
		}
	if i.relationBulkMigration != nil {
		i.relationBulkMigration.AddRelations(rels)
	} else {
		_ = i.store.DeleteDetails(info.Id)
	}
	indexDetailsTime := time.Now()
	detailsCount := 0
	if details.GetFields() != nil {
		detailsCount = len(details.GetFields())
	}

	if !hasError {
		saveIndexedHash()
	}

	metrics.SharedClient.RecordEvent(metrics.IndexEvent{
		ObjectId: info.Id,
		IndexLinksTimeMs: indexLinksTime.Sub(indexSetTime).Milliseconds(),
		IndexDetailsTimeMs: indexDetailsTime.Sub(indexLinksTime).Milliseconds(),
		IndexSetRelationsTimeMs: indexSetTime.Sub(startTime).Milliseconds(),
		RelationsCount: len(info.State.PickRelationLinks()),
		DetailsCount: detailsCount,
	})

	return nil
}

func (i *indexer) ftLoop() {
	ticker := time.NewTicker(ftIndexInterval)
	i.ftIndex()
	var lastForceIndex time.Time
	i.mu.Lock()
	quit := i.quit
	i.mu.Unlock()
	for {
		select {
		case <-quit:
			return
		case <-ticker.C:
			i.ftIndex()
		case <-i.forceFt:
			if time.Since(lastForceIndex) > ftIndexForceMinInterval {
				i.ftIndex()
				lastForceIndex = time.Now()
			}
		err := i.relationService.MigrateRelations(rels)
		if err != nil {
			log.Errorf("migrateRelations got error: %s", err.Error())
		}
	}
}

func (i *indexer) ftIndex() {
	if err := i.store.IndexForEach(i.ftIndexDoc); err != nil {
		log.Errorf("store.IndexForEach error: %v", err)
	}
}

func (i *indexer) ftIndexDoc(id string, _ time.Time) (err error) {
	st := time.Now()
	// ctx := context.WithValue(context.Background(), ocache.CacheTimeout, cacheTimeout)
	ctx := context.WithValue(context.Background(), metrics.CtxKeyRequest, "index_fulltext")

	info, err := i.doc.GetDocInfo(ctx, id)
	if err != nil {
func (i *indexer) migrateObjectTypes(ots []string) {
	if len(ots) == 0 {
		return
	}

	sbType, err := i.typeProvider.Type(info.Id)
	if err != nil {
		sbType = smartblock.SmartBlockTypePage
	}
	indexDetails, _ := sbType.Indexable()
	if !indexDetails {
		return nil
	var typesModels []*model.ObjectType // do not make
	for _, ot := range ots {
		t, err := bundle.GetTypeByUrl(ot)
		if err != nil {
			continue
		}

		typesModels = append(typesModels, t)
	}

	if err = i.store.UpdateObjectSnippet(id, info.State.Snippet()); err != nil {
	if len(typesModels) == 0 {
		return
	}

	if len(info.FileHashes) > 0 {
		// todo: move file indexing to the main indexer as we have the full state there now
		existingIDs, err := i.store.HasIDs(info.FileHashes...)
	i.relationMigratorMu.Lock()
	defer i.relationMigratorMu.Unlock()

	if i.relationBulkMigration != nil {
		i.relationBulkMigration.AddObjectTypes(typesModels)
	} else {
		err := i.relationService.MigrateObjectTypes(typesModels)
		if err != nil {
			log.Errorf("failed to get existing file ids : %s", err.Error())
		}
		newIds := slice.Difference(info.FileHashes, existingIDs)
		for _, hash := range newIds {
			// file's hash is id
			err = i.reindexDoc(ctx, hash, false)
			if err != nil {
				log.With("id", hash).Errorf("failed to reindex file: %s", err.Error())
			}

			err = i.store.AddToIndexQueue(hash)
			if err != nil {
				log.With("id", hash).Error(err.Error())
			}
			log.Errorf("migrateObjectTypes got error: %s", err.Error())
		}
	}

	if fts := i.store.FTSearch(); fts != nil {
		title := pbtypes.GetString(info.State.Details(), bundle.RelationKeyName.String())
		if info.State.ObjectType() == bundle.TypeKeyNote.String() || title == "" {
			title = info.State.Snippet()
		}
		ftDoc := ftsearch.SearchDoc{
			Id: id,
			Title: title,
			Text: info.State.SearchText(),
		}
		if err := fts.Index(ftDoc); err != nil {
			log.Errorf("can't ft index doc: %v", err)
		}
		log.Debugf("ft search indexed with title: '%s'", ftDoc.Title)
	}

	log.With("thread", id).Infof("ft index updated for a %v", time.Since(st))
	return
}

func (i *indexer) ftInit() error {
	if ft := i.store.FTSearch(); ft != nil {
		docCount, err := ft.DocCount()
func (i *indexer) getIdsForTypes(sbt ...smartblock.SmartBlockType) ([]string, error) {
	var ids []string
	for _, t := range sbt {
		st, err := i.source.SourceTypeBySbType(t)
		if err != nil {
			return err
			return nil, err
		}
		if docCount == 0 {
			ids, err := i.store.ListIds()
			if err != nil {
				return err
			}
			for _, id := range ids {
				if err := i.store.AddToIndexQueue(id); err != nil {
					return err
				}
			}
		idsT, err := st.ListIds()
		if err != nil {
			return nil, err
		}
		ids = append(ids, idsT...)
	}
	return nil
	return ids, nil
}

func (i *indexer) Close(ctx context.Context) (err error) {
	i.mu.Lock()
	quit := i.quit
	i.mu.Unlock()
	if quit != nil {
		close(quit)
		i.mu.Lock()
		i.quit = nil
		i.mu.Unlock()
func extractOldRelationsFromState(s *state.State) []*model.Relation {
	var rels []*model.Relation
	if objRels := s.OldExtraRelations(); len(objRels) > 0 {
		rels = append(rels, s.OldExtraRelations()...)
	}
	return nil

	if dvBlock := s.Pick(template.DataviewBlockId); dvBlock != nil {
		rels = append(rels, dvBlock.Model().GetDataview().GetRelations()...)
	}

	return rels
}

func headsHash(heads []string) string {