mirror of
https://github.com/anyproto/anytype-heart.git
synced 2025-06-09 17:44:59 +09:00
GO-1483: Fix UpdateLocalPendingDetails
This commit is contained in:
parent
cf0820361e
commit
7723c69020
6 changed files with 70 additions and 111 deletions
|
@ -57,7 +57,8 @@ func initObjecStore(o *options) (os objectstore.ObjectStore, closer func(), err
|
|||
}
|
||||
|
||||
ds2 := noctxds.New(ds)
|
||||
return objectstore.NewWithLocalstore(ds2), closer, nil
|
||||
_ = ds2
|
||||
return objectstore.NewWithLocalstore(), closer, nil
|
||||
}
|
||||
|
||||
func initBadgerV3(o *options) (*dsbadgerv3.Datastore, error) {
|
||||
|
|
|
@ -1,13 +1,33 @@
|
|||
package objectstore
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/dgraph-io/badger/v3"
|
||||
"github.com/gogo/protobuf/proto"
|
||||
)
|
||||
|
||||
func (s *dsObjectStore) updateTxn(f func(txn *badger.Txn) error) error {
|
||||
for {
|
||||
err := s.db.Update(f)
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
if errors.Is(err, badger.ErrConflict) {
|
||||
continue
|
||||
}
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
func setValue(db *badger.DB, key []byte, value any) error {
|
||||
return db.Update(func(txn *badger.Txn) error {
|
||||
return setValueTxn(txn, key, value)
|
||||
})
|
||||
}
|
||||
|
||||
func setValueTxn(txn *badger.Txn, key []byte, value any) error {
|
||||
var (
|
||||
raw []byte
|
||||
)
|
||||
|
@ -25,10 +45,7 @@ func setValue(db *badger.DB, key []byte, value any) error {
|
|||
return fmt.Errorf("marshal value: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return db.Update(func(txn *badger.Txn) error {
|
||||
return txn.Set(key, raw)
|
||||
})
|
||||
return txn.Set(key, raw)
|
||||
}
|
||||
|
||||
func deleteValue(db *badger.DB, key []byte) error {
|
||||
|
|
|
@ -5,7 +5,6 @@ import (
|
|||
"path"
|
||||
|
||||
"github.com/dgraph-io/badger/v3"
|
||||
"github.com/dgraph-io/ristretto"
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/gogo/protobuf/types"
|
||||
"github.com/huandu/skiplist"
|
||||
|
@ -19,30 +18,6 @@ import (
|
|||
"github.com/anyproto/anytype-heart/util/pbtypes"
|
||||
)
|
||||
|
||||
type newstore struct {
|
||||
cache *ristretto.Cache
|
||||
db *badger.DB
|
||||
onUpdate func(*types.Struct)
|
||||
}
|
||||
|
||||
func newNewstore(path string) (*newstore, error) {
|
||||
cache, err := ristretto.NewCache(&ristretto.Config{
|
||||
NumCounters: 10_000_000,
|
||||
MaxCost: 100_000_000,
|
||||
BufferItems: 64,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create cache: %w", err)
|
||||
}
|
||||
|
||||
db, err := badger.Open(badger.DefaultOptions(path))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("open badgerdb: %w", err)
|
||||
}
|
||||
|
||||
return &newstore{cache: cache, db: db}, nil
|
||||
}
|
||||
|
||||
func (s *dsObjectStore) UpdateObjectDetails(id string, details *types.Struct) error {
|
||||
if details == nil {
|
||||
return nil
|
||||
|
|
|
@ -66,10 +66,8 @@ func New(sbtProvider typeprovider.SmartBlockTypeProvider) ObjectStore {
|
|||
}
|
||||
}
|
||||
|
||||
func NewWithLocalstore(ds noctxds.DSTxnBatching) ObjectStore {
|
||||
return &dsObjectStore{
|
||||
ds: ds,
|
||||
}
|
||||
func NewWithLocalstore() ObjectStore {
|
||||
return &dsObjectStore{}
|
||||
}
|
||||
|
||||
type SourceDetailsFromId interface {
|
||||
|
@ -77,7 +75,6 @@ type SourceDetailsFromId interface {
|
|||
}
|
||||
|
||||
func (s *dsObjectStore) Init(a *app.App) (err error) {
|
||||
s.dsIface = a.MustComponent(datastore.CName).(datastore.Datastore)
|
||||
src := a.Component("source")
|
||||
if src != nil {
|
||||
s.sourceService = a.MustComponent("source").(SourceDetailsFromId)
|
||||
|
@ -88,7 +85,8 @@ func (s *dsObjectStore) Init(a *app.App) (err error) {
|
|||
} else {
|
||||
s.fts = fts.(ftsearch.FTSearch)
|
||||
}
|
||||
s.db, err = s.dsIface.LocalstoreBadger()
|
||||
datastoreService := a.MustComponent(datastore.CName).(datastore.Datastore)
|
||||
s.db, err = datastoreService.LocalstoreBadger()
|
||||
if err != nil {
|
||||
return fmt.Errorf("get badger: %w", err)
|
||||
}
|
||||
|
@ -178,8 +176,6 @@ var ErrNotAnObject = fmt.Errorf("not an object")
|
|||
|
||||
type dsObjectStore struct {
|
||||
// underlying storage
|
||||
ds noctxds.DSTxnBatching
|
||||
dsIface datastore.Datastore
|
||||
sourceService SourceDetailsFromId
|
||||
|
||||
cache *ristretto.Cache
|
||||
|
@ -224,9 +220,7 @@ func (s *dsObjectStore) eraseLinks() (outboundRemoved int, inboundRemoved int, e
|
|||
}
|
||||
|
||||
func (s *dsObjectStore) Run(context.Context) (err error) {
|
||||
lds, err := s.dsIface.LocalstoreDS()
|
||||
s.ds = noctxds.New(lds)
|
||||
return
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *dsObjectStore) Close(_ context.Context) (err error) {
|
||||
|
|
|
@ -13,14 +13,12 @@ import (
|
|||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
dsbadgerv3 "github.com/textileio/go-ds-badger3"
|
||||
|
||||
"github.com/anyproto/anytype-heart/core/wallet"
|
||||
"github.com/anyproto/anytype-heart/core/wallet/mock_wallet"
|
||||
"github.com/anyproto/anytype-heart/pkg/lib/bundle"
|
||||
"github.com/anyproto/anytype-heart/pkg/lib/core/smartblock"
|
||||
"github.com/anyproto/anytype-heart/pkg/lib/database"
|
||||
"github.com/anyproto/anytype-heart/pkg/lib/datastore/noctxds"
|
||||
"github.com/anyproto/anytype-heart/pkg/lib/localstore/ftsearch"
|
||||
"github.com/anyproto/anytype-heart/pkg/lib/pb/model"
|
||||
"github.com/anyproto/anytype-heart/space/typeprovider/mock_typeprovider"
|
||||
|
@ -32,11 +30,6 @@ type storeFixture struct {
|
|||
}
|
||||
|
||||
func newStoreFixture(t *testing.T) *storeFixture {
|
||||
ds, err := dsbadgerv3.NewDatastore(t.TempDir(), &dsbadgerv3.DefaultOptions)
|
||||
require.NoError(t, err)
|
||||
|
||||
noCtxDS := noctxds.New(ds)
|
||||
|
||||
typeProvider := mock_typeprovider.NewMockSmartBlockTypeProvider(t)
|
||||
typeProvider.EXPECT().Type(mock.Anything).Return(smartblock.SmartBlockTypePage, nil).Maybe()
|
||||
|
||||
|
@ -47,7 +40,7 @@ func newStoreFixture(t *testing.T) *storeFixture {
|
|||
fullText := ftsearch.New()
|
||||
testApp := &app.App{}
|
||||
testApp.Register(walletService)
|
||||
err = fullText.Init(testApp)
|
||||
err := fullText.Init(testApp)
|
||||
require.NoError(t, err)
|
||||
err = fullText.Run(context.Background())
|
||||
require.NoError(t, err)
|
||||
|
@ -57,7 +50,6 @@ func newStoreFixture(t *testing.T) *storeFixture {
|
|||
|
||||
return &storeFixture{
|
||||
dsObjectStore: &dsObjectStore{
|
||||
ds: noCtxDS,
|
||||
sbtProvider: typeProvider,
|
||||
fts: fullText,
|
||||
db: db,
|
||||
|
|
|
@ -4,17 +4,16 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/anyproto/anytype-heart/util/debug"
|
||||
"github.com/dgraph-io/badger/v3"
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/gogo/protobuf/types"
|
||||
ds "github.com/ipfs/go-datastore"
|
||||
|
||||
"github.com/anyproto/anytype-heart/metrics"
|
||||
"github.com/anyproto/anytype-heart/pkg/lib/bundle"
|
||||
"github.com/anyproto/anytype-heart/pkg/lib/database"
|
||||
"github.com/anyproto/anytype-heart/pkg/lib/datastore/noctxds"
|
||||
"github.com/anyproto/anytype-heart/pkg/lib/pb/model"
|
||||
"github.com/anyproto/anytype-heart/util/debug"
|
||||
"github.com/anyproto/anytype-heart/util/pbtypes"
|
||||
"github.com/anyproto/anytype-heart/util/slice"
|
||||
)
|
||||
|
@ -70,70 +69,51 @@ func (s *dsObjectStore) UpdateObjectSnippet(id string, snippet string) error {
|
|||
}
|
||||
|
||||
func (s *dsObjectStore) UpdatePendingLocalDetails(id string, proc func(details *types.Struct) (*types.Struct, error)) error {
|
||||
// todo: review this method. Any other way to do this?
|
||||
for {
|
||||
err := s.updatePendingLocalDetails(id, proc)
|
||||
if errors.Is(err, badger.ErrConflict) {
|
||||
continue
|
||||
return s.updateTxn(func(txn *badger.Txn) error {
|
||||
key := pendingDetailsBase.ChildString(id).Bytes()
|
||||
|
||||
objDetails, err := s.getPendingLocalDetails(txn, key)
|
||||
if err != nil && !errors.Is(err, badger.ErrKeyNotFound) {
|
||||
return fmt.Errorf("get pending details: %w", err)
|
||||
}
|
||||
|
||||
oldDetails := objDetails.GetDetails()
|
||||
if oldDetails == nil {
|
||||
oldDetails = &types.Struct{Fields: map[string]*types.Value{}}
|
||||
}
|
||||
if oldDetails.Fields == nil {
|
||||
oldDetails.Fields = map[string]*types.Value{}
|
||||
}
|
||||
newDetails, err := proc(oldDetails)
|
||||
if err != nil {
|
||||
return err
|
||||
return fmt.Errorf("run a modifier: %w", err)
|
||||
}
|
||||
if newDetails == nil {
|
||||
err = txn.Delete(key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
if newDetails.Fields == nil {
|
||||
newDetails.Fields = map[string]*types.Value{}
|
||||
}
|
||||
newDetails.Fields[bundle.RelationKeyId.String()] = pbtypes.String(id)
|
||||
err = setValueTxn(txn, key, &model.ObjectDetails{Details: newDetails})
|
||||
if err != nil {
|
||||
return fmt.Errorf("put pending details: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func (s *dsObjectStore) updatePendingLocalDetails(id string, proc func(details *types.Struct) (*types.Struct, error)) error {
|
||||
txn, err := s.ds.NewTransaction(false)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error creating txn in datastore: %w", err)
|
||||
}
|
||||
defer txn.Discard()
|
||||
key := pendingDetailsBase.ChildString(id)
|
||||
|
||||
objDetails, err := s.getPendingLocalDetails(txn, id)
|
||||
if err != nil && err != ds.ErrNotFound {
|
||||
return fmt.Errorf("get pending details: %w", err)
|
||||
}
|
||||
|
||||
details := objDetails.GetDetails()
|
||||
if details == nil {
|
||||
details = &types.Struct{Fields: map[string]*types.Value{}}
|
||||
}
|
||||
if details.Fields == nil {
|
||||
details.Fields = map[string]*types.Value{}
|
||||
}
|
||||
details, err = proc(details)
|
||||
if err != nil {
|
||||
return fmt.Errorf("run a modifier: %w", err)
|
||||
}
|
||||
if details == nil {
|
||||
err = txn.Delete(key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return txn.Commit()
|
||||
}
|
||||
b, err := proto.Marshal(&model.ObjectDetails{Details: details})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = txn.Put(key, b)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return txn.Commit()
|
||||
}
|
||||
|
||||
func (s *dsObjectStore) getPendingLocalDetails(txn noctxds.Txn, id string) (*model.ObjectDetails, error) {
|
||||
return nil, nil
|
||||
// TODO Fix
|
||||
// val, err := txn.Get(pendingDetailsBase.ChildString(id))
|
||||
// if err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
// return unmarshalDetails(id, val)
|
||||
func (s *dsObjectStore) getPendingLocalDetails(txn *badger.Txn, key []byte) (*model.ObjectDetails, error) {
|
||||
return getValueTxn(txn, key, func(raw []byte) (*model.ObjectDetails, error) {
|
||||
var res model.ObjectDetails
|
||||
err := proto.Unmarshal(raw, &res)
|
||||
return &res, err
|
||||
})
|
||||
}
|
||||
|
||||
func (s *dsObjectStore) updateObjectLinks(txn *badger.Txn, id string, links []string) error {
|
||||
|
@ -194,7 +174,7 @@ func (s *dsObjectStore) updateDetails(txn noctxds.Txn, id string, oldDetails *mo
|
|||
if pbtypes.GetString(newDetails.Details, bundle.RelationKeyWorkspaceId.String()) == "" {
|
||||
log.With("objectID", id).With("stack", debug.StackCompact(false)).Warnf("workspaceId erased")
|
||||
}
|
||||
|
||||
|
||||
if oldDetails.GetDetails().Equal(newDetails.GetDetails()) {
|
||||
return ErrDetailsNotChanged
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue