1
0
Fork 0
mirror of https://github.com/anyproto/anytype-heart.git synced 2025-06-09 09:35:00 +09:00

Subscription prototype

This commit is contained in:
Sergey 2023-04-25 18:17:59 +02:00 committed by Mikhail Iudin
parent 6f86c196b1
commit 0d90265cf3
No known key found for this signature in database
GPG key ID: FAAAA8BAABDFF1C0
8 changed files with 1247 additions and 776 deletions

View file

@ -1,6 +1,9 @@
package collection
import (
"fmt"
"sync"
"github.com/anytypeio/go-anytype-middleware/app"
"github.com/anytypeio/go-anytype-middleware/core/block"
"github.com/anytypeio/go-anytype-middleware/core/block/editor/smartblock"
@ -13,10 +16,16 @@ import (
type Service struct {
picker block.Picker
lock *sync.RWMutex
collections map[string]chan []slice.Change[string]
}
func New() *Service {
return &Service{}
return &Service{
lock: &sync.RWMutex{},
collections: map[string]chan []slice.Change[string]{},
}
}
func (s *Service) Init(a *app.App) (err error) {
@ -37,7 +46,7 @@ func (s *Service) Add(ctx *session.Context, req *pb.RpcObjectCollectionAddReques
if pos >= 0 {
col = slice.Insert(col, pos+1, toAdd...)
} else {
col = append(col, toAdd...)
col = append(toAdd, col...)
}
return col
})
@ -71,10 +80,61 @@ func (s *Service) Sort(ctx *session.Context, req *pb.RpcObjectCollectionSortRequ
func (s *Service) updateCollection(ctx *session.Context, contextID string, modifier func(src []string) []string) error {
return block.DoStateCtx(s.picker, ctx, contextID, func(s *state.State, sb smartblock.SmartBlock) error {
store := s.Store()
lst := pbtypes.GetStringList(store, storeKey)
lst := pbtypes.GetStringList(s.Store(), storeKey)
lst = modifier(lst)
s.StoreSlice(storeKey, lst)
return nil
})
}
func (s *Service) RegisterCollection(sb smartblock.SmartBlock) {
s.lock.Lock()
changesCh, ok := s.collections[sb.Id()]
if !ok {
changesCh = make(chan []slice.Change[string])
s.collections[sb.Id()] = changesCh
}
s.lock.Unlock()
sb.AddHook(func(info smartblock.ApplyInfo) (err error) {
// TODO: I don't like that changes converted to marshalling format and then back again
var changes []slice.Change[string]
for _, ch := range info.Changes {
upd := ch.GetStoreSliceUpdate()
if upd == nil {
continue
}
if v := upd.GetAdd(); v != nil {
changes = append(changes, slice.MakeChangeAdd(v.Ids, v.AfterId))
} else if v := upd.GetRemove(); v != nil {
changes = append(changes, slice.MakeChangeRemove[string](v.Ids))
} else if v := upd.GetMove(); v != nil {
changes = append(changes, slice.MakeChangeMove[string](v.Ids, v.AfterId))
}
}
go func() {
changesCh <- changes
}()
return nil
}, smartblock.HookAfterApply)
}
func (s *Service) SubscribeForCollection(contextID string) ([]string, <-chan []slice.Change[string], error) {
var initialObjectIDs []string
// Waking up of collection smart block will automatically add hook used in RegisterCollection
err := block.DoState(s.picker, contextID, func(s *state.State, sb smartblock.SmartBlock) error {
initialObjectIDs = pbtypes.GetStringList(s.Store(), storeKey)
return nil
})
if err != nil {
return nil, nil, err
}
s.lock.RLock()
ch, ok := s.collections[contextID]
s.lock.RUnlock()
if !ok {
return nil, nil, fmt.Errorf("collection is not registered")
}
return initialObjectIDs, ch, err
}

View file

@ -313,6 +313,15 @@ func (sb *smartBlock) Init(ctx *InitContext) (err error) {
return
}
// TODO FIX FIRST
if sb.Id() == "bafybanjhjpw3dmpumc44e2aiynd6p4pcs3v3rrq64far3kslt3aix6ut" {
type collectionService interface {
RegisterCollection(sb SmartBlock)
}
colService := app.MustComponent[collectionService](ctx.App)
colService.RegisterCollection(sb)
}
return
}

View file

@ -0,0 +1,219 @@
package subscription
import (
"fmt"
"sync"
"github.com/gogo/protobuf/types"
"golang.org/x/exp/slices"
"github.com/anytypeio/go-anytype-middleware/pb"
"github.com/anytypeio/go-anytype-middleware/pkg/lib/database/filter"
"github.com/anytypeio/go-anytype-middleware/pkg/lib/localstore/objectstore"
"github.com/anytypeio/go-anytype-middleware/util/slice"
)
type collectionContainer struct {
onUpdate func(changes []slice.Change[string])
filter filter.Filter
lock *sync.RWMutex
objectIDs []string
}
func newCollectionContainer(ids []string, onUpdate func(changes []slice.Change[string])) *collectionContainer {
return &collectionContainer{
onUpdate: onUpdate,
lock: &sync.RWMutex{},
objectIDs: ids,
}
}
func (c *collectionContainer) List() []string {
c.lock.RLock()
defer c.lock.RUnlock()
return c.objectIDs
}
func (c *collectionContainer) Update(changes []slice.Change[string]) {
c.lock.Lock()
c.objectIDs = slice.ApplyChanges(c.objectIDs, changes, slice.StringIdentity[string])
c.lock.Unlock()
c.onUpdate(changes)
}
func (c *collectionContainer) Compare(a, b filter.Getter) int {
c.lock.RLock()
defer c.lock.RUnlock()
ae, be := a.(*entry), b.(*entry)
ap, bp := slice.FindPos(c.objectIDs, ae.id), slice.FindPos(c.objectIDs, be.id)
if ap == bp {
return 0
}
if ap > bp {
return -1
}
return 1
}
func (c *collectionContainer) FilterObject(a filter.Getter) bool {
c.lock.RLock()
defer c.lock.RUnlock()
return slice.FindPos(c.objectIDs, a.(*entry).id) >= 0
}
func (c *collectionContainer) String() string {
return "collection order"
}
type collectionSub struct {
id string
keys []string
col *collectionContainer
sendEvent func(event *pb.Event)
cache *cache
objectStore objectstore.ObjectStore
activeIDs []string
activeEntries []*entry
}
func (s *service) newCollectionSub(id string, collectionID string, keys []string, filter filter.Filter, order filter.Order, limit, offset int) (*collectionSub, error) {
initialObjectIDs, changesCh, err := s.collections.SubscribeForCollection(collectionID)
if err != nil {
return nil, fmt.Errorf("subscribe for collection: %w", err)
}
col := newCollectionContainer(initialObjectIDs, nil)
col.filter = filter
sub := &collectionSub{
id: id,
keys: keys,
col: col,
sendEvent: s.sendEvent,
cache: s.cache,
objectStore: s.objectStore,
}
col.onUpdate = sub.onCollectionUpdate
go func() {
for ch := range changesCh {
col.Update(ch)
}
}()
return sub, nil
}
func (c *collectionSub) init(entries []*entry) (err error) {
entries = slice.Filter(entries, func(e *entry) bool {
return slices.Contains(c.col.List(), e.id)
})
c.activeEntries = entries
return nil
}
func (c *collectionSub) counters() (prev, next int) {
// TODO
return
}
func (c *collectionSub) onChange(ctx *opCtx) {
// TODO update details
}
func (c *collectionSub) onCollectionUpdate(changes []slice.Change[string]) {
c.activeIDs = slice.ApplyChanges(c.activeIDs, changes, slice.StringIdentity[string])
newEntries := make([]*entry, 0, len(c.activeIDs))
for _, id := range c.activeIDs {
if e := c.cache.Get(id); e != nil {
newEntries = append(newEntries, e)
continue
}
recs, err := c.objectStore.QueryById([]string{id})
if err != nil {
// TODO proper logging
fmt.Println("query new entry:", err)
}
if len(recs) > 0 {
newEntries = append(newEntries, &entry{
id: id,
data: recs[0].Details,
})
}
}
ctx := &opCtx{
entries: newEntries,
c: c.cache,
}
for _, ch := range changes {
if add := ch.Add(); add != nil {
afterID := add.AfterID
for _, id := range add.Items {
ctx.position = append(ctx.position, opPosition{
id: id,
subId: c.id,
afterId: afterID,
keys: c.keys,
isAdd: true,
})
// Update afterID to save correspondence between subscription changes and generic atomic changes
// The difference is that generic atomic changes contains a slice, so we need to update insertion position
afterID = id
}
continue
}
if rm := ch.Remove(); rm != nil {
for _, id := range rm.IDs {
ctx.remove = append(ctx.remove, opRemove{
id: id,
subId: c.id,
})
}
continue
}
if mv := ch.Move(); mv != nil {
afterID := mv.AfterID
for _, id := range mv.IDs {
ctx.position = append(ctx.position, opPosition{
id: id,
subId: c.id,
afterId: afterID,
keys: c.keys,
})
// Update afterID to save correspondence between subscription changes and generic atomic changes
// The difference is that generic atomic changes contains a slice, so we need to update moving position
afterID = id
}
continue
}
}
ev := ctx.apply()
c.activeEntries = newEntries
c.sendEvent(ev)
}
func (c *collectionSub) getActiveRecords() (res []*types.Struct) {
// TODO decide where to filter and reorder records. Here or in onChange?
for _, e := range c.activeEntries {
res = append(res, e.data)
}
return
}
func (c *collectionSub) hasDep() bool {
// TODO
return false
}
func (c *collectionSub) close() {
// TODO
}

View file

@ -3,28 +3,30 @@ package subscription
import (
"context"
"fmt"
"github.com/anytypeio/go-anytype-middleware/core/kanban"
"github.com/anytypeio/go-anytype-middleware/pkg/lib/localstore/addr"
"strings"
"sync"
"time"
"github.com/anytypeio/any-sync/app"
"github.com/cheggaaa/mb"
"github.com/globalsign/mgo/bson"
"github.com/gogo/protobuf/types"
"github.com/ipfs/go-datastore/query"
"github.com/anytypeio/go-anytype-middleware/app"
"github.com/anytypeio/go-anytype-middleware/core/block/collection"
"github.com/anytypeio/go-anytype-middleware/core/event"
"github.com/anytypeio/go-anytype-middleware/core/kanban"
"github.com/anytypeio/go-anytype-middleware/pb"
"github.com/anytypeio/go-anytype-middleware/pkg/lib/bundle"
"github.com/anytypeio/go-anytype-middleware/pkg/lib/core/smartblock"
"github.com/anytypeio/go-anytype-middleware/pkg/lib/database"
"github.com/anytypeio/go-anytype-middleware/pkg/lib/database/filter"
"github.com/anytypeio/go-anytype-middleware/pkg/lib/localstore/addr"
"github.com/anytypeio/go-anytype-middleware/pkg/lib/localstore/objectstore"
"github.com/anytypeio/go-anytype-middleware/pkg/lib/logging"
"github.com/anytypeio/go-anytype-middleware/pkg/lib/pb/model"
"github.com/anytypeio/go-anytype-middleware/util/pbtypes"
"github.com/anytypeio/go-anytype-middleware/util/slice"
"github.com/cheggaaa/mb"
"github.com/globalsign/mgo/bson"
"github.com/gogo/protobuf/types"
"github.com/ipfs/go-datastore/query"
)
const CName = "subscription"
@ -65,6 +67,7 @@ type service struct {
objectStore objectstore.ObjectStore
kanban kanban.Service
collections *collection.Service
sendEvent func(e *pb.Event)
m sync.Mutex
@ -79,6 +82,7 @@ func (s *service) Init(a *app.App) (err error) {
s.kanban = a.MustComponent(kanban.CName).(kanban.Service)
s.recBatch = mb.New(0)
s.sendEvent = a.MustComponent(event.CName).(event.Sender).Send
s.collections = app.MustComponent[*collection.Service](a)
s.ctxBuf = &opCtx{c: s.cache}
return
}
@ -139,6 +143,15 @@ func (s *service) Search(req pb.RpcObjectSearchSubscribeRequest) (resp *pb.RpcOb
if req.Limit < 0 {
req.Limit = 0
}
// if req.SubId == "bafyecciobreco4u7pixvnzzkg4ktheglgiexzcv2qhqo2upzsznho7xz-dataview" {
// req.CollectionId = "bafybanjhjpw3dmpumc44e2aiynd6p4pcs3v3rrq64far3kslt3aix6ut"
// }
if req.CollectionId != "" {
return s.makeCollectionSubscription(req, f, records)
}
sub := s.newSortedSub(req.SubId, req.Keys, f.FilterObj, f.Order, int(req.Limit), int(req.Offset))
if req.NoDepSubscription {
sub.disableDep = true
@ -178,6 +191,50 @@ func (s *service) Search(req pb.RpcObjectSearchSubscribeRequest) (resp *pb.RpcOb
return
}
func (s *service) makeCollectionSubscription(req pb.RpcObjectSearchSubscribeRequest, f *database.Filters, records []database.Record) (*pb.RpcObjectSearchSubscribeResponse, error) {
sub, err := s.newCollectionSub(req.SubId, req.CollectionId, req.Keys, f.FilterObj, f.Order, int(req.Limit), int(req.Offset))
if err != nil {
return nil, err
}
// TODO
// if req.NoDepSubscription {
// sub.sortedSub.disableDep = true
// } else {
// sub.sortedSub.forceSubIds = filterDepIds
// }
entries := make([]*entry, 0, len(records))
for _, r := range records {
entries = append(entries, &entry{
id: pbtypes.GetString(r.Details, "id"),
data: r.Details,
})
}
if err = sub.init(entries); err != nil {
return nil, fmt.Errorf("subscription init error: %v", err)
}
s.subscriptions[sub.id] = sub
prev, next := sub.counters()
var depRecords, subRecords []*types.Struct
subRecords = sub.getActiveRecords()
// TODO
// if sub.depSub != nil {
// depRecords = sub.depSub.getActiveRecords()
// }
return &pb.RpcObjectSearchSubscribeResponse{
Records: subRecords,
Dependencies: depRecords,
SubId: sub.id,
Counters: &pb.EventObjectSubscriptionCounters{
Total: int64(len(sub.activeEntries)),
NextCount: int64(prev),
PrevCount: int64(next),
},
}, nil
}
func (s *service) SubscribeIdsReq(req pb.RpcObjectSubscribeIdsRequest) (resp *pb.RpcObjectSubscribeIdsResponse, err error) {
records, err := s.objectStore.QueryById(req.Ids)
if err != nil {

View file

@ -26,6 +26,10 @@
- [Change.Snapshot.LogHeadsEntry](#anytype-Change-Snapshot-LogHeadsEntry)
- [Change.StoreKeySet](#anytype-Change-StoreKeySet)
- [Change.StoreKeyUnset](#anytype-Change-StoreKeyUnset)
- [Change.StoreSliceUpdate](#anytype-Change-StoreSliceUpdate)
- [Change.StoreSliceUpdate.Add](#anytype-Change-StoreSliceUpdate-Add)
- [Change.StoreSliceUpdate.Move](#anytype-Change-StoreSliceUpdate-Move)
- [Change.StoreSliceUpdate.Remove](#anytype-Change-StoreSliceUpdate-Remove)
- [Change._RelationAdd](#anytype-Change-_RelationAdd)
- [Change._RelationRemove](#anytype-Change-_RelationRemove)
- [Change._RelationUpdate](#anytype-Change-_RelationUpdate)
@ -1696,6 +1700,7 @@ the element of change tree used to store and internal apply smartBlock history
| objectTypeRemove | [Change.ObjectTypeRemove](#anytype-Change-ObjectTypeRemove) | | |
| storeKeySet | [Change.StoreKeySet](#anytype-Change-StoreKeySet) | | |
| storeKeyUnset | [Change.StoreKeyUnset](#anytype-Change-StoreKeyUnset) | | |
| storeSliceUpdate | [Change.StoreSliceUpdate](#anytype-Change-StoreSliceUpdate) | | |
@ -1889,6 +1894,71 @@ the element of change tree used to store and internal apply smartBlock history
<a name="anytype-Change-StoreSliceUpdate"></a>
### Change.StoreSliceUpdate
| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| key | [string](#string) | | |
| add | [Change.StoreSliceUpdate.Add](#anytype-Change-StoreSliceUpdate-Add) | | |
| remove | [Change.StoreSliceUpdate.Remove](#anytype-Change-StoreSliceUpdate-Remove) | | |
| move | [Change.StoreSliceUpdate.Move](#anytype-Change-StoreSliceUpdate-Move) | | |
<a name="anytype-Change-StoreSliceUpdate-Add"></a>
### Change.StoreSliceUpdate.Add
| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| afterId | [string](#string) | | |
| ids | [string](#string) | repeated | |
<a name="anytype-Change-StoreSliceUpdate-Move"></a>
### Change.StoreSliceUpdate.Move
| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| afterId | [string](#string) | | |
| ids | [string](#string) | repeated | |
<a name="anytype-Change-StoreSliceUpdate-Remove"></a>
### Change.StoreSliceUpdate.Remove
| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| ids | [string](#string) | repeated | |
<a name="anytype-Change-_RelationAdd"></a>
### Change._RelationAdd
@ -10926,6 +10996,7 @@ deprecated, to be removed |
| source | [string](#string) | repeated | |
| ignoreWorkspace | [string](#string) | | |
| noDepSubscription | [bool](#bool) | | disable dependent subscription |
| collectionId | [string](#string) | | |

File diff suppressed because it is too large Load diff

View file

@ -1257,6 +1257,7 @@ message Rpc {
string ignoreWorkspace = 12;
// disable dependent subscription
bool noDepSubscription = 13;
string collectionId = 14;
}
message Response {

View file

@ -6,6 +6,7 @@ import (
"github.com/mb0/diff"
)
// StringIdentity is identity function: it returns its argument as string
func StringIdentity[T ~string](x T) string {
return string(x)
}