1
0
Fork 0
mirror of https://github.com/anyproto/anytype-heart.git synced 2025-06-09 09:35:00 +09:00

GO-5418: WIP: refactor kv service

This commit is contained in:
Sergey 2025-04-23 15:11:42 +02:00
parent 0b034d37f0
commit c4f7bbfc6f
No known key found for this signature in database
GPG key ID: 3B6BEF79160221C6
11 changed files with 110 additions and 124 deletions

View file

@ -0,0 +1,228 @@
package keyvalueservice
import (
"context"
"crypto/sha256"
"encoding/hex"
"fmt"
"sync"
"github.com/anyproto/any-sync/commonspace"
"github.com/anyproto/any-sync/commonspace/object/keyvalue/keyvaluestorage"
"github.com/anyproto/any-sync/commonspace/object/keyvalue/keyvaluestorage/innerstorage"
"go.uber.org/zap"
"github.com/anyproto/anytype-heart/pkg/lib/logging"
"github.com/anyproto/anytype-heart/space/spacecore/keyvalueobserver"
)
const CName = "core.keyvalueservice"
var log = logging.Logger(CName).Desugar()
type ObserverFunc func(key string, val Value)
type Value struct {
Data []byte
TimestampMilli int
}
type subscription struct {
name string
observerFunc ObserverFunc
}
type derivedKey string
type Service interface {
Get(ctx context.Context, key string) ([]Value, error)
Set(ctx context.Context, key string, value []byte) error
SubscribeForKey(key string, subscriptionName string, observerFunc ObserverFunc) error
UnsubscribeFromKey(key string, subscriptionName string) error
}
type service struct {
lock sync.RWMutex
subscriptions map[derivedKey]map[string]subscription
keyValueStore keyvaluestorage.Storage
spaceCore commonspace.Space
observer keyvalueobserver.Observer
keysLock sync.Mutex
spaceSalt []byte
keyToDerivedKey map[string]derivedKey
derivedKeyToKey map[derivedKey]string
}
func New(spaceCore commonspace.Space, observer keyvalueobserver.Observer) (Service, error) {
s := &service{
spaceCore: spaceCore,
observer: observer,
keyValueStore: spaceCore.KeyValue().DefaultStore(),
subscriptions: make(map[derivedKey]map[string]subscription),
keyToDerivedKey: make(map[string]derivedKey),
derivedKeyToKey: make(map[derivedKey]string),
}
err := s.initSpaceSalt()
if err != nil {
return nil, fmt.Errorf("init tech salt: %w", err)
}
s.observer.SetObserver(s.observeChanges)
return s, nil
}
func (s *service) initSpaceSalt() error {
records := s.spaceCore.Acl().Records()
if len(records) == 0 {
return fmt.Errorf("empty acl")
}
first := records[0]
readKeyId, err := s.spaceCore.Acl().AclState().ReadKeyForAclId(first.Id)
if err != nil {
return fmt.Errorf("find read key id: %w", err)
}
readKeys := s.spaceCore.Acl().AclState().Keys()
key, ok := readKeys[readKeyId]
if !ok {
return fmt.Errorf("read key not found")
}
rawReadKey, err := key.ReadKey.Raw()
if err != nil {
return fmt.Errorf("get raw bytes: %w", err)
}
s.spaceSalt = rawReadKey
return nil
}
func (s *service) observeChanges(decryptFunc keyvaluestorage.Decryptor, kvs []innerstorage.KeyValue) {
for _, kv := range kvs {
s.lock.RLock()
byKey := s.subscriptions[derivedKey(kv.Key)]
for _, sub := range byKey {
data, err := decryptFunc(kv)
if err != nil {
log.Error("can't decrypt value", zap.Error(err))
continue
}
key, ok := s.getKeyFromDerived(derivedKey(kv.Key))
if !ok {
log.Error("can't get key from derived key", zap.String("subName", sub.name))
continue
}
sub.observerFunc(key, Value{Data: data, TimestampMilli: kv.TimestampMilli})
}
s.lock.RUnlock()
}
}
func (s *service) Get(ctx context.Context, key string) ([]Value, error) {
derived, err := s.getDerivedKey(key)
if err != nil {
return nil, fmt.Errorf("getDerivedKey: %w", err)
}
var result []Value
err = s.keyValueStore.GetAll(ctx, string(derived), func(decryptor keyvaluestorage.Decryptor, kvs []innerstorage.KeyValue) error {
result = make([]Value, 0, len(kvs))
for _, kv := range kvs {
data, err := decryptor(kv)
if err != nil {
return fmt.Errorf("decrypt: %w", err)
}
result = append(result, Value{
Data: data,
TimestampMilli: kv.TimestampMilli,
})
}
return nil
})
if err != nil {
return nil, fmt.Errorf("get all: %w", err)
}
return result, nil
}
func (s *service) Set(ctx context.Context, key string, value []byte) error {
derived, err := s.getDerivedKey(key)
if err != nil {
return fmt.Errorf("getDerivedKey: %w", err)
}
return s.keyValueStore.Set(ctx, string(derived), value)
}
func (s *service) getDerivedKey(key string) (derivedKey, error) {
s.keysLock.Lock()
defer s.keysLock.Unlock()
derived, ok := s.keyToDerivedKey[key]
if ok {
return derived, nil
}
hasher := sha256.New()
// Salt
hasher.Write(s.spaceSalt)
// User key
hasher.Write([]byte(key))
result := hasher.Sum(nil)
derived = derivedKey(hex.EncodeToString(result))
s.keyToDerivedKey[key] = derived
s.derivedKeyToKey[derived] = key
return derived, nil
}
func (s *service) getKeyFromDerived(derived derivedKey) (string, bool) {
s.keysLock.Lock()
defer s.keysLock.Unlock()
key, ok := s.derivedKeyToKey[derived]
return key, ok
}
func (s *service) SubscribeForKey(key string, subscriptionName string, observerFunc ObserverFunc) error {
derived, err := s.getDerivedKey(key)
if err != nil {
return fmt.Errorf("getDerivedKey: %w", err)
}
s.lock.Lock()
defer s.lock.Unlock()
byKey, ok := s.subscriptions[derived]
if !ok {
byKey = make(map[string]subscription)
s.subscriptions[derived] = byKey
}
byKey[subscriptionName] = subscription{
name: subscriptionName,
observerFunc: observerFunc,
}
return nil
}
func (s *service) UnsubscribeFromKey(key string, subscriptionName string) error {
derived, err := s.getDerivedKey(key)
if err != nil {
return fmt.Errorf("getDerivedKey: %w", err)
}
s.lock.Lock()
defer s.lock.Unlock()
byKey, ok := s.subscriptions[derived]
if ok {
delete(byKey, subscriptionName)
}
return nil
}

