1
0
Fork 0
mirror of https://github.com/anyproto/anytype-heart.git synced 2025-06-09 09:35:00 +09:00

GO-2246 delete old subobjects from the localstore

This commit is contained in:
Roman Khafizianov 2023-10-23 17:58:50 +02:00
parent b6b58342f1
commit 28ed0b1a3b
No known key found for this signature in database
GPG key ID: F07A7D55A2684852
3 changed files with 74 additions and 23 deletions

View file

@ -4,9 +4,12 @@ import (
"context"
"errors"
"fmt"
"strings"
"time"
"github.com/anyproto/any-sync/util/slice"
"github.com/dgraph-io/badger/v3"
"github.com/globalsign/mgo/bson"
"go.uber.org/zap"
"github.com/anyproto/anytype-heart/core/block/editor/smartblock"
@ -137,18 +140,28 @@ func (i *indexer) ReindexSpace(space space.Space) (err error) {
successfullyReindexed := i.reindexIdsIgnoreErr(ctx, space, ids...)
i.logFinishedReindexStat(metrics.ReindexTypeThreads, len(ids), successfullyReindexed, time.Since(start))
log.Infof("%d/%d objects have been successfully reindexed", successfullyReindexed, len(ids))
l := log.With(zap.String("space", space.Id()), zap.Int("total", len(ids)), zap.Int("succeed", successfullyReindexed))
if successfullyReindexed != len(ids) {
l.Errorf("reindex partially failed")
} else {
l.Infof("reindex finished")
}
} else {
// Index objects that updated, but not indexed yet
// TODO Write more informative comment
// we can have objects which actual state is newer than the indexed one
// this may happen e.g. if the app got closed in the middle of object updates processing
// So here we reindexOutdatedObjects which compare the last indexed heads hash with the actual one
go func() {
start := time.Now()
total, success, err := i.reindexOutdatedObjects(ctx, space)
if err != nil {
log.Infof("failed to reindex outdated objects: %s", err)
log.Errorf("reindex outdated failed: %s", err)
}
l := log.With(zap.String("space", space.Id()), zap.Int("total", total), zap.Int("succeed", success), zap.Int("spentMs", int(time.Since(start).Milliseconds())))
if success != total {
l.Errorf("reindex outdated partially failed")
} else {
log.Infof("%d/%d outdated objects have been successfully reindexed", success, total)
l.Debugf("reindex outdated finished")
}
if total > 0 {
i.logFinishedReindexStat(metrics.ReindexTypeOutdatedHeads, total, success, time.Since(start))
@ -250,6 +263,34 @@ func (i *indexer) ReindexMarketplaceSpace(space space.Space) error {
return i.saveLatestChecksums(space.Id())
}
// removeOldObjects removes all objects that are not supported anymore (e.g. old subobjects) and no longer returned by the underlying source
func (i *indexer) removeOldObjects() (err error) {
ids, err := i.store.ListIds()
if err != nil {
return err
}
ids = slice.Filter(ids, func(id string) bool {
if strings.HasPrefix(id, addr.RelationKeyToIdPrefix) {
return true
}
if strings.HasPrefix(id, addr.ObjectTypeKeyToIdPrefix) {
return true
}
if bson.IsObjectIdHex(id) {
return true
}
return false
})
if len(ids) == 0 {
return
}
err = i.store.DeleteDetails(ids...)
log.With(zap.Int("count", len(ids)), zap.Error(err)).Warnf("removeOldObjects")
return err
}
func (i *indexer) removeCommonIndexes(spaceId string, flags reindexFlags) (err error) {
if flags.any() {
log.Infof("start store reindex (%s)", flags.String())
@ -272,6 +313,11 @@ func (i *indexer) removeCommonIndexes(spaceId string, flags reindexFlags) (err e
}
if flags.removeAllIndexedObjects {
err = i.removeOldObjects()
if err != nil {
err = nil
log.Errorf("reindex failed to removeOldObjects: %v", err)
}
ids, err := i.store.ListIdsBySpace(spaceId)
if err != nil {
log.Errorf("reindex failed to get all ids(removeAllIndexedObjects): %v", err)

View file

@ -8,6 +8,7 @@ import (
"github.com/dgraph-io/badger/v3"
"github.com/gogo/protobuf/types"
ds "github.com/ipfs/go-datastore"
"github.com/samber/lo"
"golang.org/x/exp/slices"
"github.com/anyproto/anytype-heart/pkg/lib/bundle"
@ -15,25 +16,29 @@ import (
"github.com/anyproto/anytype-heart/util/pbtypes"
)
func (s *dsObjectStore) DeleteDetails(id string) error {
key := pagesDetailsBase.ChildString(id).Bytes()
err := s.updateTxn(func(txn *badger.Txn) error {
for _, k := range []ds.Key{
pagesSnippetBase.ChildString(id),
pagesDetailsBase.ChildString(id),
indexedHeadsState.ChildString(id),
} {
if err := txn.Delete(k.Bytes()); err != nil {
return fmt.Errorf("delete key %s: %w", k, err)
}
}
func (s *dsObjectStore) DeleteDetails(ids ...string) error {
for _, chunk := range lo.Chunk(ids, 100) {
err := s.updateTxn(func(txn *badger.Txn) error {
for _, id := range chunk {
s.cache.Del(pagesDetailsBase.ChildString(id).Bytes())
return txn.Delete(key)
})
if err != nil {
return err
for _, k := range []ds.Key{
pagesDetailsBase.ChildString(id),
pagesSnippetBase.ChildString(id),
pagesDetailsBase.ChildString(id),
indexedHeadsState.ChildString(id),
} {
if err := txn.Delete(k.Bytes()); err != nil {
return fmt.Errorf("delete key %s: %w", k, err)
}
}
}
return nil
})
if err != nil {
return err
}
}
s.cache.Del(key)
return nil
}

View file

@ -127,7 +127,7 @@ type ObjectStore interface {
UpdatePendingLocalDetails(id string, proc func(details *types.Struct) (*types.Struct, error)) error
DeleteObject(id string) error
DeleteDetails(id string) error
DeleteDetails(id ...string) error
// EraseIndexes erase all indexes for objectstore. All objects need to be reindexed
EraseIndexes(spaceId string) error