1
0
Fork 0
mirror of https://github.com/anyproto/anytype-heart.git synced 2025-06-09 17:44:59 +09:00
This commit is contained in:
Sergey Cherepanov 2021-12-14 16:16:44 +03:00
commit 5dc2558b57
No known key found for this signature in database
GPG key ID: 085319C64294F576
68 changed files with 1586 additions and 880 deletions

View file

@ -46,6 +46,7 @@ type ComponentRunnable interface {
type App struct {
components []Component
mu sync.RWMutex
startStat StartStat
}
// Name returns app name
@ -58,6 +59,18 @@ func (app *App) Version() string {
return GitSummary
}
type StartStat struct {
SpentMsPerComp map[string]int64
SpentMsTotal int64
}
// StartStat returns total time spent per comp
func (app *App) StartStat() StartStat {
app.mu.Lock()
defer app.mu.Unlock()
return app.startStat
}
// VersionDescription return the full info about the build
func (app *App) VersionDescription() string {
return VersionDescription()
@ -123,6 +136,7 @@ func (app *App) ComponentNames() (names []string) {
func (app *App) Start() (err error) {
app.mu.RLock()
defer app.mu.RUnlock()
app.startStat.SpentMsPerComp = make(map[string]int64)
closeServices := func(idx int) {
for i := idx; i >= 0; i-- {
@ -145,10 +159,14 @@ func (app *App) Start() (err error) {
for i, s := range app.components {
if serviceRun, ok := s.(ComponentRunnable); ok {
start := time.Now()
if err = serviceRun.Run(); err != nil {
closeServices(i)
return fmt.Errorf("can't run service '%s': %v", serviceRun.Name(), err)
}
spent := time.Since(start).Milliseconds()
app.startStat.SpentMsTotal += spent
app.startStat.SpentMsPerComp[s.Name()] = spent
}
}
log.Debugf("All components started")

View file

@ -373,13 +373,37 @@ func (sb *stateBuilder) loadChange(id string) (ch *Change, err error) {
sr, err := sb.smartblock.GetRecord(ctx, id)
s := time.Since(st)
if err != nil {
log.With("thread", sb.smartblock.ID()).Errorf("failed to loadChange %s after %.1fs. Total %.1f(%d records were loaded)", id, s.Seconds(), sb.qt.Seconds(), sb.qr)
log.With("thread", sb.smartblock.ID()).
Errorf("failed to loadChange %s after %.2fs. Total %.2f(%d records were loaded)", id, s.Seconds(), sb.qt.Seconds(), sb.qr)
return
}
sb.qt += s
sb.qr++
if sb.qt.Seconds() > 3 {
log.With("thread", sb.smartblock.ID()).Debugf("long loadChange %.1fs for %s. Total %.1f(%d records)", s.Seconds(), id, sb.qt.Seconds(), sb.qr)
if s.Seconds() > 0.1 {
// this means we got this record through bitswap, so lets log some details
lgs, _ := sb.smartblock.GetLogs()
var sbLog *core.SmartblockLog
for _, lg := range lgs {
if lg.ID == sr.LogID {
sbLog = &lg
break
}
}
var (
logHead string
logCounter int64
)
if sbLog != nil {
logHead = sbLog.Head
logCounter = sbLog.HeadCounter
}
log.With("thread", sb.smartblock.ID()).
With("logid", sr.LogID).
With("logHead", logHead).
With("logCounter", logCounter).
Errorf("long loadChange %.2fs for %s. Total %.2f(%d records)", s.Seconds(), id, sb.qt.Seconds(), sb.qr)
}
chp := new(pb.Change)
if err = sr.Unmarshal(chp); err != nil {

View file

@ -1,4 +1,4 @@
// +build !linux,!darwin android ios
// +build !linux,!darwin android ios nographviz
// +build !amd64
package change

View file

@ -1,5 +1,5 @@
// +build linux darwin
// +build !android,!ios
// +build !android,!ios,!nographviz
// +build amd64 arm64
package change

View file

@ -17,6 +17,7 @@ import (
"github.com/anytypeio/go-anytype-middleware/core/anytype"
"github.com/anytypeio/go-anytype-middleware/core/block"
"github.com/anytypeio/go-anytype-middleware/core/configfetcher"
"github.com/anytypeio/go-anytype-middleware/metrics"
"github.com/anytypeio/go-anytype-middleware/pb"
"github.com/anytypeio/go-anytype-middleware/pkg/lib/core"
"github.com/anytypeio/go-anytype-middleware/pkg/lib/pb/model"
@ -108,9 +109,7 @@ func checkInviteCode(code string, account string) error {
func (mw *Middleware) getAccountConfig() *pb.RpcAccountConfig {
fetcher := mw.app.MustComponent(configfetcher.CName).(configfetcher.ConfigFetcher)
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
cfg := fetcher.GetAccountConfigWithContext(ctx)
cfg := fetcher.GetAccountConfig()
// TODO: change proto defs to use same model from "models.proto" and not from "api.proto"
return &pb.RpcAccountConfig{
@ -184,11 +183,23 @@ func (mw *Middleware) AccountCreate(req *pb.RpcAccountCreateRequest) *pb.RpcAcco
}
comps = append(comps, mw.EventSender)
if mw.app, err = anytype.StartNewApp(comps...); err != nil {
return response(newAcc, pb.RpcAccountCreateResponseError_ACCOUNT_CREATED_BUT_FAILED_TO_START_NODE, err)
}
stat := mw.app.StartStat()
if stat.SpentMsTotal > 300 {
log.Errorf("AccountCreate app start takes %dms: %v", stat.SpentMsTotal, stat.SpentMsPerComp)
}
metrics.SharedClient.RecordEvent(metrics.AppStart{
Type: "create",
TotalMs: stat.SpentMsTotal,
PerCompMs: stat.SpentMsPerComp})
coreService := mw.app.MustComponent(core.CName).(core.Service)
newAcc.Name = req.Name
bs := mw.app.MustComponent(block.CName).(block.Service)
details := []*pb.RpcBlockSetDetailsDetail{{Key: "name", Value: pbtypes.String(req.Name)}}
@ -476,6 +487,16 @@ func (mw *Middleware) AccountSelect(req *pb.RpcAccountSelectRequest) *pb.RpcAcco
return response(nil, pb.RpcAccountSelectResponseError_FAILED_TO_RUN_NODE, err)
}
stat := mw.app.StartStat()
if stat.SpentMsTotal > 300 {
log.Errorf("AccountSelect app start takes %dms: %v", stat.SpentMsTotal, stat.SpentMsPerComp)
}
metrics.SharedClient.RecordEvent(metrics.AppStart{
Type: "select",
TotalMs: stat.SpentMsTotal,
PerCompMs: stat.SpentMsPerComp})
return response(&model.Account{Id: req.Id}, pb.RpcAccountSelectResponseError_NULL, nil)
}

View file

@ -32,6 +32,7 @@ import (
"github.com/anytypeio/go-anytype-middleware/pkg/lib/profilefinder"
"github.com/anytypeio/go-anytype-middleware/pkg/lib/threads"
walletUtil "github.com/anytypeio/go-anytype-middleware/pkg/lib/wallet"
"github.com/anytypeio/go-anytype-middleware/util/builtinobjects"
"github.com/anytypeio/go-anytype-middleware/util/builtintemplate"
"github.com/anytypeio/go-anytype-middleware/util/linkpreview"
)
@ -52,10 +53,7 @@ func StartAccountRecoverApp(eventSender event.Sender, accountPrivKey walletUtil.
Register(profilefinder.New()).
Register(eventSender)
metrics.SharedClient.SetAppVersion(a.Version())
metrics.SharedClient.Run()
if err = a.Start(); err != nil {
metrics.SharedClient.Close()
return
}
@ -81,6 +79,7 @@ func StartNewApp(components ...app.Component) (a *app.App, err error) {
a = nil
return
}
return
}
@ -113,6 +112,7 @@ func Bootstrap(a *app.App, components ...app.Component) {
Register(restriction.New()).
Register(debug.New()).
Register(doc.New()).
Register(subscription.New())
Register(subscription.New()).
Register(builtinobjects.New())
return
}

View file

@ -8,6 +8,7 @@ import (
"github.com/anytypeio/go-anytype-middleware/pkg/lib/core"
"github.com/anytypeio/go-anytype-middleware/util/ocache"
ds "github.com/ipfs/go-datastore"
"github.com/textileio/go-threads/core/thread"
"time"
"github.com/anytypeio/go-anytype-middleware/core/block/doc"
@ -894,7 +895,9 @@ func (s *service) SetObjectTypes(ctx *state.Context, objectId string, objectType
})
}
func (s *service) CreateObjectInWorkspace(ctx context.Context, workspaceId string, sbType coresb.SmartBlockType) (csm core.SmartBlock, err error) {
// todo: rewrite with options
// withId may me empty
func (s *service) CreateObjectInWorkspace(ctx context.Context, workspaceId string, withId thread.ID, sbType coresb.SmartBlockType) (csm core.SmartBlock, err error) {
startTime := time.Now()
ev, exists := ctx.Value(ObjectCreateEvent).(*metrics.CreateObjectEvent)
err = s.DoWithContext(ctx, workspaceId, func(b smartblock.SmartBlock) error {
@ -905,7 +908,7 @@ func (s *service) CreateObjectInWorkspace(ctx context.Context, workspaceId strin
if !ok {
return fmt.Errorf("incorrect object with workspace id")
}
csm, err = workspace.CreateObject(sbType)
csm, err = workspace.CreateObject(withId, sbType)
if exists {
ev.WorkspaceCreateMs = time.Now().Sub(startTime).Milliseconds() - ev.GetWorkspaceBlockWaitMs
}
@ -945,7 +948,7 @@ func (s *service) CreateSet(ctx *state.Context, req pb.RpcBlockCreateSetRequest)
workspaceId = s.anytype.PredefinedBlocks().Account
}
// TODO: here can be a deadlock if this is somehow created from workspace (as set)
csm, err := s.CreateObjectInWorkspace(context.TODO(), workspaceId, coresb.SmartBlockTypeSet)
csm, err := s.CreateObjectInWorkspace(context.TODO(), workspaceId, thread.Undef, coresb.SmartBlockTypeSet)
if err != nil {
return "", "", err
}

View file

@ -108,7 +108,17 @@ func (bs *basic) Create(ctx *state.Context, groupId string, req pb.RpcBlockCreat
req.TargetId = template.HeaderLayoutId
}
}
if req.Block.GetContent() == nil {
err = fmt.Errorf("no block content")
return
}
req.Block.Id = ""
block := simple.New(req.Block)
block.Model().ChildrenIds = nil
err = block.Validate()
if err != nil {
return
}
s.Add(block)
if err = s.InsertTo(req.TargetId, req.Position, block.Model().Id); err != nil {
return
@ -205,8 +215,17 @@ func (bs *basic) Replace(ctx *state.Context, id string, block *model.Block) (new
}
s := bs.NewStateCtx(ctx)
if block.GetContent() == nil {
err = fmt.Errorf("no block content")
return
}
new := simple.New(block)
newId = new.Model().Id
new.Model().ChildrenIds = nil
err = new.Validate()
if err != nil {
return
}
s.Add(new)
if err = s.InsertTo(id, model.Block_Replace, newId); err != nil {
return

View file

@ -24,7 +24,7 @@ func TestBasic_Create(t *testing.T) {
sb.AddBlock(simple.New(&model.Block{Id: "test"}))
b := NewBasic(sb)
id, err := b.Create(nil, "", pb.RpcBlockCreateRequest{
Block: &model.Block{},
Block: &model.Block{Content: &model.BlockContentOfText{Text: &model.BlockContentText{Text: "ll"}}},
})
require.NoError(t, err)
require.NotEmpty(t, id)
@ -38,7 +38,7 @@ func TestBasic_Create(t *testing.T) {
id, err := b.Create(nil, "", pb.RpcBlockCreateRequest{
TargetId: template.TitleBlockId,
Position: model.Block_Top,
Block: &model.Block{},
Block: &model.Block{Content: &model.BlockContentOfText{Text: &model.BlockContentText{Text: "ll"}}},
})
require.NoError(t, err)
require.NotEmpty(t, id)
@ -120,14 +120,14 @@ func TestBasic_Move(t *testing.T) {
id1, err := b.Create(nil, "", pb.RpcBlockCreateRequest{
TargetId: template.HeaderLayoutId,
Position: model.Block_Bottom,
Block: &model.Block{},
Block: &model.Block{Content: &model.BlockContentOfText{Text: &model.BlockContentText{Text: "ll"}}},
})
require.NoError(t, err)
require.NotEmpty(t, id1)
id0, err := b.Create(nil, "", pb.RpcBlockCreateRequest{
TargetId: template.HeaderLayoutId,
Position: model.Block_Bottom,
Block: &model.Block{},
Block: &model.Block{Content: &model.BlockContentOfText{Text: &model.BlockContentText{Text: "ll"}}},
})
require.NoError(t, err)
require.NotEmpty(t, id0)
@ -148,7 +148,7 @@ func TestBasic_Replace(t *testing.T) {
sb.AddBlock(simple.New(&model.Block{Id: "test", ChildrenIds: []string{"2"}})).
AddBlock(simple.New(&model.Block{Id: "2"}))
b := NewBasic(sb)
newId, err := b.Replace(nil, "2", &model.Block{})
newId, err := b.Replace(nil, "2", &model.Block{Content: &model.BlockContentOfText{Text: &model.BlockContentText{Text: "l"}}})
require.NoError(t, err)
require.NotEmpty(t, newId)
}

View file

@ -84,6 +84,7 @@ func TestUploader_Upload(t *testing.T) {
require.NoError(t, res.Err)
assert.Equal(t, res.Hash, "123")
assert.Equal(t, res.Name, "unnamed.jpg")
res.Size = 1
b := res.ToBlock()
assert.Equal(t, b.Model().GetFile().Name, "unnamed.jpg")
})

View file

@ -179,6 +179,20 @@ func (sb *smartBlock) Id() string {
return sb.source.Id()
}
func (s *smartBlock) GetFileKeys() (keys []pb.ChangeFileKeys) {
keys2 := s.source.GetFileKeysSnapshot()
for _, key := range keys2 {
if key == nil {
continue
}
keys = append(keys, pb.ChangeFileKeys{
Hash: key.Hash,
Keys: key.Keys,
})
}
return
}
func (sb *smartBlock) Meta() *core.SmartBlockMeta {
return &core.SmartBlockMeta{
ObjectTypes: sb.ObjectTypes(),
@ -207,7 +221,10 @@ func (sb *smartBlock) Init(ctx *InitContext) (err error) {
sb.objectStore = ctx.ObjectStore
sb.lastDepDetails = map[string]*pb.EventObjectDetailsSet{}
sb.storeFileKeys()
if ctx.State != nil {
// need to store file keys in case we have some new files in the state
sb.storeFileKeys(ctx.State)
}
sb.Doc.BlocksInit(sb.Doc.(simple.DetailsService))
if ctx.State == nil {
@ -1321,7 +1338,7 @@ func (sb *smartBlock) StateAppend(f func(d state.Doc) (s *state.State, err error
ContextId: sb.Id(),
})
}
sb.storeFileKeys()
sb.storeFileKeys(s)
if hasDepIds(&act) {
sb.CheckSubscriptions()
}
@ -1350,7 +1367,7 @@ func (sb *smartBlock) StateRebuild(d state.Doc) (err error) {
})
}
}
sb.storeFileKeys()
sb.storeFileKeys(d)
sb.CheckSubscriptions()
sb.reportChange(sb.Doc.(*state.State))
sb.execHooks(HookAfterApply)
@ -1436,8 +1453,11 @@ func getChangedFileHashes(s *state.State, fileDetailKeys []string, act undo.Acti
return
}
func (sb *smartBlock) storeFileKeys() {
keys := sb.Doc.GetFileKeys()
func (sb *smartBlock) storeFileKeys(doc state.Doc) {
if doc == nil {
return
}
keys := doc.GetFileKeys()
if len(keys) == 0 {
return
}

View file

@ -4,6 +4,7 @@ import (
"bytes"
"errors"
"fmt"
"github.com/ipfs/go-cid"
"strings"
"time"
"unicode/utf8"
@ -1157,6 +1158,14 @@ func (s *State) GetAllFileHashes(detailsKeys []string) (hashes []string) {
}
for _, key := range detailsKeys {
if key == bundle.RelationKeyCoverId.String() {
v := pbtypes.GetString(det, key)
_, err := cid.Decode(v)
if err != nil {
// this is an exception cause coverId can contains not a file hash but color
continue
}
}
if v := pbtypes.GetStringList(det, key); v != nil {
for _, hash := range v {
if hash == "" {

View file

@ -53,7 +53,9 @@ func (t *Template) Init(ctx *smartblock.InitContext) (err error) {
func (t *Template) GetNewPageState(name string) (st *state.State, err error) {
st = t.NewState().Copy()
st.SetObjectType(pbtypes.GetString(st.Details(), bundle.RelationKeyTargetObjectType.String()))
st.RemoveDetail(bundle.RelationKeyTargetObjectType.String(), bundle.RelationKeyTemplateIsBundled.String())
// clean-up local details from the template
st.SetLocalDetails(nil)
st.SetDetail(bundle.RelationKeyName.String(), pbtypes.String(name))
if title := st.Get(template.TitleBlockId); title != nil {
title.Model().GetText().Text = ""

View file

@ -56,9 +56,16 @@ func (wp *WorkspaceParameters) Equal(other *WorkspaceParameters) bool {
return wp.IsHighlighted == other.IsHighlighted
}
func (p *Workspaces) CreateObject(sbType smartblock2.SmartBlockType) (core.SmartBlock, error) {
func (p *Workspaces) CreateObject(id thread.ID, sbType smartblock2.SmartBlockType) (core.SmartBlock, error) {
st := p.NewState()
threadInfo, err := p.threadQueue.CreateThreadSync(sbType, p.Id())
if !id.Defined() {
var err error
id, err = threads.ThreadCreateID(thread.AccessControlled, sbType)
if err != nil {
return nil, err
}
}
threadInfo, err := p.threadQueue.CreateThreadSync(id, p.Id())
if err != nil {
return nil, err
}

View file

@ -113,7 +113,7 @@ func (e *export) Export(req pb.RpcExportRequest) (path string, succeed int, err
did := docId
if err = queue.Wait(func() {
log.With("threadId", did).Debugf("write doc")
if werr := e.writeDoc(req.Format, wr, docIds, queue, did); werr != nil {
if werr := e.writeDoc(req.Format, wr, docIds, queue, did, req.IncludeFiles); werr != nil {
log.With("threadId", did).Warnf("can't export doc: %v", werr)
} else {
succeed++
@ -161,6 +161,8 @@ func (e *export) idsForExport(reqIds []string, includeNested bool) (ids []string
var m map[string]struct{}
if includeNested {
m = make(map[string]struct{}, len(reqIds)*10)
} else {
m = make(map[string]struct{}, len(reqIds))
}
var getNested func(id string)
getNested = func(id string) {
@ -190,7 +192,9 @@ func (e *export) idsForExport(reqIds []string, includeNested bool) (ids []string
if _, exists := m[id]; !exists {
ids = append(ids, id)
m[id] = struct{}{}
getNested(id)
if includeNested {
getNested(id)
}
}
}
@ -202,7 +206,7 @@ func (e *export) writeMultiDoc(mw converter.MultiConverter, wr writer, docIds []
if err = queue.Wait(func() {
log.With("threadId", did).Debugf("write doc")
werr := e.bs.Do(did, func(b sb.SmartBlock) error {
return mw.Add(b.NewState())
return mw.Add(b.NewState().Copy())
})
if err != nil {
log.With("threadId", did).Warnf("can't export doc: %v", werr)
@ -244,17 +248,20 @@ func (e *export) writeMultiDoc(mw converter.MultiConverter, wr writer, docIds []
return
}
func (e *export) writeDoc(format pb.RpcExportFormat, wr writer, docIds []string, queue process.Queue, docId string) (err error) {
func (e *export) writeDoc(format pb.RpcExportFormat, wr writer, docIds []string, queue process.Queue, docId string, exportFiles bool) (err error) {
return e.bs.Do(docId, func(b sb.SmartBlock) error {
if pbtypes.GetBool(b.CombinedDetails(), bundle.RelationKeyIsArchived.String()) {
return nil
}
var conv converter.Converter
switch format {
case pb.RpcExport_Markdown:
conv = md.NewMDConverter(e.a, b.NewState(), wr.Namer())
case pb.RpcExport_Protobuf:
conv = pbc.NewConverter(b.NewState())
conv = pbc.NewConverter(b)
case pb.RpcExport_JSON:
conv = pbjson.NewConverter(b.NewState())
conv = pbjson.NewConverter(b)
}
conv.SetKnownLinks(docIds)
result := conv.Convert()
@ -265,6 +272,9 @@ func (e *export) writeDoc(format pb.RpcExportFormat, wr writer, docIds []string,
if err = wr.WriteFile(filename, bytes.NewReader(result)); err != nil {
return err
}
if !exportFiles {
return nil
}
for _, fh := range conv.FileHashes() {
fileHash := fh
if err = queue.Add(func() {

View file

@ -5,12 +5,13 @@ import (
"encoding/base64"
"errors"
"fmt"
"github.com/anytypeio/go-anytype-middleware/metrics"
"github.com/gogo/protobuf/proto"
"net/url"
"strings"
"time"
"github.com/anytypeio/go-anytype-middleware/metrics"
"github.com/gogo/protobuf/proto"
"github.com/anytypeio/go-anytype-middleware/app"
"github.com/anytypeio/go-anytype-middleware/core/block/doc"
"github.com/anytypeio/go-anytype-middleware/core/block/editor"
@ -296,6 +297,11 @@ func (s *service) initPredefinedBlocks() {
}
startTime := time.Now()
for _, id := range ids {
headsHash, _ := s.anytype.ObjectStore().GetLastIndexedHeadsHash(id)
if headsHash != "" {
// skip object that has been already indexed before
continue
}
ctx := &smartblock.InitContext{State: state.NewDoc(id, nil).(*state.State)}
// this is needed so that old account will create its state successfully on first launch
if id == s.anytype.PredefinedBlocks().AccountOld {
@ -755,13 +761,14 @@ func (s *service) DeleteObject(id string) (err error) {
if err = s.Anytype().FileStore().DeleteByHash(fileHash); err != nil {
log.With("file", fileHash).Errorf("failed to delete file from filestore: %s", err.Error())
}
if err = s.Anytype().FileStore().DeleteFileKeys(fileHash); err != nil {
log.With("file", fileHash).Errorf("failed to delete file keys: %s", err.Error())
}
// space will be reclaimed on the next GC cycle
if _, err = s.Anytype().FileOffload(fileHash); err != nil {
log.With("file", fileHash).Errorf("failed to offload file: %s", err.Error())
continue
}
if err = s.Anytype().FileStore().DeleteFileKeys(fileHash); err != nil {
log.With("file", fileHash).Errorf("failed to delete file keys: %s", err.Error())
}
}
}
@ -866,7 +873,15 @@ func (s *service) CreateSmartBlockFromState(ctx context.Context, sbType coresb.S
SetDetailsMs: time.Now().Sub(startTime).Milliseconds(),
}
ctx = context.WithValue(ctx, ObjectCreateEvent, ev)
csm, err := s.CreateObjectInWorkspace(ctx, workspaceId, sbType)
var tid = thread.Undef
if id := pbtypes.GetString(createState.CombinedDetails(), bundle.RelationKeyId.String()); id != "" {
tid, err = thread.Decode(id)
if err != nil {
log.Errorf("failed to decode thread id from the state: %s", err.Error())
}
}
csm, err := s.CreateObjectInWorkspace(ctx, workspaceId, tid, sbType)
if err != nil {
err = fmt.Errorf("anytype.CreateBlock error: %v", err)
return
@ -1297,6 +1312,9 @@ func (s *service) ApplyTemplate(contextId, templateId string) error {
ts.SetParent(orig)
ts.BlocksInit(orig)
ts.InjectDerivedDetails()
// preserve localDetails from the original object
ts.SetLocalDetails(orig.LocalDetails())
return b.Apply(ts, smartblock.NoRestrictions)
})
}

View file

@ -90,6 +90,10 @@ func (b *Base) Copy() simple.Block {
return NewBase(pbtypes.CopyBlock(b.Model()))
}
func (b *Base) Validate() error {
return nil
}
func (b *Base) String() string {
return fmt.Sprintf("%s: %T (%d)", b.Id, b.Content, len(b.ChildrenIds))
}

View file

@ -58,6 +58,11 @@ func (b *Div) SetStyle(style model.BlockContentDivStyle) {
b.content.Style = style
}
// Validate TODO: add validation rules
func (b *Div) Validate() error {
return nil
}
func (d *Div) ApplyEvent(e *pb.EventBlockSetDiv) (err error) {
if e.Style != nil {
d.content.Style = e.Style.GetValue()

View file

@ -93,6 +93,11 @@ func (f *Bookmark) Copy() simple.Block {
}
}
// Validate TODO: add validation rules
func (f *Bookmark) Validate() error {
return nil
}
func (f *Bookmark) Diff(b simple.Block) (msgs []simple.EventMessage, err error) {
bookmark, ok := b.(*Bookmark)
if !ok {

View file

@ -73,6 +73,11 @@ func (d *Dataview) Copy() simple.Block {
}
}
// Validate TODO: add validation rules
func (d *Dataview) Validate() error {
return nil
}
func (d *Dataview) Diff(b simple.Block) (msgs []simple.EventMessage, err error) {
dv, ok := b.(*Dataview)
if !ok {

View file

@ -109,6 +109,13 @@ func (f *File) Copy() simple.Block {
}
}
func (f *File) Validate() error {
if f.content.State == model.BlockContentFile_Done && f.content.Size_ == 0 {
return fmt.Errorf("empty file size and content State is Done")
}
return nil
}
func (f *File) Diff(b simple.Block) (msgs []simple.EventMessage, err error) {
file, ok := b.(*File)
if !ok {

View file

@ -61,3 +61,14 @@ func TestFile_Diff(t *testing.T) {
assert.NotNil(t, change.Mime)
})
}
func TestFile_Validate(t *testing.T) {
t.Run("not validated", func(t *testing.T) {
b := NewFile(&model.Block{
Restrictions: &model.BlockRestrictions{},
Content: &model.BlockContentOfFile{File: &model.BlockContentFile{State: model.BlockContentFile_Done, Size_: 0}},
}).(*File)
err := b.Validate()
assert.Error(t, err)
})
}

View file

@ -45,6 +45,11 @@ func (l *Latex) Copy() simple.Block {
}
}
// Validate TODO: add validation rules
func (l *Latex) Validate() error {
return nil
}
func (l *Latex) Diff(b simple.Block) (msgs []simple.EventMessage, err error) {
latex, ok := b.(*Latex)
if !ok {

View file

@ -48,6 +48,13 @@ func (l *Link) Copy() simple.Block {
}
}
func (l *Link) Validate() error {
if l.content.TargetBlockId == "" {
return fmt.Errorf("targetBlockId is empty")
}
return nil
}
func (l *Link) Diff(b simple.Block) (msgs []simple.EventMessage, err error) {
link, ok := b.(*Link)
if !ok {

View file

@ -16,7 +16,7 @@ func TestLink_Diff(t *testing.T) {
testBlock := func() *Link {
return NewLink(&model.Block{
Restrictions: &model.BlockRestrictions{},
Content: &model.BlockContentOfLink{Link: &model.BlockContentLink{}},
Content: &model.BlockContentOfLink{Link: &model.BlockContentLink{TargetBlockId: "some target"}},
}).(*Link)
}
t.Run("type error", func(t *testing.T) {
@ -89,3 +89,14 @@ func TestLink_ToText(t *testing.T) {
assert.Equal(t, &model.Range{0, 8}, textModel.Marks.Marks[0].Range)
})
}
func TestLink_Validate(t *testing.T) {
t.Run("not validated", func(t *testing.T) {
b := NewLink(&model.Block{
Restrictions: &model.BlockRestrictions{},
Content: &model.BlockContentOfLink{Link: &model.BlockContentLink{TargetBlockId: ""}},
}).(*Link)
err := b.Validate()
assert.Error(t, err)
})
}

View file

@ -43,6 +43,11 @@ func (l *Relation) Copy() simple.Block {
}
}
// Validate TODO: add validation rules
func (l *Relation) Validate() error {
return nil
}
func (l *Relation) Diff(b simple.Block) (msgs []simple.EventMessage, err error) {
relation, ok := b.(*Relation)
if !ok {

View file

@ -28,6 +28,7 @@ type Block interface {
Diff(block Block) (msgs []EventMessage, err error)
String() string
Copy() Block
Validate() error
}
type FileHashes interface {

View file

@ -86,6 +86,11 @@ func (td *textDetails) Copy() simple.Block {
}
}
// Validate TODO: add validation rules
func (td *textDetails) Validate() error {
return nil
}
func (td *textDetails) Diff(s simple.Block) (msgs []simple.EventMessage, err error) {
sd, ok := s.(*textDetails)
if !ok {

View file

@ -89,6 +89,11 @@ func (t *Text) Copy() simple.Block {
return NewText(pbtypes.CopyBlock(t.Model()))
}
// Validate TODO: add validation rules
func (t *Text) Validate() error {
return nil
}
func (t *Text) Diff(b simple.Block) (msgs []simple.EventMessage, err error) {
text, ok := b.(*Text)
if !ok {
@ -564,7 +569,7 @@ func (t *Text) String() string {
func (t *Text) FillSmartIds(ids []string) []string {
if t.content.Marks != nil {
for _, m := range t.content.Marks.Marks {
if (m.Type == model.BlockContentTextMark_Mention ||
if (m.Type == model.BlockContentTextMark_Mention ||
m.Type == model.BlockContentTextMark_Object) && m.Param != "" {
ids = append(ids, m.Param)
}

View file

@ -2,6 +2,7 @@ package source
import (
"context"
"github.com/anytypeio/go-anytype-middleware/pb"
"github.com/anytypeio/go-anytype-middleware/pkg/lib/localstore/addr"
"github.com/anytypeio/go-anytype-middleware/pkg/lib/pb/model"
@ -97,3 +98,7 @@ func (v *anytypeProfile) Close() (err error) {
func (v *anytypeProfile) LogHeads() map[string]string {
return nil
}
func (s *anytypeProfile) GetFileKeysSnapshot() []*pb.ChangeFileKeys {
return nil
}

View file

@ -2,6 +2,7 @@ package source
import (
"context"
"github.com/anytypeio/go-anytype-middleware/pb"
"github.com/anytypeio/go-anytype-middleware/change"
"github.com/anytypeio/go-anytype-middleware/core/block/editor/state"
@ -128,3 +129,7 @@ func (v *bundledObjectType) Close() (err error) {
func (v *bundledObjectType) LogHeads() map[string]string {
return nil
}
func (s *bundledObjectType) GetFileKeysSnapshot() []*pb.ChangeFileKeys {
return nil
}

View file

@ -3,6 +3,7 @@ package source
import (
"context"
"fmt"
"github.com/anytypeio/go-anytype-middleware/pb"
"strings"
"github.com/anytypeio/go-anytype-middleware/change"
@ -108,3 +109,7 @@ func (v *bundledRelation) Close() (err error) {
func (v *bundledRelation) LogHeads() map[string]string {
return nil
}
func (s *bundledRelation) GetFileKeysSnapshot() []*pb.ChangeFileKeys {
return nil
}

View file

@ -2,6 +2,7 @@ package source
import (
"context"
"github.com/anytypeio/go-anytype-middleware/pb"
"github.com/anytypeio/go-anytype-middleware/pkg/lib/localstore/addr"
"strings"
"time"
@ -122,6 +123,10 @@ func (v *date) LogHeads() map[string]string {
return nil
}
func (s *date) GetFileKeysSnapshot() []*pb.ChangeFileKeys {
return nil
}
func TimeToId(t time.Time) string {
return addr.DatePrefix + t.Format("2006-01-02")
}

View file

@ -2,6 +2,7 @@ package source
import (
"context"
"github.com/anytypeio/go-anytype-middleware/pb"
"strings"
"time"
@ -125,3 +126,7 @@ func (v *files) Close() (err error) {
func (v *files) LogHeads() map[string]string {
return nil
}
func (s *files) GetFileKeysSnapshot() []*pb.ChangeFileKeys {
return nil
}

View file

@ -3,6 +3,7 @@ package source
import (
"context"
"fmt"
"github.com/anytypeio/go-anytype-middleware/pb"
"strings"
"github.com/anytypeio/go-anytype-middleware/change"
@ -111,3 +112,7 @@ func (v *indexedRelation) Close() (err error) {
func (v *indexedRelation) LogHeads() map[string]string {
return nil
}
func (s *indexedRelation) GetFileKeysSnapshot() []*pb.ChangeFileKeys {
return nil
}

View file

@ -40,7 +40,7 @@ type Source interface {
Type() model.SmartBlockType
Virtual() bool
LogHeads() map[string]string
GetFileKeysSnapshot() []*pb.ChangeFileKeys
ReadOnly() bool
ReadDoc(receiver ChangeReceiver, empty bool) (doc state.Doc, err error)
ReadMeta(receiver ChangeReceiver) (doc state.Doc, err error)
@ -460,6 +460,10 @@ func (s *source) applyRecords(records []core.SmartblockRecordEnvelope) error {
}
}
func (s *source) GetFileKeysSnapshot() []*pb.ChangeFileKeys {
return s.getFileHashesForSnapshot(nil)
}
func (s *source) getFileHashesForSnapshot(changeHashes []string) []*pb.ChangeFileKeys {
fileKeys := s.getFileKeysByHashes(changeHashes)
var uniqKeys = make(map[string]struct{})

View file

@ -2,6 +2,7 @@ package source
import (
"context"
"github.com/anytypeio/go-anytype-middleware/pb"
"github.com/anytypeio/go-anytype-middleware/change"
"github.com/anytypeio/go-anytype-middleware/core/block/editor/state"
@ -82,3 +83,7 @@ func (s *static) Close() (err error) {
func (v *static) LogHeads() map[string]string {
return nil
}
func (s *static) GetFileKeysSnapshot() []*pb.ChangeFileKeys {
return nil
}

View file

@ -3,6 +3,7 @@ package source
import (
"context"
"fmt"
"github.com/anytypeio/go-anytype-middleware/pb"
"github.com/textileio/go-threads/core/db"
"github.com/textileio/go-threads/core/thread"
threadsUtil "github.com/textileio/go-threads/util"
@ -326,3 +327,7 @@ func (v *threadDB) createState() (*state.State, error) {
return s, nil
}
func (s *threadDB) GetFileKeysSnapshot() []*pb.ChangeFileKeys {
return nil
}

View file

@ -4,6 +4,7 @@ import (
"context"
"github.com/anytypeio/go-anytype-middleware/change"
"github.com/anytypeio/go-anytype-middleware/core/block/editor/state"
"github.com/anytypeio/go-anytype-middleware/pb"
"github.com/anytypeio/go-anytype-middleware/pkg/lib/core"
"github.com/anytypeio/go-anytype-middleware/pkg/lib/localstore/addr"
"github.com/anytypeio/go-anytype-middleware/pkg/lib/pb/model"
@ -72,3 +73,7 @@ func (v *virtual) Close() (err error) {
func (v *virtual) LogHeads() map[string]string {
return nil
}
func (s *virtual) GetFileKeysSnapshot() []*pb.ChangeFileKeys {
return nil
}

View file

@ -1,4 +1,4 @@
// +build !gomobile,!windows
// +build !gomobile,!windows,!nographviz
package dot

View file

@ -1,4 +1,4 @@
// +build gomobile windows
// +build gomobile windows nographviz
package dot

View file

@ -7,26 +7,27 @@ import (
"github.com/anytypeio/go-anytype-middleware/pkg/lib/pb/model"
)
func NewConverter(s *state.State) converter.Converter {
func NewConverter(s state.Doc) converter.Converter {
return &pbc{s}
}
type pbc struct {
s *state.State
s state.Doc
}
func (p *pbc) Convert() (result []byte) {
st := p.s.NewState()
snapshot := &pb.ChangeSnapshot{
Data: &model.SmartBlockSnapshotBase{
Blocks: p.s.BlocksToSave(),
Details: p.s.Details(),
ExtraRelations: p.s.ExtraRelations(),
ObjectTypes: p.s.ObjectTypes(),
Collections: p.s.Store(),
Blocks: st.BlocksToSave(),
Details: st.CombinedDetails(),
ExtraRelations: st.ExtraRelations(),
ObjectTypes: st.ObjectTypes(),
Collections: st.Store(),
},
}
for _, fk := range p.s.GetFileKeys() {
snapshot.FileKeys = append(snapshot.FileKeys, &fk)
snapshot.FileKeys = append(snapshot.FileKeys, &pb.ChangeFileKeys{Hash: fk.Hash, Keys: fk.Keys})
}
result, _ = snapshot.Marshal()
return

View file

@ -8,26 +8,27 @@ import (
"github.com/gogo/protobuf/jsonpb"
)
func NewConverter(s *state.State) converter.Converter {
func NewConverter(s state.Doc) converter.Converter {
return &pbj{s}
}
type pbj struct {
s *state.State
s state.Doc
}
func (p *pbj) Convert() []byte {
st := p.s.NewState()
snapshot := &pb.ChangeSnapshot{
Data: &model.SmartBlockSnapshotBase{
Blocks: p.s.BlocksToSave(),
Details: p.s.Details(),
ExtraRelations: p.s.ExtraRelations(),
ObjectTypes: p.s.ObjectTypes(),
Collections: p.s.Store(),
Blocks: st.BlocksToSave(),
Details: st.CombinedDetails(),
ExtraRelations: st.ExtraRelations(),
ObjectTypes: st.ObjectTypes(),
Collections: st.Store(),
},
}
for _, fk := range p.s.GetFileKeys() {
snapshot.FileKeys = append(snapshot.FileKeys, &fk)
snapshot.FileKeys = append(snapshot.FileKeys, &pb.ChangeFileKeys{Hash: fk.Hash, Keys: fk.Keys})
}
m := jsonpb.Marshaler{Indent: " "}
result, _ := m.MarshalToString(snapshot)

View file

@ -40,10 +40,12 @@ func (mw *Middleware) FileListOffload(req *pb.RpcFileListOffloadRequest) *pb.Rpc
var (
totalBytesOffloaded uint64
totalFilesOffloaded int32
totalFilesSkipped int
)
ds := mw.app.MustComponent(datastore.CName).(datastore.Datastore)
for _, fileId := range files {
if st, exists := pinStatus[fileId]; (!exists || st.Status != pb2.PinStatus_Done) && !req.IncludeNotPinned {
totalFilesSkipped++
continue
}
bytesRemoved, err := at.FileOffload(fileId)
@ -62,6 +64,12 @@ func (mw *Middleware) FileListOffload(req *pb.RpcFileListOffloadRequest) *pb.Rpc
return response(0, 0, pb.RpcFileListOffloadResponseError_UNKNOWN_ERROR, err)
}
log.With("files_offloaded", totalFilesOffloaded).
With("files_offloaded_b", totalBytesOffloaded).
With("gc_freed_b", freed).
With("files_skipped", totalFilesSkipped).
Errorf("filelistoffload results")
return response(totalFilesOffloaded, uint64(freed), pb.RpcFileListOffloadResponseError_NULL, nil)
}
@ -83,6 +91,7 @@ func (mw *Middleware) FileOffload(req *pb.RpcFileOffloadRequest) *pb.RpcFileOffl
at := mw.app.MustComponent(core.CName).(core.Service)
pin := mw.app.MustComponent(pin.CName).(pin.FilePinService)
ds := mw.app.MustComponent(datastore.CName).(datastore.Datastore)
if !at.IsStarted() {
response(0, pb.RpcFileOffloadResponseError_NODE_NOT_STARTED, fmt.Errorf("anytype node not started"))
@ -104,5 +113,10 @@ func (mw *Middleware) FileOffload(req *pb.RpcFileOffloadRequest) *pb.RpcFileOffl
totalBytesOffloaded += bytesRemoved
}
return response(totalBytesOffloaded, pb.RpcFileOffloadResponseError_NULL, nil)
freed, err := ds.RunBlockstoreGC()
if err != nil {
return response(0, pb.RpcFileOffloadResponseError_UNKNOWN_ERROR, err)
}
return response(uint64(freed), pb.RpcFileOffloadResponseError_NULL, nil)
}

View file

@ -1,6 +1,7 @@
package subscription
import (
"github.com/anytypeio/go-anytype-middleware/util/slice"
"github.com/gogo/protobuf/types"
)
@ -14,7 +15,29 @@ type entry struct {
id string
data *types.Struct
refs int
subIds []string
isActive bool
}
func (e *entry) AddSubId(subId string, isActive bool) {
if slice.FindPos(e.subIds, subId) == -1 {
e.subIds = append(e.subIds, subId)
}
if isActive {
e.isActive = true
}
}
func (e *entry) IsActive() bool {
return e.isActive
}
func (e *entry) RemoveSubId(subId string) {
e.subIds = slice.Remove(e.subIds, subId)
}
func (e *entry) SubIds() []string {
return e.subIds
}
func (e *entry) Get(key string) *types.Value {
@ -25,43 +48,31 @@ type cache struct {
entries map[string]*entry
}
func (c *cache) get(id string) *entry {
if e := c.entries[id]; e != nil {
e.refs++
return e
}
return nil
}
func (c *cache) release(id string) {
if e := c.entries[id]; e != nil {
e.refs--
if e.refs == 0 {
delete(c.entries, id)
}
}
}
func (c *cache) pick(id string) *entry {
func (c *cache) Get(id string) *entry {
return c.entries[id]
}
func (c *cache) exists(id string) bool {
_, ok := c.entries[id]
return ok
func (c *cache) GetOrSet(e *entry) *entry {
if res, ok := c.entries[e.id]; ok {
return res
}
c.entries[e.id] = e
return e
}
func (c *cache) getOrSet(e *entry) *entry {
if !c.exists(e.id) {
c.set(e)
}
return c.get(e.id)
func (c *cache) Set(e *entry) {
c.entries[e.id] = e
}
func (c *cache) set(e *entry) {
if ex, ok := c.entries[e.id]; ok {
ex.data = e.data
} else {
c.entries[e.id] = e
}
func (c *cache) Remove(id string) {
delete(c.entries, id)
}
func (c *cache) RemoveSubId(id, subId string) {
if e := c.Get(id); e != nil {
e.RemoveSubId(subId)
if len(e.SubIds()) == 0 {
c.Remove(id)
}
}
}

View file

@ -6,20 +6,15 @@ import (
"github.com/stretchr/testify/assert"
)
func TestCache(t *testing.T) {
c := newCache()
entries := genEntries(3, false)
for _, e := range entries {
c.set(e)
assert.NotNil(t, c.pick(e.id))
assert.NotNil(t, c.get(e.id))
}
for _, e := range entries {
c.set(e)
assert.NotNil(t, c.pick(e.id))
}
for _, e := range entries {
c.release(e.id)
}
assert.Len(t, c.entries, 0)
func TestEntry_SubIds(t *testing.T) {
e := &entry{}
e.AddSubId("1", true)
assert.Len(t, e.SubIds(), 1)
e.AddSubId("2", false)
assert.Len(t, e.SubIds(), 2)
e.AddSubId("2", false)
assert.Len(t, e.SubIds(), 2)
assert.True(t, e.IsActive())
e.RemoveSubId("1")
assert.Len(t, e.SubIds(), 1)
}

View file

@ -35,21 +35,23 @@ type opCounter struct {
type opCtx struct {
// subIds for remove
remove []opRemove
change []opChange
add []opChange
position []opPosition
counters []opCounter
depEntries []*entry
remove []opRemove
change []opChange
add []opChange
position []opPosition
counters []opCounter
entries []*entry
keysBuf []struct {
id string
subIds []string
keys []string
}
c *cache
}
func (ctx *opCtx) apply(c *cache, entries []*entry) (events []*pb.Event) {
func (ctx *opCtx) apply() (events []*pb.Event) {
var byEventsContext = make(map[string][]*pb.EventMessage)
var appendToContext = func(contextId string, msg ...*pb.EventMessage) {
msgs, ok := byEventsContext[contextId]
@ -79,7 +81,7 @@ func (ctx *opCtx) apply(c *cache, entries []*entry) (events []*pb.Event) {
}
// details events
appendToContext("", ctx.detailsEvents(c, entries)...)
appendToContext("", ctx.detailsEvents()...)
// positions
for _, pos := range ctx.position {
@ -118,11 +120,12 @@ func (ctx *opCtx) apply(c *cache, entries []*entry) (events []*pb.Event) {
}
// apply to cache
for _, e := range entries {
c.set(e)
}
for _, e := range ctx.depEntries {
c.set(e)
for _, e := range ctx.entries {
if len(e.SubIds()) > 0 {
ctx.c.Set(e)
} else {
ctx.c.Remove(e.id)
}
}
events = make([]*pb.Event, 0, len(byEventsContext))
@ -144,14 +147,9 @@ func (ctx *opCtx) apply(c *cache, entries []*entry) (events []*pb.Event) {
return
}
func (ctx *opCtx) detailsEvents(c *cache, entries []*entry) (msgs []*pb.EventMessage) {
func (ctx *opCtx) detailsEvents() (msgs []*pb.EventMessage) {
var getEntry = func(id string) *entry {
for _, e := range entries {
if e.id == id {
return e
}
}
for _, e := range ctx.depEntries {
for _, e := range ctx.entries {
if e.id == id {
return e
}
@ -164,9 +162,9 @@ func (ctx *opCtx) detailsEvents(c *cache, entries []*entry) (msgs []*pb.EventMes
log.Errorf("entry present in changes but not in list: %v", info.id)
continue
}
prev := c.pick(info.id)
prev := ctx.c.Get(info.id)
var prevData *types.Struct
if prev != nil {
if prev != nil && prev.IsActive() {
prevData = prev.data
}
diff := pbtypes.StructDiff(prevData, curr.data)
@ -202,6 +200,16 @@ func (ctx *opCtx) collectKeys(id string, subId string, keys []string) {
}
}
func (ctx *opCtx) getEntry(id string) *entry {
for _, e := range ctx.entries {
if e.id == id {
return e
}
}
return nil
}
func (ctx *opCtx) reset() {
ctx.remove = ctx.remove[:0]
ctx.change = ctx.change[:0]
@ -209,5 +217,6 @@ func (ctx *opCtx) reset() {
ctx.position = ctx.position[:0]
ctx.counters = ctx.counters[:0]
ctx.keysBuf = ctx.keysBuf[:0]
ctx.depEntries = ctx.depEntries[:0]
ctx.entries = ctx.entries[:0]
}

View file

@ -20,21 +20,20 @@ type dependencyService struct {
isRelationObjMap map[string]bool
}
func (ds *dependencyService) makeSubscriptionByEntries(subId string, entries []*entry, keys, depKeys []string) *simpleSub {
func (ds *dependencyService) makeSubscriptionByEntries(subId string, allEntries, activeEntries []*entry, keys, depKeys []string) *simpleSub {
depSub := ds.s.newSimpleSub(subId, keys, true)
depEntries := ds.depEntriesByEntries(entries, depKeys)
depEntries := ds.depEntriesByEntries(&opCtx{entries: allEntries}, activeEntries, depKeys)
depSub.init(depEntries)
return depSub
}
func (ds *dependencyService) refillSubscription(ctx *opCtx, sub *simpleSub, entries []*entry, depKeys []string) {
depEntries := ds.depEntriesByEntries(entries, depKeys)
depEntries := ds.depEntriesByEntries(ctx, entries, depKeys)
sub.refill(ctx, depEntries)
ctx.depEntries = append(ctx.depEntries, depEntries...)
return
}
func (ds *dependencyService) depEntriesByEntries(entries []*entry, depKeys []string) (depEntries []*entry) {
func (ds *dependencyService) depEntriesByEntries(ctx *opCtx, entries []*entry, depKeys []string) (depEntries []*entry) {
var depIds []string
for _, e := range entries {
for _, k := range depKeys {
@ -54,10 +53,26 @@ func (ds *dependencyService) depEntriesByEntries(entries []*entry, depKeys []str
}
depEntries = make([]*entry, 0, len(depRecords))
for _, r := range depRecords {
depEntries = append(depEntries, &entry{
id: pbtypes.GetString(r.Details, "id"),
data: r.Details,
})
var e *entry
id := pbtypes.GetString(r.Details, "id")
// priority: ctx.entries, cache, objectStore
if e = ctx.getEntry(id); e == nil {
if e = ds.s.cache.Get(id); e == nil {
e = &entry{
id: id,
data: r.Details,
}
} else {
e = &entry{
id: id,
data: e.data,
subIds: e.subIds,
}
}
ctx.entries = append(ctx.entries, e)
}
depEntries = append(depEntries, e)
}
return
}
@ -69,6 +84,7 @@ func (ds *dependencyService) isRelationObject(key string) bool {
rel, err := ds.s.objectStore.GetRelation(key)
if err != nil {
log.Errorf("can't get relation: %v", err)
return false
}
isObj := rel.Format == model.RelationFormat_object || rel.Format == model.RelationFormat_file
ds.isRelationObjMap[key] = isObj

View file

@ -41,7 +41,7 @@ type Service interface {
type subscription interface {
init(entries []*entry) (err error)
counters() (prev, next int)
onChangeBatch(ctx *opCtx, entries ...*entry)
onChange(ctx *opCtx)
getActiveRecords() (res []*types.Struct)
close()
}
@ -66,7 +66,7 @@ func (s *service) Init(a *app.App) (err error) {
s.objectStore = a.MustComponent(objectstore.CName).(objectstore.ObjectStore)
s.recBatch = mb.New(0)
s.sendEvent = a.MustComponent(event.CName).(event.Sender).Send
s.ctxBuf = &opCtx{}
s.ctxBuf = &opCtx{c: s.cache}
return
}
@ -248,11 +248,12 @@ func (s *service) onChange(entries []*entry) {
s.m.Lock()
defer s.m.Unlock()
s.ctxBuf.reset()
s.ctxBuf.entries = entries
for _, sub := range s.subscriptions {
sub.onChangeBatch(s.ctxBuf, entries...)
sub.onChange(s.ctxBuf)
}
log.Debugf("handle %d etries; ctx: %#v", len(entries), s.ctxBuf)
events := s.ctxBuf.apply(s.cache, entries)
events := s.ctxBuf.apply()
for _, e := range events {
s.sendEvent(e)
}

View file

@ -18,11 +18,8 @@ import (
)
func TestService_Search(t *testing.T) {
t.Run("dependencies", func(t *testing.T) {
fx := newFixture(t)
defer fx.a.Close()
defer fx.ctrl.Finish()
var newSub = func(fx *fixture, subId string) {
fx.store.EXPECT().QueryRaw(gomock.Any()).Return(
[]database.Record{
{Details: &types.Struct{Fields: map[string]*types.Value{
@ -36,11 +33,11 @@ func TestService_Search(t *testing.T) {
fx.store.EXPECT().GetRelation(bundle.RelationKeyName.String()).Return(&model.Relation{
Key: bundle.RelationKeyName.String(),
Format: model.RelationFormat_shorttext,
}, nil)
}, nil).AnyTimes()
fx.store.EXPECT().GetRelation(bundle.RelationKeyAuthor.String()).Return(&model.Relation{
Key: bundle.RelationKeyAuthor.String(),
Format: model.RelationFormat_object,
}, nil)
}, nil).AnyTimes()
fx.store.EXPECT().QueryById([]string{"author1"}).Return([]database.Record{
{Details: &types.Struct{Fields: map[string]*types.Value{
@ -50,13 +47,21 @@ func TestService_Search(t *testing.T) {
}, nil)
resp, err := fx.Search(pb.RpcObjectSearchSubscribeRequest{
SubId: "test",
SubId: subId,
Keys: []string{bundle.RelationKeyName.String(), bundle.RelationKeyAuthor.String()},
})
require.NoError(t, err)
assert.Len(t, resp.Records, 1)
assert.Len(t, resp.Dependencies, 1)
}
t.Run("dependencies", func(t *testing.T) {
fx := newFixture(t)
defer fx.a.Close()
defer fx.ctrl.Finish()
newSub(fx, "test")
fx.store.EXPECT().QueryById([]string{"author2", "author3", "1"}).Return([]database.Record{
{Details: &types.Struct{Fields: map[string]*types.Value{
@ -68,8 +73,8 @@ func TestService_Search(t *testing.T) {
"name": pbtypes.String("author3"),
}}},
{Details: &types.Struct{Fields: map[string]*types.Value{
"id": pbtypes.String("1"),
"name": pbtypes.String("one"),
"id": pbtypes.String("1"),
"name": pbtypes.String("one"),
"author": pbtypes.StringList([]string{"author2", "author3", "1"}),
}}},
}, nil)
@ -82,35 +87,42 @@ func TestService_Search(t *testing.T) {
}}},
})
assert.Len(t, fx.Service.(*service).cache.entries, 3)
assert.Equal(t, 2, fx.Service.(*service).cache.entries["1"].refs)
assert.Equal(t, 1, fx.Service.(*service).cache.entries["author2"].refs)
assert.Equal(t, 1, fx.Service.(*service).cache.entries["author3"].refs)
require.Len(t, fx.Service.(*service).cache.entries, 3)
assert.Len(t, fx.Service.(*service).cache.entries["1"].SubIds(), 2)
assert.Len(t, fx.Service.(*service).cache.entries["author2"].SubIds(), 1)
assert.Len(t, fx.Service.(*service).cache.entries["author3"].SubIds(), 1)
fx.events = fx.events[:0]
fx.Service.(*service).onChange([]*entry{
{id: "1", data: &types.Struct{Fields: map[string]*types.Value{
"id": pbtypes.String("1"),
"name": pbtypes.String("one"),
"id": pbtypes.String("1"),
"name": pbtypes.String("one"),
}}},
})
/*
for _, e := range fx.events {
t.Log(pbtypes.Sprint(e))
}
for _, e := range fx.Service.(*service).cache.entries {
t.Log(e.id, e.refs)
}*/
assert.Len(t, fx.Service.(*service).cache.entries, 1)
assert.Equal(t, 1, fx.Service.(*service).cache.entries["1"].refs)
assert.NoError(t, fx.Unsubscribe("test"))
assert.Len(t, fx.Service.(*service).cache.entries, 0)
})
t.Run("cache ref counter", func(t *testing.T) {
fx := newFixture(t)
defer fx.a.Close()
defer fx.ctrl.Finish()
newSub(fx, "test")
require.Len(t, fx.Service.(*service).cache.entries, 2)
assert.Equal(t, []string{"test"}, fx.Service.(*service).cache.entries["1"].SubIds())
assert.Equal(t, []string{"test/dep"}, fx.Service.(*service).cache.entries["author1"].SubIds())
newSub(fx, "test1")
require.Len(t, fx.Service.(*service).cache.entries, 2)
assert.Len(t, fx.Service.(*service).cache.entries["1"].SubIds(), 2)
assert.Len(t, fx.Service.(*service).cache.entries["author1"].SubIds(), 2)
})
}
func newFixture(t *testing.T) *fixture {

View file

@ -33,13 +33,14 @@ type simpleSub struct {
func (s *simpleSub) init(entries []*entry) (err error) {
s.set = make(map[string]struct{})
for _, e := range entries {
e = s.cache.getOrSet(e)
e = s.cache.GetOrSet(e)
s.set[e.id] = struct{}{}
e.AddSubId(s.id, true)
}
if s.ds != nil {
s.depKeys = s.ds.depKeys(s.keys)
if len(s.depKeys) > 0 {
s.depSub = s.ds.makeSubscriptionByEntries(s.id+"/dep", s.getActiveEntries(), s.keys, s.depKeys)
s.depSub = s.ds.makeSubscriptionByEntries(s.id+"/dep", entries, s.getActiveEntries(), s.keys, s.depKeys)
}
}
return
@ -48,7 +49,6 @@ func (s *simpleSub) init(entries []*entry) (err error) {
func (s *simpleSub) refill(ctx *opCtx, entries []*entry) {
var newSet = make(map[string]struct{})
for _, e := range entries {
e = s.cache.getOrSet(e)
if _, inSet := s.set[e.id]; inSet {
ctx.change = append(ctx.change, opChange{
id: e.id,
@ -63,6 +63,7 @@ func (s *simpleSub) refill(ctx *opCtx, entries []*entry) {
})
}
newSet[e.id] = struct{}{}
e.AddSubId(s.id, true)
}
for oldId := range s.set {
if _, inSet := newSet[oldId]; !inSet {
@ -70,7 +71,7 @@ func (s *simpleSub) refill(ctx *opCtx, entries []*entry) {
id: oldId,
subId: s.id,
})
s.cache.release(oldId)
s.cache.RemoveSubId(oldId, s.id)
}
}
s.set = newSet
@ -80,9 +81,9 @@ func (s *simpleSub) counters() (prev, next int) {
return 0, 0
}
func (s *simpleSub) onChangeBatch(ctx *opCtx, entries ...*entry) {
func (s *simpleSub) onChange(ctx *opCtx) {
var changed bool
for _, e := range entries {
for _, e := range ctx.entries {
if _, inSet := s.set[e.id]; inSet {
ctx.change = append(ctx.change, opChange{
id: e.id,
@ -90,6 +91,7 @@ func (s *simpleSub) onChangeBatch(ctx *opCtx, entries ...*entry) {
keys: s.keys,
})
changed = true
e.AddSubId(s.id, true)
}
}
if changed && s.depSub != nil {
@ -100,21 +102,21 @@ func (s *simpleSub) onChangeBatch(ctx *opCtx, entries ...*entry) {
func (s *simpleSub) getActiveEntries() (res []*entry) {
s.activeEntriesBuf = s.activeEntriesBuf[:0]
for id := range s.set {
res = append(res, s.cache.pick(id))
res = append(res, s.cache.Get(id))
}
return s.activeEntriesBuf
}
func (s *simpleSub) getActiveRecords() (res []*types.Struct) {
for id := range s.set {
res = append(res, pbtypes.StructFilterKeys(s.cache.pick(id).data, s.keys))
res = append(res, pbtypes.StructFilterKeys(s.cache.Get(id).data, s.keys))
}
return
}
func (s *simpleSub) close() {
for id := range s.set {
s.cache.release(id)
s.cache.RemoveSubId(id, s.id)
}
if s.depSub != nil {
s.depSub.close()

View file

@ -9,30 +9,32 @@ import (
func TestSimpleSub_Changes(t *testing.T) {
t.Run("add to set", func(t *testing.T) {
sub := &simpleSub{
keys: []string{"id", "order"},
cache: newCache(),
keys: []string{"id", "order"},
cache: newCache(),
}
require.NoError(t, sub.init(genEntries(10, false)))
ctx := &opCtx{}
sub.onChangeBatch(ctx, genEntry("id5", 109))
ctx := &opCtx{c: sub.cache}
ctx.entries = append(ctx.entries, genEntry("id5", 109))
sub.onChange(ctx)
assertCtxChange(t, ctx, "id5")
})
t.Run("miss set", func(t *testing.T) {
sub := &simpleSub{
keys: []string{"id", "order"},
cache: newCache(),
keys: []string{"id", "order"},
cache: newCache(),
}
require.NoError(t, sub.init(genEntries(10, false)))
ctx := &opCtx{}
sub.onChangeBatch(ctx, genEntry("id50", 100))
ctx := &opCtx{c: sub.cache}
ctx.entries = append(ctx.entries, genEntry("id50", 100))
sub.onChange(ctx)
assertCtxEmpty(t, ctx)
})
}
func TestSimpleSub_Refill(t *testing.T) {
sub := &simpleSub{
keys: []string{"id", "order"},
cache: newCache(),
keys: []string{"id", "order"},
cache: newCache(),
}
require.NoError(t, sub.init(genEntries(3, false)))
ctx := &opCtx{}
@ -40,4 +42,4 @@ func TestSimpleSub_Refill(t *testing.T) {
assertCtxChange(t, ctx, "id3")
assertCtxRemove(t, ctx, "id1", "id2")
assertCtxAdd(t, ctx, "id20", "")
}
}

View file

@ -10,8 +10,8 @@ import (
)
var (
ErrAfterId = errors.New("after id not in set")
ErrBeforeId = errors.New("before id not in set")
ErrAfterId = errors.New("after id not in set")
ErrBeforeId = errors.New("before id not in set")
ErrNoRecords = errors.New("no records with given offset")
)
@ -52,22 +52,40 @@ type sortedSub struct {
func (s *sortedSub) init(entries []*entry) (err error) {
s.skl = skiplist.New(s)
for _, e := range entries {
e = s.cache.getOrSet(e)
defer func() {
if err != nil {
s.close()
}
}()
for i, e := range entries {
e = s.cache.GetOrSet(e)
entries[i] = e
e.AddSubId(s.id, true)
s.skl.Set(e, nil)
}
if s.afterId != "" {
e := s.cache.pick(s.afterId)
e := s.cache.Get(s.afterId)
if e == nil {
return ErrAfterId
err = ErrAfterId
return
}
s.afterEl = s.skl.Get(e)
if s.afterEl == nil {
err = ErrAfterId
return
}
} else if s.beforeId != "" {
e := s.cache.pick(s.beforeId)
e := s.cache.Get(s.beforeId)
if e == nil {
return ErrBeforeId
err = ErrBeforeId
return
}
s.beforeEl = s.skl.Get(e)
if s.beforeEl == nil {
err = ErrBeforeId
return
}
} else if s.offset > 0 {
el := s.skl.Front()
i := 0
@ -76,29 +94,34 @@ func (s *sortedSub) init(entries []*entry) (err error) {
if i == s.offset {
s.afterId = el.Key().(*entry).id
s.afterEl = el
break
}
el = el.Next()
}
if s.afterEl == nil {
return ErrNoRecords
err = ErrNoRecords
return
}
}
activeEntries := s.getActiveEntries()
for _, ae := range activeEntries {
ae.AddSubId(s.id, true)
}
if s.ds != nil {
s.depKeys = s.ds.depKeys(s.keys)
if len(s.depKeys) > 0 {
s.depSub = s.ds.makeSubscriptionByEntries(s.id+"/dep", s.getActiveEntries(), s.keys, s.depKeys)
s.depSub = s.ds.makeSubscriptionByEntries(s.id+"/dep", entries, activeEntries, s.keys, s.depKeys)
}
}
return nil
}
func (s *sortedSub) onChangeBatch(ctx *opCtx, entries ...*entry) {
func (s *sortedSub) onChange(ctx *opCtx) {
var countersChanged, activeChanged bool
for _, e := range entries {
ch, ac := s.onChange(ctx, e)
for _, e := range ctx.entries {
ch, ac := s.onEntryChange(ctx, e)
if ch {
countersChanged = true
}
@ -120,12 +143,12 @@ func (s *sortedSub) onChangeBatch(ctx *opCtx, entries ...*entry) {
}
}
func (s *sortedSub) onChange(ctx *opCtx, e *entry) (countersChanged, activeChanged bool) {
func (s *sortedSub) onEntryChange(ctx *opCtx, e *entry) (countersChanged, activeChanged bool) {
newInSet := true
if s.filter != nil {
newInSet = s.filter.FilterObject(e)
}
curInSet, currInActive := s.lookup(s.cache.pick(e.id))
curInSet, currInActive := s.lookup(s.cache.Get(e.id))
if !curInSet && !newInSet {
return false, false
}
@ -136,7 +159,8 @@ func (s *sortedSub) onChange(ctx *opCtx, e *entry) (countersChanged, activeChang
} else {
s.removeNonActive(e.id)
}
return true, activeChanged
countersChanged = true
return
}
if !curInSet && newInSet {
return s.add(ctx, e)
@ -148,7 +172,7 @@ func (s *sortedSub) onChange(ctx *opCtx, e *entry) (countersChanged, activeChang
}
func (s *sortedSub) removeNonActive(id string) {
e := s.cache.pick(id)
e := s.cache.Get(id)
if s.afterEl != nil {
if comp := s.Compare(s.afterEl.Key(), s.skl.Get(e).Key()); comp <= 0 {
if comp == 0 {
@ -169,12 +193,10 @@ func (s *sortedSub) removeNonActive(id string) {
}
}
s.skl.Remove(e)
s.cache.release(e.id)
}
func (s *sortedSub) removeActive(ctx *opCtx, e *entry) {
s.skl.Remove(s.cache.pick(e.id))
s.cache.release(e.id)
s.skl.Remove(s.cache.Get(e.id))
ctx.remove = append(ctx.remove, opRemove{
id: e.id,
subId: s.id,
@ -184,7 +206,6 @@ func (s *sortedSub) removeActive(ctx *opCtx, e *entry) {
func (s *sortedSub) add(ctx *opCtx, e *entry) (countersChanged, activeChanged bool) {
s.skl.Set(e, nil)
s.cache.get(e.id)
_, inActive := s.lookup(e)
if inActive {
var afterId string
@ -198,19 +219,21 @@ func (s *sortedSub) add(ctx *opCtx, e *entry) (countersChanged, activeChanged bo
afterId: afterId,
})
s.alignRemove(ctx)
return false, true
e.AddSubId(s.id, true)
return true, true
}
e.AddSubId(s.id, false)
return true, false
}
func (s *sortedSub) change(ctx *opCtx, e *entry, currInActive bool) (countersChanged, activeChanged bool) {
var currAfterId string
if currInActive {
if prev := s.skl.Get(s.cache.pick(e.id)).Prev(); prev != nil {
if prev := s.skl.Get(s.cache.Get(e.id)).Prev(); prev != nil {
currAfterId = prev.Key().(*entry).id
}
}
s.skl.Remove(s.cache.pick(e.id))
s.skl.Remove(s.cache.Get(e.id))
s.skl.Set(e, nil)
_, newInActive := s.lookup(e)
if newInActive {
@ -235,6 +258,7 @@ func (s *sortedSub) change(ctx *opCtx, e *entry, currInActive bool) (countersCha
subId: s.id,
keys: s.keys,
})
e.AddSubId(s.id, true)
} else {
if currInActive {
ctx.remove = append(ctx.remove, opRemove{
@ -245,6 +269,7 @@ func (s *sortedSub) change(ctx *opCtx, e *entry, currInActive bool) (countersCha
activeChanged = true
}
countersChanged = true
e.AddSubId(s.id, false)
}
return
}
@ -524,7 +549,7 @@ func (s *sortedSub) CalcScore(key interface{}) float64 {
func (s *sortedSub) close() {
el := s.skl.Front()
for el != nil {
s.cache.release(el.Key().(*entry).id)
s.cache.RemoveSubId(el.Key().(*entry).id, s.id)
el = el.Next()
}
if s.depSub != nil {

View file

@ -20,6 +20,7 @@ func TestSubscription_Internal(t *testing.T) {
afterId: "id101",
}
require.Equal(t, ErrAfterId, sub.init(genEntries(100, false)))
assert.Len(t, sub.cache.entries, 0)
})
t.Run("beforeId err", func(t *testing.T) {
sub := &sortedSub{
@ -28,6 +29,7 @@ func TestSubscription_Internal(t *testing.T) {
beforeId: "id101",
}
require.Equal(t, ErrBeforeId, sub.init(genEntries(100, false)))
assert.Len(t, sub.cache.entries, 0)
})
})
t.Run("lookup", func(t *testing.T) {
@ -37,9 +39,14 @@ func TestSubscription_Internal(t *testing.T) {
cache: newCache(),
}
require.NoError(t, sub.init(genEntries(100, false)))
inSet, inActive := sub.lookup(sub.cache.pick("id50"))
inSet, inActive := sub.lookup(sub.cache.Get("id50"))
assert.True(t, inSet, "inSet")
assert.True(t, inActive, "inActive")
assert.Len(t, sub.cache.entries, 100)
for _, e := range sub.cache.entries {
assert.Len(t, e.SubIds(), 1)
}
})
t.Run("with limit", func(t *testing.T) {
sub := &sortedSub{
@ -48,10 +55,10 @@ func TestSubscription_Internal(t *testing.T) {
limit: 10,
}
require.NoError(t, sub.init(genEntries(100, false)))
inSet, inActive := sub.lookup(sub.cache.pick("id11"))
inSet, inActive := sub.lookup(sub.cache.Get("id11"))
assert.True(t, inSet, "inSet")
assert.False(t, inActive, "inActive")
inSet, inActive = sub.lookup(sub.cache.pick("id10"))
inSet, inActive = sub.lookup(sub.cache.Get("id10"))
assert.True(t, inSet, "inSet")
assert.True(t, inActive, "inActive")
})
@ -63,13 +70,13 @@ func TestSubscription_Internal(t *testing.T) {
}
require.NoError(t, sub.init(genEntries(100, false)))
inSet, inActive := sub.lookup(sub.cache.pick("id49"))
inSet, inActive := sub.lookup(sub.cache.Get("id49"))
assert.True(t, inSet, "inSet")
assert.False(t, inActive, "inActive")
inSet, inActive = sub.lookup(sub.cache.pick("id50"))
inSet, inActive = sub.lookup(sub.cache.Get("id50"))
assert.True(t, inSet, "inSet")
assert.False(t, inActive, "inActive")
inSet, inActive = sub.lookup(sub.cache.pick("id51"))
inSet, inActive = sub.lookup(sub.cache.Get("id51"))
assert.True(t, inSet, "inSet")
assert.True(t, inActive, "inActive")
})
@ -80,15 +87,15 @@ func TestSubscription_Internal(t *testing.T) {
beforeId: "id50",
}
require.NoError(t, sub.init(genEntries(100, false)))
inSet, inActive := sub.lookup(sub.cache.pick("id51"))
inSet, inActive := sub.lookup(sub.cache.Get("id51"))
assert.True(t, inSet, "inSet")
assert.False(t, inActive, "inActive")
inSet, inActive = sub.lookup(sub.cache.pick("id50"))
inSet, inActive = sub.lookup(sub.cache.Get("id50"))
assert.True(t, inSet, "inSet")
assert.False(t, inActive, "inActive")
inSet, inActive = sub.lookup(sub.cache.pick("id49"))
inSet, inActive = sub.lookup(sub.cache.Get("id49"))
assert.True(t, inSet, "inSet")
assert.True(t, inActive, "inActive")
})
@ -101,15 +108,15 @@ func TestSubscription_Internal(t *testing.T) {
}
require.NoError(t, sub.init(genEntries(100, false)))
inSet, inActive := sub.lookup(sub.cache.pick("id49"))
inSet, inActive := sub.lookup(sub.cache.Get("id49"))
assert.True(t, inSet, "inSet")
assert.False(t, inActive, "inActive")
inSet, inActive = sub.lookup(sub.cache.pick("id60"))
inSet, inActive = sub.lookup(sub.cache.Get("id60"))
assert.True(t, inSet, "inSet")
assert.True(t, inActive, "inActive")
inSet, inActive = sub.lookup(sub.cache.pick("id61"))
inSet, inActive = sub.lookup(sub.cache.Get("id61"))
assert.True(t, inSet, "inSet")
assert.False(t, inActive, "inActive")
})
@ -122,15 +129,15 @@ func TestSubscription_Internal(t *testing.T) {
}
require.NoError(t, sub.init(genEntries(100, false)))
inSet, inActive := sub.lookup(sub.cache.pick("id51"))
inSet, inActive := sub.lookup(sub.cache.Get("id51"))
assert.True(t, inSet, "inSet")
assert.False(t, inActive, "inActive")
inSet, inActive = sub.lookup(sub.cache.pick("id40"))
inSet, inActive = sub.lookup(sub.cache.Get("id40"))
assert.True(t, inSet, "inSet")
assert.True(t, inActive, "inActive")
inSet, inActive = sub.lookup(sub.cache.pick("id39"))
inSet, inActive = sub.lookup(sub.cache.Get("id39"))
assert.True(t, inSet, "inSet")
assert.False(t, inActive, "inActive")
})
@ -244,6 +251,7 @@ func TestSubscription_Internal(t *testing.T) {
func TestSubscription_Add(t *testing.T) {
t.Run("add", func(t *testing.T) {
sub := &sortedSub{
id: "test",
order: testOrder,
cache: newCache(),
limit: 3,
@ -258,13 +266,20 @@ func TestSubscription_Add(t *testing.T) {
genEntry("afterId2", 10),
}
ctx := &opCtx{}
sub.onChangeBatch(ctx, newEntries...)
assert.Len(t, sub.cache.entries, 9)
ctx := &opCtx{c: sub.cache, entries: newEntries}
sub.onChange(ctx)
assertCtxAdd(t, ctx, "newActiveId1", "id3")
assertCtxAdd(t, ctx, "newActiveId2", "newActiveId1")
assertCtxRemove(t, ctx, "id5", "id6")
assertCtxCounters(t, ctx, opCounter{total: 14, prevCount: 4, nextCount: 7})
t.Logf("%#v", ctx)
assertCtxCounters(t, ctx, opCounter{subId: "test", total: 14, prevCount: 4, nextCount: 7})
ctx.apply()
assert.Len(t, sub.cache.entries, 9+len(newEntries))
for _, e := range sub.cache.entries {
assert.Equal(t, []string{"test"}, e.SubIds())
}
})
}
@ -285,11 +300,12 @@ func TestSubscription_Remove(t *testing.T) {
t.Run("remove active", func(t *testing.T) {
sub := newSub()
require.NoError(t, sub.init(genEntries(9, false)))
ctx := &opCtx{}
sub.onChangeBatch(ctx, &entry{
ctx := &opCtx{c: sub.cache}
ctx.entries = append(ctx.entries, &entry{
id: "id4",
data: &types.Struct{Fields: map[string]*types.Value{"id": pbtypes.String("id4"), "order": pbtypes.Int64(100)}},
})
sub.onChange(ctx)
assertCtxRemove(t, ctx, "id4")
assertCtxCounters(t, ctx, opCounter{total: 8, prevCount: 3, nextCount: 2})
assertCtxAdd(t, ctx, "id7", "id6")
@ -297,11 +313,12 @@ func TestSubscription_Remove(t *testing.T) {
t.Run("remove non active", func(t *testing.T) {
sub := newSub()
require.NoError(t, sub.init(genEntries(9, false)))
ctx := &opCtx{}
sub.onChangeBatch(ctx, &entry{
ctx := &opCtx{c: sub.cache}
ctx.entries = append(ctx.entries, &entry{
id: "id1",
data: &types.Struct{Fields: map[string]*types.Value{"id": pbtypes.String("id4"), "order": pbtypes.Int64(100)}},
})
sub.onChange(ctx)
assertCtxCounters(t, ctx, opCounter{total: 8, prevCount: 2, nextCount: 3})
})
}
@ -315,11 +332,12 @@ func TestSubscription_Change(t *testing.T) {
afterId: "id3",
}
require.NoError(t, sub.init(genEntries(9, false)))
ctx := &opCtx{}
sub.onChangeBatch(ctx, &entry{
ctx := &opCtx{c: sub.cache}
ctx.entries = append(ctx.entries, &entry{
id: "id4",
data: &types.Struct{Fields: map[string]*types.Value{"id": pbtypes.String("id4"), "order": pbtypes.Int64(6)}},
})
sub.onChange(ctx)
assertCtxPosition(t, ctx, "id4", "id5")
assertCtxChange(t, ctx, "id4")
})

View file

@ -7127,6 +7127,7 @@ commands acceptable only for text blocks, others will be ignored
| format | [Rpc.Export.Format](#anytype.Rpc.Export.Format) | | export format |
| zip | [bool](#bool) | | save as zip file |
| includeNested | [bool](#bool) | | include all nested |
| includeFiles | [bool](#bool) | | include all files |

1
go.sum
View file

@ -592,6 +592,7 @@ github.com/hashicorp/serf v0.8.2 h1:YZ7UKsJv+hKjqGVUUbtE3HNj79Eln2oQ75tniF6iPt0=
github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/hudl/fargo v1.3.0 h1:0U6+BtN6LhaYuTnIJq4Wyq5cpn6O2kWrxAtcqBmYY6w=
github.com/huandu/go-assert v1.1.5 h1:fjemmA7sSfYHJD7CUqs9qTwwfdNAx7/j2/ZlHXzNB3c=
github.com/huandu/go-assert v1.1.5/go.mod h1:yOLvuqZwmcHIC5rIzrBhT7D3Q9c3GFnd0JrPVhn/06U=
github.com/huandu/skiplist v1.2.0 h1:gox56QD77HzSC0w+Ws3MH3iie755GBJU1OER3h5VsYw=

View file

@ -226,6 +226,23 @@ func (c StateApply) ToEvent() Event {
}
}
type AppStart struct {
Type string
TotalMs int64
PerCompMs map[string]int64
}
func (c AppStart) ToEvent() Event {
return Event{
EventType: "app_start",
EventData: map[string]interface{}{
"type": c.Type,
"time_ms": c.TotalMs,
"per_comp": c.PerCompMs,
},
}
}
type InitPredefinedBlocks struct {
TimeMs int64
}

File diff suppressed because it is too large Load diff

View file

@ -3745,6 +3745,8 @@ message Rpc {
bool zip = 4;
// include all nested
bool includeNested = 5;
// include all files
bool includeFiles = 6;
}
message Response {

View file

@ -63,7 +63,7 @@ type Service interface {
GetBlock(blockId string) (SmartBlock, error)
GetBlockCtx(ctx context.Context, blockId string) (SmartBlock, error)
// FileOffload removes file blocks ercursively, but leave details
// FileOffload removes file blocks recursively, but leave details
FileOffload(id string) (bytesRemoved uint64, err error)
FileByHash(ctx context.Context, hash string) (File, error)

View file

@ -103,12 +103,7 @@ func (a *Anytype) FileOffload(hash string) (totalSize uint64, err error) {
}
}
freed, err := a.ds.RunBlockstoreGC()
if err != nil {
return 0, err
}
return uint64(freed), nil
return uint64(totalSize), nil
}
func (a *Anytype) FileByHash(ctx context.Context, hash string) (File, error) {
@ -121,7 +116,7 @@ func (a *Anytype) FileByHash(ctx context.Context, hash string) (File, error) {
// info from ipfs
fileList, err = a.files.FileIndexInfo(ctx, hash, false)
if err != nil {
log.Errorf("FileByHash: failed to retrieve from IPFS: %s", err.Error())
log.With("cid", hash).Errorf("FileByHash: failed to retrieve from IPFS: %s", err.Error())
return nil, ErrFileNotFound
}
}

View file

@ -12,8 +12,9 @@ import (
const payloadVersion = 1
type SmartblockLog struct {
ID string
Head string
ID string
Head string
HeadCounter int64
}
type SmartblockRecordEnvelope struct {

View file

@ -354,7 +354,7 @@ func (block *smartBlock) GetLogs() ([]SmartblockLog, error) {
return nil, err
}
var logs []SmartblockLog
var logs = make([]SmartblockLog, 0, len(thrd.Logs))
for _, l := range thrd.Logs {
var head string
if l.Head.ID.Defined() {
@ -362,8 +362,9 @@ func (block *smartBlock) GetLogs() ([]SmartblockLog, error) {
}
logs = append(logs, SmartblockLog{
ID: l.ID.String(),
Head: head,
ID: l.ID.String(),
Head: head,
HeadCounter: l.Head.Counter,
})
}

View file

@ -201,7 +201,7 @@ func (r *clientds) RunBlockstoreGC() (freed int64, err error) {
ext := filepath.Ext(info.Name())
switch ext {
case ".vlog":
index, err := strconv.ParseInt(info.Name(), 10, 64)
index, err := strconv.ParseInt(strings.TrimSuffix(info.Name(), ext), 10, 64)
if err != nil {
return nil
}
@ -265,10 +265,14 @@ func (r *clientds) RunBlockstoreGC() (freed int64, err error) {
keysTotalSize += int64(result.Size)
}
if keysTotalSize > totalSizeAfter /* + DefaultConfig.Litestore.ValueLogFileSize*/ {
log.With("vlogs_count", len(vlogsAfter)).With("keys_size_b", keysTotalSize).With("vlog_overhead_b", totalSizeAfter-keysTotalSize).With("vlogs", vlogsAfter).Errorf("Badger GC: got badger value logs overhead after GC")
freed = totalSizeBefore - totalSizeAfter
if totalSizeAfter > keysTotalSize {
log.With("vlogs_count", len(vlogsAfter)).With("vlogs_freed_b", freed).With("keys_size_b", keysTotalSize).With("vlog_overhead_b", totalSizeAfter-keysTotalSize).With("vlogs", vlogsAfter).Errorf("Badger GC: got badger value logs overhead after GC")
}
return totalSizeAfter - totalSizeBefore, nil
if freed < 0 {
freed = 0
}
return freed, nil
}
func (r *clientds) PeerstoreDS() (ds.Batching, error) {

View file

@ -322,7 +322,7 @@ type Service interface {
CafePeer() ma.Multiaddr
CreateThread(blockType smartblock.SmartBlockType) (thread.Info, error)
CreateThread(id thread.ID) (thread.Info, error)
AddThread(threadId string, key string, addrs []string) error
DeleteThread(id string) error
UpdateSimultaneousRequests(requests int) error
@ -472,11 +472,7 @@ func (s *service) GetThreadInfo(id thread.ID) (thread.Info, error) {
return ti, nil
}
func (s *service) CreateThread(blockType smartblock.SmartBlockType) (thread.Info, error) {
thrdId, err := ThreadCreateID(thread.AccessControlled, blockType)
if err != nil {
return thread.Info{}, err
}
func (s *service) CreateThread(thrdId thread.ID) (thread.Info, error) {
followKey, err := symmetric.NewRandom()
if err != nil {
return thread.Info{}, err

View file

@ -2,7 +2,6 @@ package threads
import (
"context"
"github.com/anytypeio/go-anytype-middleware/pkg/lib/core/smartblock"
"github.com/anytypeio/go-anytype-middleware/pkg/lib/logging"
"github.com/textileio/go-threads/core/logstore"
"github.com/textileio/go-threads/core/thread"
@ -29,7 +28,7 @@ type ThreadQueue interface {
Run()
ProcessThreadsAsync(threadsFromState []ThreadInfo, workspaceId string)
AddThreadSync(info ThreadInfo, workspaceId string) error
CreateThreadSync(blockType smartblock.SmartBlockType, workspaceId string) (thread.Info, error)
CreateThreadSync(id thread.ID, workspaceId string) (thread.Info, error)
DeleteThreadSync(id, workspaceId string) error
GetWorkspacesForThread(threadId string) []string
GetThreadsForWorkspace(workspaceId string) []string
@ -129,8 +128,8 @@ func (p *threadQueue) AddThreadSync(info ThreadInfo, workspaceId string) error {
return err
}
func (p *threadQueue) CreateThreadSync(blockType smartblock.SmartBlockType, workspaceId string) (thread.Info, error) {
info, err := p.threadsService.CreateThread(blockType)
func (p *threadQueue) CreateThreadSync(threadId thread.ID, workspaceId string) (thread.Info, error) {
info, err := p.threadsService.CreateThread(threadId)
if err != nil {
return thread.Info{}, err
}
@ -332,7 +331,7 @@ func (o threadAddOperation) Run() (err error) {
func (o threadAddOperation) OnFinish(err error) {
// at the time of this function call the operation is still pending
defer o.queue.logOperation(o, err == nil, o.WorkspaceId, o.queue.l.PendingOperations() - 1)
defer o.queue.logOperation(o, err == nil, o.WorkspaceId, o.queue.l.PendingOperations()-1)
if err == nil {
o.queue.finishAddOperation(o.ID, o.WorkspaceId)
return

View file

@ -0,0 +1,262 @@
package builtinobjects
import (
"archive/zip"
"bytes"
"context"
_ "embed"
"fmt"
"github.com/anytypeio/go-anytype-middleware/core/anytype/config"
"github.com/anytypeio/go-anytype-middleware/core/block"
"github.com/anytypeio/go-anytype-middleware/core/block/simple/link"
"github.com/anytypeio/go-anytype-middleware/core/block/simple/text"
"github.com/gogo/protobuf/types"
"github.com/textileio/go-threads/core/thread"
"io"
"io/ioutil"
"path/filepath"
"strings"
"sync"
"github.com/anytypeio/go-anytype-middleware/app"
"github.com/anytypeio/go-anytype-middleware/core/block/editor/state"
"github.com/anytypeio/go-anytype-middleware/core/block/simple"
"github.com/anytypeio/go-anytype-middleware/core/block/simple/relation"
"github.com/anytypeio/go-anytype-middleware/core/block/source"
"github.com/anytypeio/go-anytype-middleware/pb"
"github.com/anytypeio/go-anytype-middleware/pkg/lib/bundle"
"github.com/anytypeio/go-anytype-middleware/pkg/lib/core/smartblock"
"github.com/anytypeio/go-anytype-middleware/pkg/lib/localstore/addr"
"github.com/anytypeio/go-anytype-middleware/pkg/lib/logging"
"github.com/anytypeio/go-anytype-middleware/pkg/lib/pb/model"
"github.com/anytypeio/go-anytype-middleware/pkg/lib/threads"
"github.com/anytypeio/go-anytype-middleware/util/pbtypes"
)
const CName = "builtinobjects"
//go:embed data/bundled_objects.zip
var objectsZip []byte
var log = logging.Logger("anytype-mw-builtinobjects")
const (
analyticsContext = "get-started"
)
func New() BuiltinObjects {
return new(builtinObjects)
}
type BuiltinObjects interface {
app.ComponentRunnable
}
type builtinObjects struct {
cancel func()
l sync.Mutex
source source.Service
service block.Service
newAccount bool
idsMap map[string]string
}
func (b *builtinObjects) Init(a *app.App) (err error) {
b.source = a.MustComponent(source.CName).(source.Service)
b.service = a.MustComponent(block.CName).(block.Service)
b.newAccount = a.MustComponent(config.CName).(*config.Config).NewAccount
b.cancel = func() {}
return
}
func (b *builtinObjects) Name() (name string) {
return CName
}
func (b *builtinObjects) Run() (err error) {
if !b.newAccount {
// import only for new accounts
return
}
var ctx context.Context
ctx, b.cancel = context.WithCancel(context.Background())
go func() {
err = b.inject(ctx)
if err != nil {
log.Errorf("failed to import builtinObjects: %s", err.Error())
}
}()
return
}
func (b *builtinObjects) inject(ctx context.Context) (err error) {
zr, err := zip.NewReader(bytes.NewReader(objectsZip), int64(len(objectsZip)))
if err != nil {
return
}
b.idsMap = make(map[string]string, len(zr.File))
for _, zf := range zr.File {
id := strings.TrimSuffix(zf.Name, filepath.Ext(zf.Name))
sbt, err := smartblock.SmartBlockTypeFromID(id)
if err != nil {
return err
}
tid, err := threads.ThreadCreateID(thread.AccessControlled, sbt)
if err != nil {
return err
}
b.idsMap[id] = tid.String()
}
for _, zf := range zr.File {
rd, e := zf.Open()
if e != nil {
return e
}
if err = b.createObject(ctx, rd); err != nil {
return
}
}
return nil
}
func (b *builtinObjects) createObject(ctx context.Context, rd io.ReadCloser) (err error) {
defer rd.Close()
data, err := ioutil.ReadAll(rd)
snapshot := &pb.ChangeSnapshot{}
if err = snapshot.Unmarshal(data); err != nil {
return
}
isFavorite := pbtypes.GetBool(snapshot.Data.Details, bundle.RelationKeyIsFavorite.String())
isArchived := pbtypes.GetBool(snapshot.Data.Details, bundle.RelationKeyIsArchived.String())
if isArchived {
return fmt.Errorf("object has isarchived == true")
}
st := state.NewDocFromSnapshot("", snapshot).(*state.State)
oldId := st.RootId()
newId, exists := b.idsMap[oldId]
if !exists {
return fmt.Errorf("new id not found for '%s'", st.RootId())
}
st.SetRootId(newId)
a := st.Get(newId)
m := a.Model()
f := m.GetFields().GetFields()
if f == nil {
f = make(map[string]*types.Value)
}
m.Fields = &types.Struct{Fields: f}
f["analyticsContext"] = pbtypes.String(analyticsContext)
f["analyticsOriginalId"] = pbtypes.String(oldId)
st.Set(simple.New(m))
st.RemoveDetail(bundle.RelationKeyCreator.String(), bundle.RelationKeyLastModifiedBy.String())
st.SetLocalDetail(bundle.RelationKeyCreator.String(), pbtypes.String(addr.AnytypeProfileId))
st.SetLocalDetail(bundle.RelationKeyLastModifiedBy.String(), pbtypes.String(addr.AnytypeProfileId))
st.InjectDerivedDetails()
if err = b.validate(st); err != nil {
return
}
st.Iterate(func(bl simple.Block) (isContinue bool) {
switch a := bl.(type) {
case link.Block:
newTarget := b.idsMap[a.Model().GetLink().TargetBlockId]
if newTarget == "" {
// maybe we should panic here?
log.Errorf("cant find target id for link: %s", a.Model().GetLink().TargetBlockId)
return true
}
a.Model().GetLink().TargetBlockId = newTarget
st.Set(simple.New(a.Model()))
case text.Block:
for i, mark := range a.Model().GetText().GetMarks().GetMarks() {
if mark.Type != model.BlockContentTextMark_Mention && mark.Type != model.BlockContentTextMark_Object {
continue
}
newTarget := b.idsMap[mark.Param]
if newTarget == "" {
log.Errorf("cant find target id for mentrion: %s", mark.Param)
continue
}
a.Model().GetText().GetMarks().GetMarks()[i].Param = newTarget
}
st.Set(simple.New(a.Model()))
}
return true
})
for k, v := range st.Details().GetFields() {
rel, err := bundle.GetRelation(bundle.RelationKey(k))
if err != nil {
log.Errorf("failed to find relation %s: %s", k, err.Error())
continue
}
if rel.Format != model.RelationFormat_object {
continue
}
vals := pbtypes.GetStringListValue(v)
for i, val := range vals {
newTarget, _ := b.idsMap[val]
if newTarget == "" {
log.Errorf("cant find target id for relation %s: %s", k, val)
continue
}
vals[i] = newTarget
}
}
sbt, err := smartblock.SmartBlockTypeFromID(newId)
if err != nil {
return err
}
_, _, err = b.service.CreateSmartBlockFromState(ctx, sbt, nil, nil, st)
if isFavorite {
err = b.service.SetPageIsFavorite(pb.RpcObjectSetIsFavoriteRequest{ContextId: newId, IsFavorite: true})
if err != nil {
log.Errorf("failed to set isFavorite when importing object %s(originally %s): %s", newId, oldId, err.Error())
}
}
return err
}
func (b *builtinObjects) validate(st *state.State) (err error) {
var relKeys []string
for _, rel := range st.ExtraRelations() {
if !bundle.HasRelation(rel.Key) {
// todo: temporarily, make this as error
log.Errorf("builtin objects should not contain custom relations, got %s in %s(%s)", rel.Name, st.RootId(), pbtypes.GetString(st.Details(), bundle.RelationKeyName.String()))
//return fmt.Errorf("builtin objects should not contain custom relations, got %s in %s(%s)", rel.Name, st.RootId(), pbtypes.GetString(st.Details(), bundle.RelationKeyName.String()))
}
}
st.Iterate(func(b simple.Block) (isContinue bool) {
if rb, ok := b.(relation.Block); ok {
relKeys = append(relKeys, rb.Model().GetRelation().Key)
}
return true
})
for _, rk := range relKeys {
if !st.HasRelation(rk) {
return fmt.Errorf("bundled template validation: relation '%v' exists in block but not in extra relations", rk)
}
}
return nil
}
func (b *builtinObjects) Close() (err error) {
if b.cancel != nil {
b.cancel()
}
return
}

Binary file not shown.