View file

@ -24,8 +24,10 @@ import (
"github.com/anyproto/anytype-heart/pkg/lib/bundle"
coresb "github.com/anyproto/anytype-heart/pkg/lib/core/smartblock"
"github.com/anyproto/anytype-heart/pkg/lib/threads"
"github.com/anyproto/anytype-heart/space/clientspace/keyvalueservice"
"github.com/anyproto/anytype-heart/space/internal/objectprovider"
"github.com/anyproto/anytype-heart/space/spacecore"
"github.com/anyproto/anytype-heart/space/spacecore/keyvalueobserver"
"github.com/anyproto/anytype-heart/space/spacecore/peermanager"
"github.com/anyproto/anytype-heart/space/spacecore/storage"
"github.com/anyproto/anytype-heart/space/spacecore/storage/anystorage"
@ -56,6 +58,8 @@ type Space interface {
IsPersonal() bool
GetAclIdentity() crypto.PubKey
KeyValueService() keyvalueservice.Service
Close(ctx context.Context) error
}
@ -82,6 +86,7 @@ type space struct {
derivedIDs threads.DerivedSmartblockIds
installer bundledObjectsInstaller
spaceCore spacecore.SpaceCoreService
keyValueService keyvalueservice.Service
personalSpaceId string
aclIdentity crypto.PubKey
@ -102,6 +107,7 @@ type SpaceDeps struct {
AccountService accountservice.Service
StorageService storage.ClientStorage
SpaceCore spacecore.SpaceCoreService
KeyValueObserver keyvalueobserver.Observer
PersonalSpaceId string
LoadCtx context.Context
DisableRemoteLoad bool
@ -150,7 +156,14 @@ func BuildSpace(ctx context.Context, deps SpaceDeps) (Space, error) {
return nil, fmt.Errorf("install bundled objects: %w", err)
}
}
sp.keyValueService, err = keyvalueservice.New(sp.common, deps.KeyValueObserver)
if err != nil {
return nil, fmt.Errorf("create key value service: %w", err)
}
go sp.mandatoryObjectsLoad(deps.LoadCtx, deps.DisableRemoteLoad)
return sp, nil
}
@ -228,6 +241,10 @@ func (s *space) CommonSpace() commonspace.Space {
return s.common
}
func (s *space) KeyValueService() keyvalueservice.Service {
return s.keyValueService
}
func (s *space) WaitMandatoryObjects(ctx context.Context) (err error) {
select {
case <-s.loadMandatoryObjectsCh:

View file

@ -93,15 +93,16 @@ func (b *spaceBuilder) BuildSpace(ctx context.Context, disableRemoteLoad bool) (
coreSpace.TreeSyncer().StopSync()
}
deps := clientspace.SpaceDeps{
Indexer: b.indexer,
Installer: b.installer,
CommonSpace: coreSpace,
ObjectFactory: b.objectFactory,
AccountService: b.accountService,
PersonalSpaceId: b.personalSpaceId,
StorageService: b.storageService,
SpaceCore: b.spaceCore,
LoadCtx: b.ctx,
Indexer: b.indexer,
Installer: b.installer,
CommonSpace: coreSpace,
ObjectFactory: b.objectFactory,
AccountService: b.accountService,
PersonalSpaceId: b.personalSpaceId,
StorageService: b.storageService,
SpaceCore: b.spaceCore,
LoadCtx: b.ctx,
KeyValueObserver: coreSpace.KeyValueObserver(),
}
space, err := clientspace.BuildSpace(ctx, deps)
if err != nil {

View file

@ -16,13 +16,11 @@ const CName = keyvaluestorage.IndexerCName
type ObserverFunc func(decryptor keyvaluestorage.Decryptor, kvs []innerstorage.KeyValue)
type Observer interface {
app.ComponentRunnable
keyvaluestorage.Indexer
SetObserver(observerFunc ObserverFunc)
}
func New() Observer {
return &observer{}
}

View file

@ -130,7 +130,6 @@ func (s *spaceFactory) CreateAndSetTechSpace(ctx context.Context) (*clientspace.
s.techSpace = ts
s.app = s.app.ChildApp()
s.app.Register(s.techSpace)
s.app.Register(kvObserver)
err = ts.Run(techCoreSpace, ts.Cache, kvObserver, true)
if err != nil {
return nil, fmt.Errorf("run tech space: %w", err)
@ -164,7 +163,6 @@ func (s *spaceFactory) LoadAndSetTechSpace(ctx context.Context) (*clientspace.Te
s.techSpace = ts
s.app = s.app.ChildApp()
s.app.Register(s.techSpace)
s.app.Register(kvObserver)
err = ts.Run(techCoreSpace, ts.Cache, kvObserver, false)
if err != nil {
return nil, fmt.Errorf("run tech space: %w", err)

View file

@ -68,9 +68,6 @@ type TechSpace interface {
SpaceViewSetData(ctx context.Context, spaceId string, details *domain.Details) (err error)
SpaceViewId(id string) (string, error)
AccountObjectId() (string, error)
KeyValueObserver() keyvalueobserver.Observer
KeyValueStore() keyvaluestorage.Storage
}
type SpaceView interface {
@ -398,10 +395,6 @@ func (s *techSpace) KeyValueStore() keyvaluestorage.Storage {
return s.techCore.KeyValue().DefaultStore()
}
func (s *techSpace) KeyValueObserver() keyvalueobserver.Observer {
return s.keyvalueObserver
}
func (s *techSpace) Close(ctx context.Context) (err error) {
s.ctxCancel()
s.mu.Lock()