1
0
Fork 0
mirror of https://github.com/anyproto/anytype-heart.git synced 2025-06-10 18:10:49 +09:00

GO-1483: Clean up

This commit is contained in:
Sergey 2023-06-21 18:14:54 +05:00
parent 1d594c1cc2
commit 6dc9f60ee2
No known key found for this signature in database
GPG key ID: 3B6BEF79160221C6
5 changed files with 307 additions and 366 deletions

View file

@ -0,0 +1,114 @@
package objectstore
import (
"errors"
"fmt"
"github.com/dgraph-io/badger/v3"
"github.com/gogo/protobuf/types"
ds "github.com/ipfs/go-datastore"
"github.com/anyproto/anytype-heart/pkg/lib/bundle"
"github.com/anyproto/anytype-heart/util/pbtypes"
)
func (s *dsObjectStore) DeleteDetails(id string) error {
key := pagesDetailsBase.ChildString(id).Bytes()
return s.updateTxn(func(txn *badger.Txn) error {
s.cache.Del(key)
for _, k := range []ds.Key{
pagesSnippetBase.ChildString(id),
pagesDetailsBase.ChildString(id),
} {
if err := txn.Delete(k.Bytes()); err != nil {
return fmt.Errorf("delete key %s: %w", k, err)
}
}
return txn.Delete(key)
})
}
// DeleteObject removes all details, leaving only id and isDeleted
func (s *dsObjectStore) DeleteObject(id string) error {
// do not completely remove object details, so we can distinguish links to deleted and not-yet-loaded objects
err := s.UpdateObjectDetails(id, &types.Struct{
Fields: map[string]*types.Value{
bundle.RelationKeyId.String(): pbtypes.String(id),
bundle.RelationKeyIsDeleted.String(): pbtypes.Bool(true), // maybe we can store the date instead?
},
})
if err != nil {
if !errors.Is(err, ErrDetailsNotChanged) {
return fmt.Errorf("failed to overwrite details and relations: %w", err)
}
}
return retryOnConflict(func() error {
txn := s.db.NewTransaction(true)
defer txn.Discard()
for _, k := range []ds.Key{
pagesSnippetBase.ChildString(id),
indexQueueBase.ChildString(id),
indexedHeadsState.ChildString(id),
} {
if err = txn.Delete(k.Bytes()); err != nil {
return err
}
}
txn, _, err = s.removeByPrefixInTx(txn, pagesInboundLinksBase.String()+"/"+id+"/")
if err != nil {
return err
}
txn, _, err = s.removeByPrefixInTx(txn, pagesOutboundLinksBase.String()+"/"+id+"/")
if err != nil {
return err
}
err = txn.Commit()
if err != nil {
return fmt.Errorf("delete object info: %w", err)
}
if s.fts != nil {
err = s.removeFromIndexQueue(id)
if err != nil {
log.Errorf("error removing %s from index queue: %s", id, err)
}
if err := s.fts.Delete(id); err != nil {
return err
}
}
return nil
})
}
func (s *dsObjectStore) removeByPrefixInTx(txn *badger.Txn, prefix string) (*badger.Txn, int, error) {
var toDelete [][]byte
err := iterateKeysByPrefixTx(txn, []byte(prefix), func(key []byte) {
toDelete = append(toDelete, key)
})
if err != nil {
return txn, 0, fmt.Errorf("iterate keys: %w", err)
}
var removed int
for _, key := range toDelete {
err = txn.Delete(key)
if err == badger.ErrTxnTooBig {
err = txn.Commit()
if err != nil {
return txn, removed, fmt.Errorf("commit big transaction: %w", err)
}
txn = s.db.NewTransaction(true)
err = txn.Delete(key)
}
if err != nil {
return txn, removed, fmt.Errorf("delete key %s: %w", key, err)
}
removed++
}
return txn, removed, nil
}

View file

