diff --git a/core/anytype/bootstrap.go b/core/anytype/bootstrap.go index 5f89b49a0..9badba6aa 100644 --- a/core/anytype/bootstrap.go +++ b/core/anytype/bootstrap.go @@ -76,7 +76,6 @@ import ( "github.com/anyproto/anytype-heart/core/inviteservice" "github.com/anyproto/anytype-heart/core/invitestore" "github.com/anyproto/anytype-heart/core/kanban" - "github.com/anyproto/anytype-heart/core/keyvalueservice/keyvalueserviceimpl" "github.com/anyproto/anytype-heart/core/nameservice" "github.com/anyproto/anytype-heart/core/notifications" "github.com/anyproto/anytype-heart/core/payments" diff --git a/core/block/source/service.go b/core/block/source/service.go index b5e69c0dd..dbb037344 100644 --- a/core/block/source/service.go +++ b/core/block/source/service.go @@ -18,11 +18,11 @@ import ( "github.com/anyproto/anytype-heart/core/block/object/idderiver" "github.com/anyproto/anytype-heart/core/domain" "github.com/anyproto/anytype-heart/core/files" - "github.com/anyproto/anytype-heart/core/keyvalueservice" "github.com/anyproto/anytype-heart/pkg/lib/bundle" "github.com/anyproto/anytype-heart/pkg/lib/core/smartblock" "github.com/anyproto/anytype-heart/pkg/lib/localstore/addr" "github.com/anyproto/anytype-heart/pkg/lib/localstore/objectstore" + "github.com/anyproto/anytype-heart/space/clientspace/keyvalueservice" "github.com/anyproto/anytype-heart/space/spacecore/storage" "github.com/anyproto/anytype-heart/space/spacecore/typeprovider" ) @@ -47,6 +47,7 @@ type Space interface { DeriveObjectID(ctx context.Context, uniqueKey domain.UniqueKey) (id string, err error) StoredIds() []string IsPersonal() bool + KeyValueService() keyvalueservice.Service } type Service interface { @@ -68,7 +69,6 @@ type service struct { objectStore objectstore.ObjectStore fileObjectMigrator fileObjectMigrator idDeriver idderiver.Deriver - keyValueService keyvalueservice.Service mu sync.Mutex staticIds map[string]Source @@ -83,7 +83,6 @@ func (s *service) Init(a *app.App) (err error) { s.storageService = a.MustComponent(spacestorage.CName).(storage.ClientStorage) s.objectStore = app.MustComponent[objectstore.ObjectStore](a) s.idDeriver = app.MustComponent[idderiver.Deriver](a) - s.keyValueService = app.MustComponent[keyvalueservice.Service](a) s.fileService = app.MustComponent[files.Service](a) s.fileObjectMigrator = app.MustComponent[fileObjectMigrator](a) diff --git a/core/block/source/source.go b/core/block/source/source.go index afb4f98a0..2bb7669ec 100644 --- a/core/block/source/source.go +++ b/core/block/source/source.go @@ -193,7 +193,7 @@ func (s *service) newTreeSource(ctx context.Context, space Space, id string, bui fileObjectMigrator: s.fileObjectMigrator, } if sbt == smartblock.SmartBlockTypeChatDerivedObject || sbt == smartblock.SmartBlockTypeAccountObject { - return &store{source: src, sbType: sbt, diffManagers: map[string]*diffManager{}, keyValueService: s.keyValueService}, nil + return &store{source: src, sbType: sbt, diffManagers: map[string]*diffManager{}}, nil } return src, nil diff --git a/core/block/source/store.go b/core/block/source/store.go index ac4f85922..f153c2f4a 100644 --- a/core/block/source/store.go +++ b/core/block/source/store.go @@ -19,11 +19,11 @@ import ( "github.com/anyproto/anytype-heart/core/block/editor/state" "github.com/anyproto/anytype-heart/core/block/editor/storestate" "github.com/anyproto/anytype-heart/core/domain" - "github.com/anyproto/anytype-heart/core/keyvalueservice" "github.com/anyproto/anytype-heart/pb" "github.com/anyproto/anytype-heart/pkg/lib/bundle" "github.com/anyproto/anytype-heart/pkg/lib/core/smartblock" "github.com/anyproto/anytype-heart/pkg/lib/pb/model" + "github.com/anyproto/anytype-heart/space/clientspace/keyvalueservice" ) type PushChangeHook func(params PushChangeParams) (id string, err error) @@ -62,11 +62,11 @@ var ( type store struct { *source - keyValueService keyvalueservice.Service - store *storestate.StoreState - onUpdateHook func() - onPushChange PushChangeHook - sbType smartblock.SmartBlockType + techSpace Space + store *storestate.StoreState + onUpdateHook func() + onPushChange PushChangeHook + sbType smartblock.SmartBlockType diffManagers map[string]*diffManager } @@ -99,7 +99,7 @@ func (s *store) initDiffManagers(ctx context.Context) error { return fmt.Errorf("init diff manager: %w", err) } - vals, err := s.keyValueService.GetUserScopedKey(ctx, s.seenHeadsKey(name)) + vals, err := s.techSpace.KeyValueService().Get(ctx, s.seenHeadsKey(name)) if err != nil { return fmt.Errorf("get value: %w", err) } @@ -148,7 +148,7 @@ func (s *store) InitDiffManager(ctx context.Context, name string, seenHeads []st return fmt.Errorf("init diff manager: %w", err) } - err = s.keyValueService.SubscribeForUserScopedKey(s.seenHeadsKey(name), name, func(key string, val keyvalueservice.Value) { + err = s.techSpace.KeyValueService().SubscribeForKey(s.seenHeadsKey(name), name, func(key string, val keyvalueservice.Value) { s.ObjectTree.Lock() defer s.ObjectTree.Unlock() @@ -287,18 +287,22 @@ func (s *store) PushStoreChange(ctx context.Context, params PushStoreChangeParam return "", err } - for _, m := range s.diffManagers { - if m.diffManager != nil { - m.diffManager.Add(&objecttree.Change{ - Id: changeId, - PreviousIds: ch.PreviousIds, - }) - } - } + s.addToDiffManagers(&objecttree.Change{ + Id: changeId, + PreviousIds: ch.PreviousIds, + }) return changeId, err } +func (s *store) addToDiffManagers(change *objecttree.Change) { + for _, m := range s.diffManagers { + if m.diffManager != nil { + m.diffManager.Add(change) + } + } +} + func (s *store) update(ctx context.Context, tree objecttree.ObjectTree) error { tx, err := s.store.NewTx(ctx) if err != nil { @@ -313,15 +317,20 @@ func (s *store) update(ctx context.Context, tree objecttree.ObjectTree) error { return errors.Join(tx.Rollback(), err) } err = tx.Commit() + + s.updateInDiffManagers(tree) + if err == nil { + s.onUpdateHook() + } + return err +} + +func (s *store) updateInDiffManagers(tree objecttree.ObjectTree) { for _, m := range s.diffManagers { if m.diffManager != nil { m.diffManager.Update(tree) } } - if err == nil { - s.onUpdateHook() - } - return err } func (s *store) MarkSeenHeads(ctx context.Context, name string, heads []string) error { @@ -345,7 +354,7 @@ func (s *store) StoreSeenHeads(ctx context.Context, name string) error { return fmt.Errorf("marshal seen heads: %w", err) } - return s.keyValueService.SetUserScopedKey(ctx, s.seenHeadsKey(name), raw) + return s.techSpace.KeyValueService().Set(ctx, s.seenHeadsKey(name), raw) } func (s *store) seenHeadsKey(diffManagerName string) string { diff --git a/core/keyvalueservice/interfaces.go b/core/keyvalueservice/interfaces.go deleted file mode 100644 index e167d2f6c..000000000 --- a/core/keyvalueservice/interfaces.go +++ /dev/null @@ -1,23 +0,0 @@ -package keyvalueservice - -import ( - "context" - - "github.com/anyproto/any-sync/app" -) - -type ObserverFunc func(key string, val Value) - -type Value struct { - Data []byte - TimestampMilli int -} - -type Service interface { - app.ComponentRunnable - - GetUserScopedKey(ctx context.Context, key string) ([]Value, error) - SetUserScopedKey(ctx context.Context, key string, value []byte) error - SubscribeForUserScopedKey(key string, subscriptionName string, observerFunc ObserverFunc) error - UnsubscribeFromUserScopedKey(key string, subscriptionName string) error -} diff --git a/core/keyvalueservice/keyvalueserviceimpl/service.go b/space/clientspace/keyvalueservice/service.go similarity index 61% rename from core/keyvalueservice/keyvalueserviceimpl/service.go rename to space/clientspace/keyvalueservice/service.go index 157480842..9304daf91 100644 --- a/core/keyvalueservice/keyvalueserviceimpl/service.go +++ b/space/clientspace/keyvalueservice/service.go @@ -1,4 +1,4 @@ -package keyvalueserviceimpl +package keyvalueservice import ( "context" @@ -7,86 +7,85 @@ import ( "fmt" "sync" - "github.com/anyproto/any-sync/app" + "github.com/anyproto/any-sync/commonspace" "github.com/anyproto/any-sync/commonspace/object/keyvalue/keyvaluestorage" "github.com/anyproto/any-sync/commonspace/object/keyvalue/keyvaluestorage/innerstorage" "go.uber.org/zap" - "github.com/anyproto/anytype-heart/core/keyvalueservice" "github.com/anyproto/anytype-heart/pkg/lib/logging" - "github.com/anyproto/anytype-heart/space" - "github.com/anyproto/anytype-heart/space/clientspace" + "github.com/anyproto/anytype-heart/space/spacecore/keyvalueobserver" ) const CName = "core.keyvalueservice" var log = logging.Logger(CName).Desugar() +type ObserverFunc func(key string, val Value) + +type Value struct { + Data []byte + TimestampMilli int +} + type subscription struct { name string - observerFunc keyvalueservice.ObserverFunc + observerFunc ObserverFunc } type derivedKey string +type Service interface { + Get(ctx context.Context, key string) ([]Value, error) + Set(ctx context.Context, key string, value []byte) error + SubscribeForKey(key string, subscriptionName string, observerFunc ObserverFunc) error + UnsubscribeFromKey(key string, subscriptionName string) error +} + type service struct { lock sync.RWMutex subscriptions map[derivedKey]map[string]subscription - spaceService space.Service - techSpace *clientspace.TechSpace - - techSpaceSalt []byte + keyValueStore keyvaluestorage.Storage + spaceCore commonspace.Space + observer keyvalueobserver.Observer keysLock sync.Mutex + spaceSalt []byte keyToDerivedKey map[string]derivedKey derivedKeyToKey map[derivedKey]string } -func New() keyvalueservice.Service { - return &service{ +func New(spaceCore commonspace.Space, observer keyvalueobserver.Observer) (Service, error) { + s := &service{ + spaceCore: spaceCore, + observer: observer, + keyValueStore: spaceCore.KeyValue().DefaultStore(), subscriptions: make(map[derivedKey]map[string]subscription), keyToDerivedKey: make(map[string]derivedKey), derivedKeyToKey: make(map[derivedKey]string), } -} - -func (s *service) Init(a *app.App) (err error) { - s.spaceService = app.MustComponent[space.Service](a) - return nil -} - -func (s *service) Name() (name string) { - return CName -} - -func (s *service) Run(ctx context.Context) error { - s.techSpace = s.spaceService.TechSpace() - - err := s.initTechSpaceSalt() + err := s.initSpaceSalt() if err != nil { - return fmt.Errorf("init tech salt: %w", err) + return nil, fmt.Errorf("init tech salt: %w", err) } - s.techSpace.KeyValueObserver().SetObserver(s.observeChanges) - - return nil + s.observer.SetObserver(s.observeChanges) + return s, nil } -func (s *service) initTechSpaceSalt() error { - commonSpace := s.techSpace.CommonSpace() - records := commonSpace.Acl().Records() +func (s *service) initSpaceSalt() error { + records := s.spaceCore.Acl().Records() if len(records) == 0 { return fmt.Errorf("empty acl") } first := records[0] - readKeyId, err := commonSpace.Acl().AclState().ReadKeyForAclId(first.Id) + readKeyId, err := s.spaceCore.Acl().AclState().ReadKeyForAclId(first.Id) if err != nil { return fmt.Errorf("find read key id: %w", err) } - readKeys := commonSpace.Acl().AclState().Keys() + readKeys := s.spaceCore.Acl().AclState().Keys() key, ok := readKeys[readKeyId] if !ok { return fmt.Errorf("read key not found") @@ -97,7 +96,7 @@ func (s *service) initTechSpaceSalt() error { return fmt.Errorf("get raw bytes: %w", err) } - s.techSpaceSalt = rawReadKey + s.spaceSalt = rawReadKey return nil } @@ -118,31 +117,27 @@ func (s *service) observeChanges(decryptFunc keyvaluestorage.Decryptor, kvs []in continue } - sub.observerFunc(key, keyvalueservice.Value{Data: data, TimestampMilli: kv.TimestampMilli}) + sub.observerFunc(key, Value{Data: data, TimestampMilli: kv.TimestampMilli}) } s.lock.RUnlock() } } -func (s *service) Close(ctx context.Context) (err error) { - return nil -} - -func (s *service) GetUserScopedKey(ctx context.Context, key string) ([]keyvalueservice.Value, error) { +func (s *service) Get(ctx context.Context, key string) ([]Value, error) { derived, err := s.getDerivedKey(key) if err != nil { return nil, fmt.Errorf("getDerivedKey: %w", err) } - var result []keyvalueservice.Value - err = s.techSpace.KeyValueStore().GetAll(ctx, string(derived), func(decryptor keyvaluestorage.Decryptor, kvs []innerstorage.KeyValue) error { - result = make([]keyvalueservice.Value, 0, len(kvs)) + var result []Value + err = s.keyValueStore.GetAll(ctx, string(derived), func(decryptor keyvaluestorage.Decryptor, kvs []innerstorage.KeyValue) error { + result = make([]Value, 0, len(kvs)) for _, kv := range kvs { data, err := decryptor(kv) if err != nil { return fmt.Errorf("decrypt: %w", err) } - result = append(result, keyvalueservice.Value{ + result = append(result, Value{ Data: data, TimestampMilli: kv.TimestampMilli, }) @@ -155,12 +150,12 @@ func (s *service) GetUserScopedKey(ctx context.Context, key string) ([]keyvalues return result, nil } -func (s *service) SetUserScopedKey(ctx context.Context, key string, value []byte) error { +func (s *service) Set(ctx context.Context, key string, value []byte) error { derived, err := s.getDerivedKey(key) if err != nil { return fmt.Errorf("getDerivedKey: %w", err) } - return s.techSpace.KeyValueStore().Set(ctx, string(derived), value) + return s.keyValueStore.Set(ctx, string(derived), value) } func (s *service) getDerivedKey(key string) (derivedKey, error) { @@ -174,7 +169,7 @@ func (s *service) getDerivedKey(key string) (derivedKey, error) { hasher := sha256.New() // Salt - hasher.Write(s.techSpaceSalt) + hasher.Write(s.spaceSalt) // User key hasher.Write([]byte(key)) result := hasher.Sum(nil) @@ -194,7 +189,7 @@ func (s *service) getKeyFromDerived(derived derivedKey) (string, bool) { return key, ok } -func (s *service) SubscribeForUserScopedKey(key string, name string, observerFunc keyvalueservice.ObserverFunc) error { +func (s *service) SubscribeForKey(key string, subscriptionName string, observerFunc ObserverFunc) error { derived, err := s.getDerivedKey(key) if err != nil { return fmt.Errorf("getDerivedKey: %w", err) @@ -209,14 +204,14 @@ func (s *service) SubscribeForUserScopedKey(key string, name string, observerFun s.subscriptions[derived] = byKey } - byKey[name] = subscription{ - name: name, + byKey[subscriptionName] = subscription{ + name: subscriptionName, observerFunc: observerFunc, } return nil } -func (s *service) UnsubscribeFromUserScopedKey(key string, name string) error { +func (s *service) UnsubscribeFromKey(key string, subscriptionName string) error { derived, err := s.getDerivedKey(key) if err != nil { return fmt.Errorf("getDerivedKey: %w", err) @@ -227,7 +222,7 @@ func (s *service) UnsubscribeFromUserScopedKey(key string, name string) error { byKey, ok := s.subscriptions[derived] if ok { - delete(byKey, name) + delete(byKey, subscriptionName) } return nil } diff --git a/space/clientspace/space.go b/space/clientspace/space.go index 4de7d9659..6bc60aa0a 100644 --- a/space/clientspace/space.go +++ b/space/clientspace/space.go @@ -24,8 +24,10 @@ import ( "github.com/anyproto/anytype-heart/pkg/lib/bundle" coresb "github.com/anyproto/anytype-heart/pkg/lib/core/smartblock" "github.com/anyproto/anytype-heart/pkg/lib/threads" + "github.com/anyproto/anytype-heart/space/clientspace/keyvalueservice" "github.com/anyproto/anytype-heart/space/internal/objectprovider" "github.com/anyproto/anytype-heart/space/spacecore" + "github.com/anyproto/anytype-heart/space/spacecore/keyvalueobserver" "github.com/anyproto/anytype-heart/space/spacecore/peermanager" "github.com/anyproto/anytype-heart/space/spacecore/storage" "github.com/anyproto/anytype-heart/space/spacecore/storage/anystorage" @@ -56,6 +58,8 @@ type Space interface { IsPersonal() bool GetAclIdentity() crypto.PubKey + KeyValueService() keyvalueservice.Service + Close(ctx context.Context) error } @@ -82,6 +86,7 @@ type space struct { derivedIDs threads.DerivedSmartblockIds installer bundledObjectsInstaller spaceCore spacecore.SpaceCoreService + keyValueService keyvalueservice.Service personalSpaceId string aclIdentity crypto.PubKey @@ -102,6 +107,7 @@ type SpaceDeps struct { AccountService accountservice.Service StorageService storage.ClientStorage SpaceCore spacecore.SpaceCoreService + KeyValueObserver keyvalueobserver.Observer PersonalSpaceId string LoadCtx context.Context DisableRemoteLoad bool @@ -150,7 +156,14 @@ func BuildSpace(ctx context.Context, deps SpaceDeps) (Space, error) { return nil, fmt.Errorf("install bundled objects: %w", err) } } + + sp.keyValueService, err = keyvalueservice.New(sp.common, deps.KeyValueObserver) + if err != nil { + return nil, fmt.Errorf("create key value service: %w", err) + } + go sp.mandatoryObjectsLoad(deps.LoadCtx, deps.DisableRemoteLoad) + return sp, nil } @@ -228,6 +241,10 @@ func (s *space) CommonSpace() commonspace.Space { return s.common } +func (s *space) KeyValueService() keyvalueservice.Service { + return s.keyValueService +} + func (s *space) WaitMandatoryObjects(ctx context.Context) (err error) { select { case <-s.loadMandatoryObjectsCh: diff --git a/space/internal/components/builder/builder.go b/space/internal/components/builder/builder.go index bfe84fafc..8688ba44f 100644 --- a/space/internal/components/builder/builder.go +++ b/space/internal/components/builder/builder.go @@ -93,15 +93,16 @@ func (b *spaceBuilder) BuildSpace(ctx context.Context, disableRemoteLoad bool) ( coreSpace.TreeSyncer().StopSync() } deps := clientspace.SpaceDeps{ - Indexer: b.indexer, - Installer: b.installer, - CommonSpace: coreSpace, - ObjectFactory: b.objectFactory, - AccountService: b.accountService, - PersonalSpaceId: b.personalSpaceId, - StorageService: b.storageService, - SpaceCore: b.spaceCore, - LoadCtx: b.ctx, + Indexer: b.indexer, + Installer: b.installer, + CommonSpace: coreSpace, + ObjectFactory: b.objectFactory, + AccountService: b.accountService, + PersonalSpaceId: b.personalSpaceId, + StorageService: b.storageService, + SpaceCore: b.spaceCore, + LoadCtx: b.ctx, + KeyValueObserver: coreSpace.KeyValueObserver(), } space, err := clientspace.BuildSpace(ctx, deps) if err != nil { diff --git a/space/spacecore/keyvalueobserver/observer.go b/space/spacecore/keyvalueobserver/observer.go index 4bf8d4e86..f08b07591 100644 --- a/space/spacecore/keyvalueobserver/observer.go +++ b/space/spacecore/keyvalueobserver/observer.go @@ -16,13 +16,11 @@ const CName = keyvaluestorage.IndexerCName type ObserverFunc func(decryptor keyvaluestorage.Decryptor, kvs []innerstorage.KeyValue) type Observer interface { - app.ComponentRunnable keyvaluestorage.Indexer SetObserver(observerFunc ObserverFunc) } func New() Observer { - return &observer{} } diff --git a/space/spacefactory/spacefactory.go b/space/spacefactory/spacefactory.go index 70865c30b..13a3bba0c 100644 --- a/space/spacefactory/spacefactory.go +++ b/space/spacefactory/spacefactory.go @@ -130,7 +130,6 @@ func (s *spaceFactory) CreateAndSetTechSpace(ctx context.Context) (*clientspace. s.techSpace = ts s.app = s.app.ChildApp() s.app.Register(s.techSpace) - s.app.Register(kvObserver) err = ts.Run(techCoreSpace, ts.Cache, kvObserver, true) if err != nil { return nil, fmt.Errorf("run tech space: %w", err) @@ -164,7 +163,6 @@ func (s *spaceFactory) LoadAndSetTechSpace(ctx context.Context) (*clientspace.Te s.techSpace = ts s.app = s.app.ChildApp() s.app.Register(s.techSpace) - s.app.Register(kvObserver) err = ts.Run(techCoreSpace, ts.Cache, kvObserver, false) if err != nil { return nil, fmt.Errorf("run tech space: %w", err) diff --git a/space/techspace/techspace.go b/space/techspace/techspace.go index 1d05e5ac5..86ed32476 100644 --- a/space/techspace/techspace.go +++ b/space/techspace/techspace.go @@ -68,9 +68,6 @@ type TechSpace interface { SpaceViewSetData(ctx context.Context, spaceId string, details *domain.Details) (err error) SpaceViewId(id string) (string, error) AccountObjectId() (string, error) - - KeyValueObserver() keyvalueobserver.Observer - KeyValueStore() keyvaluestorage.Storage } type SpaceView interface { @@ -398,10 +395,6 @@ func (s *techSpace) KeyValueStore() keyvaluestorage.Storage { return s.techCore.KeyValue().DefaultStore() } -func (s *techSpace) KeyValueObserver() keyvalueobserver.Observer { - return s.keyvalueObserver -} - func (s *techSpace) Close(ctx context.Context) (err error) { s.ctxCancel() s.mu.Lock()