mirror of https://github.com/anyproto/anytype-heart.git, synced 2025-06-09 09:35:00 +09:00
GO-5635 introduce protections and debug for tantivy
parent 5c9e764948
commit 0e5a3fa109
9 changed files with 332 additions and 48 deletions
@@ -16,11 +16,13 @@ package ftsearch

 import "C"

 import (
 	"context"
+	"errors"
 	"fmt"
 	"os"
 	"path/filepath"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"time"
 	"unicode"
@@ -29,9 +31,11 @@ import (
 	tantivy "github.com/anyproto/tantivy-go"
 	"github.com/valyala/fastjson"

+	"github.com/anyproto/anytype-heart/core/domain"
 	"github.com/anyproto/anytype-heart/core/wallet"
 	"github.com/anyproto/anytype-heart/metrics"
 	"github.com/anyproto/anytype-heart/pkg/lib/bundle"
+	"github.com/anyproto/anytype-heart/pkg/lib/localstore/ftsearch/tantivycheck"
 	"github.com/anyproto/anytype-heart/pkg/lib/logging"
 	"github.com/anyproto/anytype-heart/util/text"
 )
@@ -40,7 +44,7 @@ const (
 	CName    = "fts"
 	ftsDir   = "fts"
 	ftsDir2  = "fts_tantivy"
-	ftsVer   = "13"
+	ftsVer   = "14"
 	docLimit = 10000

 	fieldTitle = "Title"
@@ -57,7 +61,10 @@ const (
 	tokenizerId = "SimpleIdTokenizer"
 )

-var log = logging.Logger("ftsearch")
+var (
+	log                    = logging.Logger("ftsearch")
+	ErrAppClosingInitiated = errors.New("app closing initiated")
+)

 type FTSearch interface {
 	app.ComponentRunnable
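The exported sentinel lets callers tell a shutdown refusal apart from a real indexing failure. A self-contained sketch of how calling code might branch on it (the caller shown here is illustrative, not part of this commit):

package main

import (
	"errors"
	"fmt"
)

// Stand-in for ftsearch.ErrAppClosingInitiated; same construction as the diff.
var ErrAppClosingInitiated = errors.New("app closing initiated")

// index stands in for ftSearch.Index and pretends shutdown has begun.
func index(docId string) error { return ErrAppClosingInitiated }

func main() {
	if err := index("doc1"); err != nil {
		if errors.Is(err, ErrAppClosingInitiated) {
			fmt.Println("app is closing, dropping the write") // clean stop, not a failure
			return
		}
		fmt.Println("indexing failed:", err) // a real error would be propagated
	}
}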
@@ -93,14 +100,15 @@ type DocumentMatch struct {
 }

 type ftSearch struct {
-	rootPath   string
-	ftsPath    string
-	builderId  string
-	index      *tantivy.TantivyContext
-	parserPool *fastjson.ParserPool
-	mu         sync.Mutex
-	blevePath  string
-	lang       tantivy.Language
+	rootPath            string
+	ftsPath             string
+	builderId           string
+	index               *tantivy.TantivyContext
+	parserPool          *fastjson.ParserPool
+	mu                  sync.Mutex
+	blevePath           string
+	lang                tantivy.Language
+	appClosingInitiated atomic.Bool
 }

 func (f *ftSearch) ProvideStat() any {
@@ -126,6 +134,9 @@ func (f *ftSearch) BatchDeleteObjects(ids []string) error {
 	}
 	f.mu.Lock()
 	defer f.mu.Unlock()
+	if f.appClosingInitiated.Load() {
+		return ErrAppClosingInitiated
+	}
 	start := time.Now()
 	defer func() {
 		spentMs := time.Since(start).Milliseconds()
@@ -147,6 +158,9 @@ func (f *ftSearch) BatchDeleteObjects(ids []string) error {
 func (f *ftSearch) DeleteObject(objectId string) error {
 	f.mu.Lock()
 	defer f.mu.Unlock()
+	if f.appClosingInitiated.Load() {
+		return ErrAppClosingInitiated
+	}
 	return f.index.DeleteDocuments(fieldIdRaw, objectId)
 }

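Every mutating entry point now follows the same shape: acquire the mutex, then consult the atomic flag before touching the index. A minimal, runnable sketch of the pattern with hypothetical names (the real methods live on ftSearch):

package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

var errClosing = errors.New("app closing initiated")

type guardedIndex struct {
	mu      sync.Mutex
	closing atomic.Bool // set once at shutdown, read by every writer
}

// write models Index/DeleteObject: lock first, then check the flag, so a write
// that won the lock before the flag flipped still completes, and later writers
// fail fast instead of mutating an index that is about to be closed.
func (g *guardedIndex) write() error {
	g.mu.Lock()
	defer g.mu.Unlock()
	if g.closing.Load() {
		return errClosing
	}
	// ... the actual index mutation would happen here ...
	return nil
}

func main() {
	g := &guardedIndex{}
	g.closing.Store(true)  // what StateChange does when the app begins closing
	fmt.Println(g.write()) // app closing initiated
}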
@@ -182,6 +196,24 @@ func (f *ftSearch) Name() (name string) {
 }

 func (f *ftSearch) Run(context.Context) error {
+	report, err := tantivycheck.Check(f.ftsPath)
+	if err != nil {
+		if !errors.Is(err, os.ErrNotExist) {
+			log.Warnf("tantivy index checking failed: %v", err)
+		}
+	}
+	if !report.IsOk() {
+		log.With("missingSegments", len(report.MissingSegments)).
+			With("missingDelFiles", len(report.MissingDelFiles)).
+			With("extraSegments", len(report.ExtraSegments)).
+			With("extraDelFiles", len(report.ExtraDelFiles)).
+			With("writerLockPresent", report.WriterLockPresent).
+			With("metaLockPresent", report.MetaLockPresent).
+			With("totalSegmentsInMeta", report.TotalSegmentsInMeta).
+			With("uniqueSegmentPrefixesOnDisk", report.UniqueSegmentPrefixesOnDisk).
+			Warnf("tantivy index is in an inconsistent state, dry run")
+	}
+
 	builder, err := tantivy.NewSchemaBuilder()
 	if err != nil {
 		return err
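Note that Run queries the report even when Check failed: in that case report is the zero-value ConsistencyReport, whose empty slices and false lock flags make IsOk() return true, so a missing index (for example, on first launch) produces neither error nor warning. A tiny sketch of that zero-value property, using a stand-in type rather than the real one:

package main

import "fmt"

// zero-value stand-in mirroring ConsistencyReport's IsOk logic
type report struct {
	missingSegments   []string
	writerLockPresent bool
}

func (r *report) isOk() bool {
	return len(r.missingSegments) == 0 && !r.writerLockPresent
}

func main() {
	var r report          // what Check returns alongside an error
	fmt.Println(r.isOk()) // true: the zero report reads as consistent
}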
@@ -328,6 +360,9 @@ func (f *ftSearch) tryToBuildSchema(schema *tantivy.Schema) (*tantivy.TantivyContext, error) {
 func (f *ftSearch) Index(doc SearchDoc) error {
 	f.mu.Lock()
 	defer f.mu.Unlock()
+	if f.appClosingInitiated.Load() {
+		return ErrAppClosingInitiated
+	}
 	metrics.ObjectFTUpdatedCounter.Inc()
 	tantivyDoc, err := f.convertDoc(doc)
 	if err != nil {
@@ -375,6 +410,9 @@ func (f *ftSearch) BatchIndex(ctx context.Context, docs []SearchDoc, deletedDocs []string) (err error) {
 		}
 	}()
 	f.mu.Lock()
+	if f.appClosingInitiated.Load() {
+		return ErrAppClosingInitiated
+	}
 	err = f.index.DeleteDocuments(fieldIdRaw, deletedDocs...)
 	f.mu.Unlock()
 	if err != nil {
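Unlike the other call sites, this path releases the mutex manually instead of deferring the unlock, so an early return must release the lock explicitly or the mutex stays held for the life of the process. A sketch of the manual-unlock variant of the guard (hypothetical names; the explicit Unlock before the early return is the point):

package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

var errClosing = errors.New("app closing initiated")

type idx struct {
	mu      sync.Mutex
	closing atomic.Bool
}

func (x *idx) batchDelete(ids []string) error {
	x.mu.Lock()
	if x.closing.Load() {
		x.mu.Unlock() // no defer on this path: release before returning early
		return errClosing
	}
	// ... the DeleteDocuments equivalent runs here, under the lock ...
	x.mu.Unlock()
	return nil
}

func main() {
	x := &idx{}
	x.closing.Store(true)
	fmt.Println(x.batchDelete([]string{"a", "b"}))
}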
@@ -390,6 +428,9 @@ func (f *ftSearch) BatchIndex(ctx context.Context, docs []SearchDoc, deletedDocs []string) (err error) {
 	}
 	f.mu.Lock()
 	defer f.mu.Unlock()
+	if f.appClosingInitiated.Load() {
+		return ErrAppClosingInitiated
+	}
 	return f.index.AddAndConsumeDocuments(tantivyDocs...)
 }

@@ -572,8 +613,10 @@ func (f *ftSearch) DocCount() (uint64, error) {

 func (f *ftSearch) Close(ctx context.Context) error {
 	if f.index != nil {
-		f.index.Free()
-		f.index = nil
+		err := f.index.Close()
+		if err != nil {
+			log.Errorf("failed to close tantivy index: %v", err)
+		}
 	}
 	return nil
 }
@@ -582,6 +625,12 @@ func (f *ftSearch) cleanupBleve() {
 	_ = os.RemoveAll(f.blevePath)
 }

+func (f *ftSearch) StateChange(state int) {
+	if state == int(domain.CompStateAppClosingInitiated) {
+		f.appClosingInitiated.Store(true)
+	}
+}
+
 func prepareQuery(query string) string {
 	query = text.Truncate(query, 100, "")
 	query = strings.ToLower(query)
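StateChange is the write side of the flag: once the app broadcasts the closing state, the component latches it for the rest of the process lifetime. A simplified sketch of the sequencing (the constant's value is a stand-in; only its name comes from the diff):

package main

import (
	"fmt"
	"sync/atomic"
)

// stand-in for domain.CompStateAppClosingInitiated
const compStateAppClosingInitiated = 1

type fts struct{ appClosingInitiated atomic.Bool }

func (f *fts) StateChange(state int) {
	if state == compStateAppClosingInitiated {
		f.appClosingInitiated.Store(true) // latched: never reset for this process
	}
}

func main() {
	f := &fts{}
	f.StateChange(compStateAppClosingInitiated)
	fmt.Println(f.appClosingInitiated.Load()) // true: writers now refuse new work
}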
pkg/lib/localstore/ftsearch/tantivycheck/tantivycheck.go (new file, 230 lines)

@@ -0,0 +1,230 @@
// Package tantivycheck provides a DRY-RUN consistency check for Tantivy index
// directories.
//
// It verifies that
//
//   - every segment listed in meta.json has files on disk
//   - every expected <segment>.<opstamp>.del file exists
//   - there are no extra segment prefixes on disk
//   - there are no extra .del files on disk
//   - the special lock files INDEX_WRITER_LOCK and META_LOCK are noted
//
// Nothing is modified on disk; you simply get a ConsistencyReport back.
package tantivycheck

import (
	"encoding/json"
	"fmt"
	"io/fs"
	"os"
	"path/filepath"
	"regexp"
	"strconv"
)
// -----------------------------------------------------------------------------
// Package-level helpers (compiled once)
// -----------------------------------------------------------------------------

var (
	segPrefixRe = regexp.MustCompile(`^([0-9a-f]{32})(?:\..+)?$`)
	delFileRe   = regexp.MustCompile(`^([0-9a-f]{32})\.(\d+)\.del$`)
)
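segPrefixRe captures the 32-hex segment id from any segment file name, while delFileRe additionally captures the opstamp from <segment>.<opstamp>.del names. A quick demonstration (the .store extension is an assumed example of tantivy's file naming, not something this package enumerates):

package main

import (
	"fmt"
	"regexp"
)

var (
	segPrefixRe = regexp.MustCompile(`^([0-9a-f]{32})(?:\..+)?$`)
	delFileRe   = regexp.MustCompile(`^([0-9a-f]{32})\.(\d+)\.del$`)
)

func main() {
	seg := "0f5db2c8a1b24de0a7c3e9f4b6d81c2a.store"  // hypothetical segment file
	del := "0f5db2c8a1b24de0a7c3e9f4b6d81c2a.42.del" // delete file at opstamp 42

	fmt.Println(segPrefixRe.FindStringSubmatch(seg)[1]) // the 32-hex prefix
	if m := delFileRe.FindStringSubmatch(del); m != nil {
		fmt.Println(m[1], m[2]) // prefix and opstamp "42"
	}
}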
// -----------------------------------------------------------------------------
// Public API
// -----------------------------------------------------------------------------

// ConsistencyReport gathers all findings of the dry-run.
type ConsistencyReport struct {
	// Segments present in meta.json but with no files on disk.
	MissingSegments []string
	// <segment>.<opstamp>.del files that meta.json expects but are absent.
	MissingDelFiles []string
	// Segment prefixes that exist on disk but are not referenced in meta.json.
	ExtraSegments []string
	// .del files on disk that are not referenced (wrong opstamp or orphan).
	ExtraDelFiles []string

	// Lock-file information
	WriterLockPresent bool // true if INDEX_WRITER_LOCK exists
	MetaLockPresent   bool // true if META_LOCK exists

	// Informational counters
	TotalSegmentsInMeta         int
	UniqueSegmentPrefixesOnDisk int
}
// Check runs the consistency test against dir and returns a report.
//
// It fails with an error if meta.json is absent or can't be decoded.
func Check(dir string) (ConsistencyReport, error) {
	// ---------------------------------------------------------------------
	// 1) Parse meta.json
	// ---------------------------------------------------------------------
	meta, err := readMeta(filepath.Join(dir, "meta.json"))
	if err != nil {
		return ConsistencyReport{}, err
	}

	// Build metaSegments: 32-hex-id (no dashes) → expected opstamp (nil if none)
	metaSegments := make(map[string]*uint64, len(meta.Segments))
	for _, s := range meta.Segments {
		id := stripDashes(s.SegmentID)
		if s.Deletes != nil {
			metaSegments[id] = &s.Deletes.Opstamp
		} else {
			metaSegments[id] = nil
		}
	}

	// ---------------------------------------------------------------------
	// 2) Walk directory once
	// ---------------------------------------------------------------------
	segmentPrefixesDisk := map[string]struct{}{}
	delFilesDisk := map[[2]string]struct{}{} // key = [segPrefix, opstamp]

	var writerLockPresent, metaLockPresent bool

	err = filepath.WalkDir(dir, func(path string, d fs.DirEntry, walkErr error) error {
		if walkErr != nil {
			return walkErr
		}
		if d.IsDir() {
			return nil
		}

		name := d.Name()

		switch name {
		case "INDEX_WRITER_LOCK":
			writerLockPresent = true
		case "META_LOCK":
			metaLockPresent = true
		}

		if m := segPrefixRe.FindStringSubmatch(name); m != nil {
			segmentPrefixesDisk[m[1]] = struct{}{}
		}
		if m := delFileRe.FindStringSubmatch(name); m != nil {
			delFilesDisk[[2]string{m[1], m[2]}] = struct{}{}
		}
		return nil
	})
	if err != nil {
		return ConsistencyReport{}, fmt.Errorf("scanning directory: %w", err)
	}

	// ---------------------------------------------------------------------
	// 3) Compare sets
	// ---------------------------------------------------------------------
	var (
		missingSegments []string
		extraSegments   []string
		missingDelFiles []string
		extraDelFiles   []string
	)

	// missing segments
	for id := range metaSegments {
		if _, ok := segmentPrefixesDisk[id]; !ok {
			missingSegments = append(missingSegments, id)
		}
	}

	// extra segments
	for id := range segmentPrefixesDisk {
		if _, ok := metaSegments[id]; !ok {
			extraSegments = append(extraSegments, id)
		}
	}

	// missing del files
	for id, opPtr := range metaSegments {
		if opPtr == nil {
			continue // no deletes expected
		}
		opStr := strconv.FormatUint(*opPtr, 10)
		if _, ok := delFilesDisk[[2]string{id, opStr}]; !ok {
			missingDelFiles = append(missingDelFiles, fmt.Sprintf("%s.%s.del", id, opStr))
		}
	}

	// extra del files
	for key := range delFilesDisk {
		id, opStr := key[0], key[1]
		expectedOpPtr, segKnown := metaSegments[id]
		if !segKnown || expectedOpPtr == nil || strconv.FormatUint(*expectedOpPtr, 10) != opStr {
			extraDelFiles = append(extraDelFiles, fmt.Sprintf("%s.%s.del", id, opStr))
		}
	}

	// ---------------------------------------------------------------------
	// 4) Return aggregated report
	// ---------------------------------------------------------------------
	return ConsistencyReport{
		MissingSegments:             missingSegments,
		MissingDelFiles:             missingDelFiles,
		ExtraSegments:               extraSegments,
		ExtraDelFiles:               extraDelFiles,
		WriterLockPresent:           writerLockPresent,
		MetaLockPresent:             metaLockPresent,
		TotalSegmentsInMeta:         len(metaSegments),
		UniqueSegmentPrefixesOnDisk: len(segmentPrefixesDisk),
	}, nil
}
// IsOk returns true when the report is free of inconsistencies:
//
//   - no segments are missing
//   - no .del files are missing
//   - no extra segments are present
//   - no extra .del files are present
//   - neither INDEX_WRITER_LOCK nor META_LOCK is present
//
// Lock files are expected while a writer is running, but a check performed
// before the index is opened should find none, so their presence is treated
// as an inconsistency here.
func (r *ConsistencyReport) IsOk() bool {
	return len(r.MissingSegments) == 0 &&
		len(r.MissingDelFiles) == 0 &&
		len(r.ExtraSegments) == 0 &&
		len(r.ExtraDelFiles) == 0 &&
		!r.WriterLockPresent &&
		!r.MetaLockPresent
}
// -----------------------------------------------------------------------------
// Internal helpers
// -----------------------------------------------------------------------------

// metaFile mirrors only the parts of meta.json we need.
type metaFile struct {
	Segments []struct {
		SegmentID string `json:"segment_id"`
		Deletes   *struct {
			Opstamp uint64 `json:"opstamp"`
		} `json:"deletes"`
	} `json:"segments"`
}

func readMeta(path string) (*metaFile, error) {
	b, err := os.ReadFile(path)
	if err != nil {
		return nil, fmt.Errorf("reading %s: %w", path, err)
	}
	var m metaFile
	if err := json.Unmarshal(b, &m); err != nil {
		return nil, fmt.Errorf("decoding meta.json: %w", err)
	}
	return &m, nil
}

func stripDashes(s string) string {
	out := make([]byte, 0, len(s))
	for i := 0; i < len(s); i++ {
		if s[i] != '-' {
			out = append(out, s[i])
		}
	}
	return string(out)
}
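A minimal end-to-end sketch of the package's API: write a meta.json that references one segment with a pending delete, leave the segment files absent, and run Check. The meta.json below is reduced to the fields readMeta actually decodes; real tantivy metadata carries more:

package main

import (
	"fmt"
	"os"
	"path/filepath"

	"github.com/anyproto/anytype-heart/pkg/lib/localstore/ftsearch/tantivycheck"
)

func main() {
	dir, err := os.MkdirTemp("", "tantivy-check-demo")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)

	// One segment with deletes at opstamp 7; no segment files exist on disk.
	meta := `{"segments":[{"segment_id":"0f5db2c8-a1b2-4de0-a7c3-e9f4b6d81c2a","deletes":{"opstamp":7}}]}`
	if err := os.WriteFile(filepath.Join(dir, "meta.json"), []byte(meta), 0o644); err != nil {
		panic(err)
	}

	report, err := tantivycheck.Check(dir)
	if err != nil {
		panic(err)
	}
	fmt.Println(report.IsOk())          // false: the segment and its .del file are missing
	fmt.Println(report.MissingSegments) // [0f5db2c8a1b24de0a7c3e9f4b6d81c2a]
	fmt.Println(report.MissingDelFiles) // [0f5db2c8a1b24de0a7c3e9f4b6d81c2a.7.del]
}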