@ -1,215 +0,0 @@
package objectstore
import (
"fmt"
"path"
"github.com/dgraph-io/badger/v3"
"github.com/gogo/protobuf/proto"
"github.com/gogo/protobuf/types"
"github.com/huandu/skiplist"
ds "github.com/ipfs/go-datastore"
"github.com/samber/lo"
"github.com/anyproto/anytype-heart/pkg/lib/bundle"
"github.com/anyproto/anytype-heart/pkg/lib/database"
"github.com/anyproto/anytype-heart/pkg/lib/database/filter"
"github.com/anyproto/anytype-heart/pkg/lib/pb/model"
"github.com/anyproto/anytype-heart/pkg/lib/schema"
"github.com/anyproto/anytype-heart/util/pbtypes"
)
func (s *dsObjectStore) UpdateObjectDetails(id string, details *types.Struct) error {
if details == nil {
return nil
}
if details.Fields == nil {
return fmt.Errorf("details fields are nil")
}
key := pagesDetailsBase.ChildString(id).Bytes()
return s.updateTxn(func(txn *badger.Txn) error {
prev, ok := s.cache.Get(key)
if !ok {
it, err := txn.Get(key)
if err != nil && err != badger.ErrKeyNotFound {
return fmt.Errorf("get item: %w", err)
}
if err != badger.ErrKeyNotFound {
prev, err = s.unmarshalDetailsFromItem(it)
if err != nil {
return fmt.Errorf("extract details: %w", err)
}
}
}
detailsModel := &model.ObjectDetails{
Details: details,
}
if prev != nil && proto.Equal(prev.(*model.ObjectDetails), detailsModel) {
return ErrDetailsNotChanged
}
// Ensure ID is set
details.Fields[bundle.RelationKeyId.String()] = pbtypes.String(id)
s.sendUpdatesToSubscriptions(id, details)
s.cache.Set(key, detailsModel, int64(detailsModel.Size()))
val, err := proto.Marshal(detailsModel)
if err != nil {
return fmt.Errorf("marshal details: %w", err)
}
return txn.Set(key, val)
})
}
func (s *dsObjectStore) DeleteDetails(id string) error {
key := pagesDetailsBase.ChildString(id).Bytes()
return s.updateTxn(func(txn *badger.Txn) error {
s.cache.Del(key)
for _, k := range []ds.Key{
pagesSnippetBase.ChildString(id),
pagesDetailsBase.ChildString(id),
} {
if err := txn.Delete(k.Bytes()); err != nil {
return fmt.Errorf("delete key %s: %w", k, err)
}
}
return txn.Delete(key)
})
}
func (s *dsObjectStore) Query(sch schema.Schema, q database.Query) ([]database.Record, int, error) {
filters, err := s.buildQuery(sch, q)
if err != nil {
return nil, 0, fmt.Errorf("build query: %w", err)
}
recs, err := s.QueryRaw(filters, q.Limit, q.Offset)
return recs, 0, err
}
func (s *dsObjectStore) QueryRaw(filters *database.Filters, limit int, offset int) ([]database.Record, error) {
if filters == nil || filters.FilterObj == nil {
return nil, fmt.Errorf("filter cannot be nil or unitialized")
}
skl := skiplist.New(order{filters.Order})
err := s.db.View(func(txn *badger.Txn) error {
opts := badger.DefaultIteratorOptions
opts.Prefix = pagesDetailsBase.Bytes()
iterator := txn.NewIterator(opts)
defer iterator.Close()
for iterator.Rewind(); iterator.Valid(); iterator.Next() {
it := iterator.Item()
details, err := s.extractDetailsFromItem(it)
if err != nil {
return err
}
rec := database.Record{Details: details.Details}
if filters.FilterObj != nil && filters.FilterObj.FilterObject(rec) {
if offset > 0 {
offset--
continue
}
if limit > 0 && skl.Len() >= limit {
break
}
skl.Set(rec, nil)
}
}
return nil
})
if err != nil {
return nil, err
}
records := make([]database.Record, 0, skl.Len())
for it := skl.Front(); it != nil; it = it.Next() {
records = append(records, it.Key().(database.Record))
}
return records, nil
}
func (s *dsObjectStore) QueryById(ids []string) (records []database.Record, err error) {
err = s.db.View(func(txn *badger.Txn) error {
iterator := txn.NewIterator(badger.DefaultIteratorOptions)
defer iterator.Close()
for iterator.Rewind(); iterator.Valid(); iterator.Next() {
it := iterator.Item()
details, err := s.extractDetailsFromItem(it)
if err != nil {
return err
}
if lo.Contains(ids, pbtypes.GetString(details.Details, bundle.RelationKeyId.String())) {
rec := database.Record{Details: details.Details}
records = append(records, rec)
}
}
return nil
})
if err != nil {
return nil, err
}
return records, nil
}
func (s *dsObjectStore) extractDetailsFromItem(it *badger.Item) (*model.ObjectDetails, error) {
key := it.Key()
if v, ok := s.cache.Get(key); ok {
return v.(*model.ObjectDetails), nil
} else {
return s.unmarshalDetailsFromItem(it)
}
}
func (s *dsObjectStore) unmarshalDetailsFromItem(it *badger.Item) (*model.ObjectDetails, error) {
var details *model.ObjectDetails
err := it.Value(func(val []byte) error {
var err error
details, err = unmarshalDetails(detailsKeyToID(it.Key()), val)
if err != nil {
return fmt.Errorf("unmarshal details: %w", err)
}
s.cache.Set(it.Key(), details, int64(details.Size()))
return nil
})
if err != nil {
return nil, fmt.Errorf("get item value: %w", err)
}
return details, nil
}
func detailsKeyToID(key []byte) string {
return path.Base(string(key))
}
type order struct {
filter.Order
}
func (o order) Compare(lhs, rhs interface{}) (comp int) {
le := lhs.(database.Record)
re := rhs.(database.Record)
if o.Order != nil {
comp = o.Order.Compare(le, re)
}
// when order isn't set or equal - sort by id
if comp == 0 {
if pbtypes.GetString(le.Details, "id") > pbtypes.GetString(re.Details, "id") {
return 1
} else {
return -1
}
}
return comp
}
func (o order) CalcScore(key interface{}) float64 {
return 0
}

View file

@ -20,8 +20,8 @@ import (
"github.com/anyproto/anytype-heart/pkg/lib/bundle"
"github.com/anyproto/anytype-heart/pkg/lib/core/smartblock"
"github.com/anyproto/anytype-heart/pkg/lib/database"
"github.com/anyproto/anytype-heart/pkg/lib/database/filter"
"github.com/anyproto/anytype-heart/pkg/lib/datastore"
"github.com/anyproto/anytype-heart/pkg/lib/datastore/noctxds"
"github.com/anyproto/anytype-heart/pkg/lib/localstore/addr"
"github.com/anyproto/anytype-heart/pkg/lib/localstore/ftsearch"
"github.com/anyproto/anytype-heart/pkg/lib/logging"
@ -278,23 +278,6 @@ func (s *dsObjectStore) closeAndRemoveSubscription(subscription database.Subscri
}
}
func unmarshalDetails(id string, rawValue []byte) (*model.ObjectDetails, error) {
result := &model.ObjectDetails{}
if err := proto.Unmarshal(rawValue, result); err != nil {
return nil, err
}
if result.Details == nil {
result.Details = &types.Struct{Fields: map[string]*types.Value{}}
}
if result.Details.Fields == nil {
result.Details.Fields = map[string]*types.Value{}
} else {
pbtypes.StructDeleteEmptyFields(result.Details)
}
result.Details.Fields[database.RecordIDField] = pbtypes.ToValue(id)
return result, nil
}
func (s *dsObjectStore) SubscribeForAll(callback func(rec database.Record)) {
s.l.Lock()
s.onChangeCallback = callback
@ -347,61 +330,6 @@ func (s *dsObjectStore) GetRelationByKey(key string) (*model.Relation, error) {
return rel.Relation, nil
}
// DeleteObject removes all details, leaving only id and isDeleted
func (s *dsObjectStore) DeleteObject(id string) error {
// do not completely remove object details, so we can distinguish links to deleted and not-yet-loaded objects
err := s.UpdateObjectDetails(id, &types.Struct{
Fields: map[string]*types.Value{
bundle.RelationKeyId.String(): pbtypes.String(id),
bundle.RelationKeyIsDeleted.String(): pbtypes.Bool(true), // maybe we can store the date instead?
},
})
if err != nil {
if !errors.Is(err, ErrDetailsNotChanged) {
return fmt.Errorf("failed to overwrite details and relations: %w", err)
}
}
return retryOnConflict(func() error {
txn := s.db.NewTransaction(true)
defer txn.Discard()
for _, k := range []ds.Key{
pagesSnippetBase.ChildString(id),
indexQueueBase.ChildString(id),
indexedHeadsState.ChildString(id),
} {
if err = txn.Delete(k.Bytes()); err != nil {
return err
}
}
txn, _, err = s.removeByPrefixInTx(txn, pagesInboundLinksBase.String()+"/"+id+"/")
if err != nil {
return err
}
txn, _, err = s.removeByPrefixInTx(txn, pagesOutboundLinksBase.String()+"/"+id+"/")
if err != nil {
return err
}
err = txn.Commit()
if err != nil {
return fmt.Errorf("delete object info: %w", err)
}
if s.fts != nil {
err = s.removeFromIndexQueue(id)
if err != nil {
log.Errorf("error removing %s from index queue: %s", id, err)
}
if err := s.fts.Delete(id); err != nil {
return err
}
}
return nil
})
}
func (s *dsObjectStore) GetWithLinksInfoByID(id string) (*model.ObjectInfoWithLinks, error) {
var res *model.ObjectInfoWithLinks
err := s.db.View(func(txn *badger.Txn) error {
@ -548,14 +476,79 @@ func (s *dsObjectStore) FTSearch() ftsearch.FTSearch {
return s.fts
}
func hasObjectId(txn noctxds.Txn, id string) (bool, error) {
if exists, err := txn.Has(pagesDetailsBase.ChildString(id)); err != nil {
return false, fmt.Errorf("failed to get details: %w", err)
func (s *dsObjectStore) extractDetailsFromItem(it *badger.Item) (*model.ObjectDetails, error) {
key := it.Key()
if v, ok := s.cache.Get(key); ok {
return v.(*model.ObjectDetails), nil
} else {
return exists, nil
return s.unmarshalDetailsFromItem(it)
}
}
func (s *dsObjectStore) unmarshalDetailsFromItem(it *badger.Item) (*model.ObjectDetails, error) {
var details *model.ObjectDetails
err := it.Value(func(val []byte) error {
var err error
details, err = unmarshalDetails(detailsKeyToID(it.Key()), val)
if err != nil {
return fmt.Errorf("unmarshal details: %w", err)
}
s.cache.Set(it.Key(), details, int64(details.Size()))
return nil
})
if err != nil {
return nil, fmt.Errorf("get item value: %w", err)
}
return details, nil
}
func unmarshalDetails(id string, rawValue []byte) (*model.ObjectDetails, error) {
result := &model.ObjectDetails{}
if err := proto.Unmarshal(rawValue, result); err != nil {
return nil, err
}
if result.Details == nil {
result.Details = &types.Struct{Fields: map[string]*types.Value{}}
}
if result.Details.Fields == nil {
result.Details.Fields = map[string]*types.Value{}
} else {
pbtypes.StructDeleteEmptyFields(result.Details)
}
result.Details.Fields[database.RecordIDField] = pbtypes.ToValue(id)
return result, nil
}
func detailsKeyToID(key []byte) string {
return path.Base(string(key))
}
type order struct {
filter.Order
}
func (o order) Compare(lhs, rhs interface{}) (comp int) {
le := lhs.(database.Record)
re := rhs.(database.Record)
if o.Order != nil {
comp = o.Order.Compare(le, re)
}
// when order isn't set or equal - sort by id
if comp == 0 {
if pbtypes.GetString(le.Details, "id") > pbtypes.GetString(re.Details, "id") {
return 1
} else {
return -1
}
}
return comp
}
func (o order) CalcScore(key interface{}) float64 {
return 0
}
func (s *dsObjectStore) getObjectInfo(txn *badger.Txn, id string) (*model.ObjectInfo, error) {
sbt, err := s.sbtProvider.Type(id)
if err != nil {
@ -638,34 +631,6 @@ func listIDsByPrefix(txn *badger.Txn, prefix []byte) ([]string, error) {
return ids, err
}
func (s *dsObjectStore) removeByPrefixInTx(txn *badger.Txn, prefix string) (*badger.Txn, int, error) {
var toDelete [][]byte
err := iterateKeysByPrefixTx(txn, []byte(prefix), func(key []byte) {
toDelete = append(toDelete, key)
})
if err != nil {
return txn, 0, fmt.Errorf("iterate keys: %w", err)
}
var removed int
for _, key := range toDelete {
err = txn.Delete(key)
if err == badger.ErrTxnTooBig {
err = txn.Commit()
if err != nil {
return txn, removed, fmt.Errorf("commit big transaction: %w", err)
}
txn = s.db.NewTransaction(true)
err = txn.Delete(key)
}
if err != nil {
return txn, removed, fmt.Errorf("delete key %s: %w", key, err)
}
removed++
}
return txn, removed, nil
}
func pageLinkKeys(id string, out []string) []ds.Key {
keys := make([]ds.Key, 0, 2*len(out))
// links outgoing from specified node id

View file

@ -4,6 +4,8 @@ import (
"fmt"
"github.com/dgraph-io/badger/v3"
"github.com/huandu/skiplist"
"github.com/samber/lo"
"github.com/anyproto/anytype-heart/pkg/lib/bundle"
"github.com/anyproto/anytype-heart/pkg/lib/core/smartblock"
@ -13,6 +15,85 @@ import (
"github.com/anyproto/anytype-heart/util/pbtypes"
)
func (s *dsObjectStore) Query(sch schema.Schema, q database.Query) ([]database.Record, int, error) {
filters, err := s.buildQuery(sch, q)
if err != nil {
return nil, 0, fmt.Errorf("build query: %w", err)
}
recs, err := s.QueryRaw(filters, q.Limit, q.Offset)
return recs, 0, err
}
func (s *dsObjectStore) QueryRaw(filters *database.Filters, limit int, offset int) ([]database.Record, error) {
if filters == nil || filters.FilterObj == nil {
return nil, fmt.Errorf("filter cannot be nil or unitialized")
}
skl := skiplist.New(order{filters.Order})
err := s.db.View(func(txn *badger.Txn) error {
opts := badger.DefaultIteratorOptions
opts.Prefix = pagesDetailsBase.Bytes()
iterator := txn.NewIterator(opts)
defer iterator.Close()
for iterator.Rewind(); iterator.Valid(); iterator.Next() {
it := iterator.Item()
details, err := s.extractDetailsFromItem(it)
if err != nil {
return err
}
rec := database.Record{Details: details.Details}
if filters.FilterObj != nil && filters.FilterObj.FilterObject(rec) {
if offset > 0 {
offset--
continue
}
if limit > 0 && skl.Len() >= limit {
break
}
skl.Set(rec, nil)
}
}
return nil
})
if err != nil {
return nil, err
}
records := make([]database.Record, 0, skl.Len())
for it := skl.Front(); it != nil; it = it.Next() {
records = append(records, it.Key().(database.Record))
}
return records, nil
}
func (s *dsObjectStore) QueryById(ids []string) (records []database.Record, err error) {
err = s.db.View(func(txn *badger.Txn) error {
iterator := txn.NewIterator(badger.DefaultIteratorOptions)
defer iterator.Close()
for iterator.Rewind(); iterator.Valid(); iterator.Next() {
it := iterator.Item()
details, err := s.extractDetailsFromItem(it)
if err != nil {
return err
}
if lo.Contains(ids, pbtypes.GetString(details.Details, bundle.RelationKeyId.String())) {
rec := database.Record{Details: details.Details}
records = append(records, rec)
}
}
return nil
})
if err != nil {
return nil, err
}
return records, nil
}
func (s *dsObjectStore) buildQuery(sch schema.Schema, q database.Query) (*database.Filters, error) {
filters, err := database.NewFilters(q, sch, s)
if err != nil {

View file

@ -8,16 +8,55 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/gogo/protobuf/types"
"github.com/anyproto/anytype-heart/metrics"
"github.com/anyproto/anytype-heart/pkg/lib/bundle"
"github.com/anyproto/anytype-heart/pkg/lib/database"
"github.com/anyproto/anytype-heart/pkg/lib/datastore/noctxds"
"github.com/anyproto/anytype-heart/pkg/lib/pb/model"
"github.com/anyproto/anytype-heart/util/debug"
"github.com/anyproto/anytype-heart/util/pbtypes"
"github.com/anyproto/anytype-heart/util/slice"
)
func (s *dsObjectStore) UpdateObjectDetails(id string, details *types.Struct) error {
if details == nil {
return nil
}
if details.Fields == nil {
return fmt.Errorf("details fields are nil")
}
key := pagesDetailsBase.ChildString(id).Bytes()
return s.updateTxn(func(txn *badger.Txn) error {
prev, ok := s.cache.Get(key)
if !ok {
it, err := txn.Get(key)
if err != nil && err != badger.ErrKeyNotFound {
return fmt.Errorf("get item: %w", err)
}
if err != badger.ErrKeyNotFound {
prev, err = s.unmarshalDetailsFromItem(it)
if err != nil {
return fmt.Errorf("extract details: %w", err)
}
}
}
detailsModel := &model.ObjectDetails{
Details: details,
}
if prev != nil && proto.Equal(prev.(*model.ObjectDetails), detailsModel) {
return ErrDetailsNotChanged
}
// Ensure ID is set
details.Fields[bundle.RelationKeyId.String()] = pbtypes.String(id)
s.sendUpdatesToSubscriptions(id, details)
s.cache.Set(key, detailsModel, int64(detailsModel.Size()))
val, err := proto.Marshal(detailsModel)
if err != nil {
return fmt.Errorf("marshal details: %w", err)
}
return txn.Set(key, val)
})
}
func (s *dsObjectStore) UpdateObjectLinks(id string, links []string) error {
return s.updateTxn(func(txn *badger.Txn) error {
return s.updateObjectLinks(txn, id, links)
@ -103,49 +142,6 @@ func (s *dsObjectStore) updateObjectLinks(txn *badger.Txn, id string, links []st
return nil
}
func (s *dsObjectStore) updateObjectDetails(txn noctxds.Txn, id string, before model.ObjectInfo, details *types.Struct) error {
if details != nil {
if err := s.updateDetails(txn, id, &model.ObjectDetails{Details: before.Details}, &model.ObjectDetails{Details: details}); err != nil {
return err
}
}
return nil
}
func (s *dsObjectStore) updateDetails(txn noctxds.Txn, id string, oldDetails *model.ObjectDetails, newDetails *model.ObjectDetails) error {
metrics.ObjectDetailsUpdatedCounter.Inc()
detailsKey := pagesDetailsBase.ChildString(id)
if newDetails.GetDetails().GetFields() == nil {
return fmt.Errorf("newDetails is nil")
}
newDetails.Details.Fields[bundle.RelationKeyId.String()] = pbtypes.String(id) // always ensure we have id set
b, err := proto.Marshal(newDetails)
if err != nil {
return err
}
err = txn.Put(detailsKey, b)
if err != nil {
return err
}
if pbtypes.GetString(newDetails.Details, bundle.RelationKeyWorkspaceId.String()) == "" {
log.With("objectID", id).With("stack", debug.StackCompact(false)).Warnf("workspaceId erased")
}
if oldDetails.GetDetails().Equal(newDetails.GetDetails()) {
return ErrDetailsNotChanged
}
if newDetails != nil && newDetails.Details.Fields != nil {
s.sendUpdatesToSubscriptions(id, newDetails.Details)
}
return nil
}
// should be called under the mutex
func (s *dsObjectStore) sendUpdatesToSubscriptions(id string, details *types.Struct) {
detCopy := pbtypes.CopyStruct(details)