From 400f6fca4ffe82b4c3a889c6e2f3b3385d9b4d4e Mon Sep 17 00:00:00 2001 From: Roman Khafizianov Date: Tue, 12 Jul 2022 21:25:48 +0200 Subject: [PATCH] add metrics & pass ctx through app cmp and tree build --- app/app.go | 23 +++++- app/app_test.go | 7 +- change/builder.go | 79 +++++++++---------- change/builder_test.go | 33 ++++---- change/change_test.go | 5 +- change/graphviz_nix.go | 3 +- change/state_test.go | 5 +- change/tree.go | 5 +- change/tree_test.go | 13 +-- cmd/cli/cafe.go | 2 +- cmd/cli/debug.go | 5 +- cmd/debugtree/debugtree.go | 5 +- core/account.go | 42 +++------- core/anytype/bootstrap.go | 9 ++- core/block/doc/service.go | 2 +- core/block/doc/service_test.go | 2 +- core/block/editor/files.go | 2 +- core/block/editor/smartblock/smartblock.go | 7 +- .../editor/smartblock/smartblock_test.go | 3 +- core/block/process/service.go | 3 +- core/block/process/service_test.go | 3 +- core/block/service.go | 41 +++++----- core/block/source/anytypeprofile.go | 4 +- core/block/source/bundledobjecttype.go | 4 +- core/block/source/bundledrelation.go | 4 +- core/block/source/date.go | 4 +- core/block/source/files.go | 4 +- core/block/source/indexedrelation.go | 4 +- core/block/source/source.go | 58 +++++++++++--- core/block/source/static.go | 4 +- core/block/source/threaddb.go | 4 +- core/block/source/virtual.go | 4 +- core/configfetcher/configfetcher.go | 2 +- core/debug/debugtree/debugtree.go | 2 +- core/history/history.go | 5 +- core/history/history_test.go | 3 +- core/indexer/indexer.go | 16 ++-- core/indexer/indexer_test.go | 5 +- core/status/service.go | 3 +- core/subscription/service.go | 3 +- core/subscription/service_test.go | 3 +- metrics/client.go | 3 + metrics/events.go | 24 ++++-- pkg/lib/core/context.go | 52 ++++++++++++ pkg/lib/core/core.go | 4 +- pkg/lib/core/smartblock.go | 23 ++++++ pkg/lib/datastore/clientds/clientds.go | 2 +- pkg/lib/gateway/gateway.go | 2 +- pkg/lib/ipfs/ipfslite/lite.go | 2 +- pkg/lib/localstore/filestore/files.go | 3 +- pkg/lib/localstore/ftsearch/ftsearch.go | 3 +- pkg/lib/localstore/ftsearch/ftsearch_test.go | 3 +- pkg/lib/localstore/objectstore/objects.go | 3 +- .../localstore/objectstore/objects_test.go | 29 +++---- pkg/lib/pin/service.go | 2 +- pkg/lib/threads/service.go | 2 +- util/builtinobjects/builtinobjects.go | 2 +- util/builtintemplate/builtintemplate.go | 3 +- util/builtintemplate/builtintemplate_test.go | 3 +- util/testMock/anytype.go | 5 +- .../mockBuiltinTemplate/builtintemplate.go | 3 +- util/testMock/mockStatus/status.go | 3 +- 62 files changed, 387 insertions(+), 224 deletions(-) create mode 100644 pkg/lib/core/context.go diff --git a/app/app.go b/app/app.go index b178c78e6..fc17f4879 100644 --- a/app/app.go +++ b/app/app.go @@ -1,8 +1,10 @@ package app import ( + "context" "errors" "fmt" + "github.com/anytypeio/go-anytype-middleware/metrics" "os" "runtime" "strings" @@ -34,7 +36,7 @@ type ComponentRunnable interface { Component // Run will be called after init stage // Non-nil error also will be aborted app start - Run() (err error) + Run(ctx context.Context) (err error) // Close will be called when app shutting down // Also will be called when service return error on Init or Run stage // Non-nil error will be printed to log @@ -138,7 +140,7 @@ func (app *App) ComponentNames() (names []string) { // Start starts the application // All registered services will be initialized and started -func (app *App) Start() (err error) { +func (app *App) Start(ctx context.Context) (err error) { app.mu.RLock() defer app.mu.RUnlock() app.startStat.SpentMsPerComp = make(map[string]int64) @@ -165,7 +167,7 @@ func (app *App) Start() (err error) { for i, s := range app.components { if serviceRun, ok := s.(ComponentRunnable); ok { start := time.Now() - if err = serviceRun.Run(); err != nil { + if err = serviceRun.Run(ctx); err != nil { closeServices(i) return fmt.Errorf("can't run service '%s': %v", serviceRun.Name(), err) } @@ -174,6 +176,21 @@ func (app *App) Start() (err error) { app.startStat.SpentMsPerComp[s.Name()] = spent } } + + stat := app.startStat + if stat.SpentMsTotal > 300 { + log.Errorf("AccountCreate app start takes %dms: %v", stat.SpentMsTotal, stat.SpentMsPerComp) + } + + var request string + if v, ok := ctx.Value(metrics.CtxKeyRequest).(string); ok { + request = v + } + + metrics.SharedClient.RecordEvent(metrics.AppStart{ + Request: request, + TotalMs: stat.SpentMsTotal, + PerCompMs: stat.SpentMsPerComp}) log.Debugf("All components started") return } diff --git a/app/app_test.go b/app/app_test.go index 3e62f7c8e..79a3c2ffc 100644 --- a/app/app_test.go +++ b/app/app_test.go @@ -1,6 +1,7 @@ package app import ( + "context" "fmt" "sync/atomic" "testing" @@ -48,7 +49,7 @@ func TestAppStart(t *testing.T) { for _, s := range services { app.Register(s) } - assert.Nil(t, app.Start()) + assert.Nil(t, app.Start(context.Background())) assert.Nil(t, app.Close()) var actual []testIds @@ -78,7 +79,7 @@ func TestAppStart(t *testing.T) { app.Register(s) } - err := app.Start() + err := app.Start(context.Background()) assert.NotNil(t, err) assert.Contains(t, err.Error(), expectedErr.Error()) @@ -144,7 +145,7 @@ type testRunnable struct { testComponent } -func (t *testRunnable) Run() error { +func (t *testRunnable) Run(context.Context) error { t.ids.runId = t.seq.New() return t.err } diff --git a/change/builder.go b/change/builder.go index 89e542b90..8fdb3bbbe 100644 --- a/change/builder.go +++ b/change/builder.go @@ -28,21 +28,21 @@ const ( virtualChangeBaseSeparator = "+" ) -func BuildTreeBefore(s core.SmartBlock, beforeLogId string, includeBeforeId bool) (t *Tree, err error) { +func BuildTreeBefore(ctx context.Context, s core.SmartBlock, beforeLogId string, includeBeforeId bool) (t *Tree, err error) { sb := &stateBuilder{beforeId: beforeLogId, includeBeforeId: includeBeforeId} - err = sb.Build(s) + err = sb.Build(ctx, s) return sb.tree, err } -func BuildTree(s core.SmartBlock) (t *Tree, logHeads map[string]*Change, err error) { +func BuildTree(ctx context.Context, s core.SmartBlock) (t *Tree, logHeads map[string]*Change, err error) { sb := new(stateBuilder) - err = sb.Build(s) + err = sb.Build(ctx, s) return sb.tree, sb.logHeads, err } -func BuildMetaTree(s core.SmartBlock) (t *Tree, logHeads map[string]*Change, err error) { +func BuildMetaTree(ctx context.Context, s core.SmartBlock) (t *Tree, logHeads map[string]*Change, err error) { sb := &stateBuilder{onlyMeta: true} - err = sb.Build(s) + err = sb.Build(ctx, s) return sb.tree, sb.logHeads, err } @@ -60,15 +60,15 @@ type stateBuilder struct { duplicateEvents int } -func (sb *stateBuilder) Build(s core.SmartBlock) (err error) { +func (sb *stateBuilder) Build(ctx context.Context, s core.SmartBlock) (err error) { sb.smartblockId = s.ID() st := time.Now() sb.smartblock = s - logs, err := sb.getLogs() + logs, err := sb.getLogs(ctx) if err != nil { return err } - heads, err := sb.getActualHeads(logs) + heads, err := sb.getActualHeads(ctx, logs) if err != nil { return fmt.Errorf("getActualHeads error: %v", err) } @@ -77,11 +77,11 @@ func (sb *stateBuilder) Build(s core.SmartBlock) (err error) { sb.duplicateEvents = 0 } - breakpoint, err := sb.findBreakpoint(heads) + breakpoint, err := sb.findBreakpoint(ctx, heads) if err != nil { return fmt.Errorf("findBreakpoint error: %v", err) } - if err = sb.buildTree(heads, breakpoint); err != nil { + if err = sb.buildTree(ctx, heads, breakpoint); err != nil { return fmt.Errorf("buildTree error: %v", err) } log.Infof("tree build: len: %d; scanned: %d; dur: %v (lib %v)", sb.tree.Len(), len(sb.cache), time.Since(st), sb.qt) @@ -89,10 +89,10 @@ func (sb *stateBuilder) Build(s core.SmartBlock) (err error) { return } -func (sb *stateBuilder) getLogs() (logs []core.SmartblockLog, err error) { +func (sb *stateBuilder) getLogs(ctx context.Context) (logs []core.SmartblockLog, err error) { sb.cache = make(map[string]*Change) if sb.beforeId != "" { - before, e := sb.loadChange(sb.beforeId) + before, e := sb.loadChange(ctx, sb.beforeId) if e != nil { return nil, e } @@ -120,7 +120,7 @@ func (sb *stateBuilder) getLogs() (logs []core.SmartblockLog, err error) { if len(l.Head) == 0 { continue } - if ch, err := sb.loadChange(l.Head); err != nil { + if ch, err := sb.loadChange(ctx, l.Head); err != nil { log.Errorf("loading head %s of the log %s failed: %v", l.Head, l.ID, err) } else { sb.logHeads[l.ID] = ch @@ -130,8 +130,8 @@ func (sb *stateBuilder) getLogs() (logs []core.SmartblockLog, err error) { return nonEmptyLogs, nil } -func (sb *stateBuilder) buildTree(heads []string, breakpoint string) (err error) { - ch, err := sb.loadChange(breakpoint) +func (sb *stateBuilder) buildTree(ctx context.Context, heads []string, breakpoint string) (err error) { + ch, err := sb.loadChange(ctx, breakpoint) if err != nil { return } @@ -144,7 +144,7 @@ func (sb *stateBuilder) buildTree(heads []string, breakpoint string) (err error) var changes = make([]*Change, 0, len(heads)*2) var uniqMap = map[string]struct{}{breakpoint: {}} for _, id := range heads { - changes, err = sb.loadChangesFor(id, uniqMap, changes) + changes, err = sb.loadChangesFor(ctx, id, uniqMap, changes) if err != nil { return } @@ -166,16 +166,16 @@ func (sb *stateBuilder) buildTree(heads []string, breakpoint string) (err error) return } -func (sb *stateBuilder) loadChangesFor(id string, uniqMap map[string]struct{}, buf []*Change) ([]*Change, error) { +func (sb *stateBuilder) loadChangesFor(ctx context.Context, id string, uniqMap map[string]struct{}, buf []*Change) ([]*Change, error) { if _, exists := uniqMap[id]; exists { return buf, nil } - ch, err := sb.loadChange(id) + ch, err := sb.loadChange(ctx, id) if err != nil { return nil, err } for _, prev := range ch.GetPreviousIds() { - if buf, err = sb.loadChangesFor(prev, uniqMap, buf); err != nil { + if buf, err = sb.loadChangesFor(ctx, prev, uniqMap, buf); err != nil { return nil, err } } @@ -183,13 +183,13 @@ func (sb *stateBuilder) loadChangesFor(id string, uniqMap map[string]struct{}, b return append(buf, ch), nil } -func (sb *stateBuilder) findBreakpoint(heads []string) (breakpoint string, err error) { +func (sb *stateBuilder) findBreakpoint(ctx context.Context, heads []string) (breakpoint string, err error) { var ( ch *Change snapshotIds []string ) for _, head := range heads { - if ch, err = sb.loadChange(head); err != nil { + if ch, err = sb.loadChange(ctx, head); err != nil { return } shId := ch.GetLastSnapshotId() @@ -197,10 +197,10 @@ func (sb *stateBuilder) findBreakpoint(heads []string) (breakpoint string, err e snapshotIds = append(snapshotIds, shId) } } - return sb.findCommonSnapshot(snapshotIds) + return sb.findCommonSnapshot(ctx, snapshotIds) } -func (sb *stateBuilder) findCommonSnapshot(snapshotIds []string) (snapshotId string, err error) { +func (sb *stateBuilder) findCommonSnapshot(ctx context.Context, snapshotIds []string) (snapshotId string, err error) { // sb.smartblock can be nil in this func if len(snapshotIds) == 1 { return snapshotIds[0], nil @@ -212,14 +212,14 @@ func (sb *stateBuilder) findCommonSnapshot(snapshotIds []string) (snapshotId str if s1 == s2 { return s1, nil } - ch1, err := sb.loadChange(s1) + ch1, err := sb.loadChange(ctx, s1) if err != nil { return "", err } if ch1.LastSnapshotId == s2 { return s2, nil } - ch2, err := sb.loadChange(s2) + ch2, err := sb.loadChange(ctx, s2) if err != nil { return "", err } @@ -237,7 +237,7 @@ func (sb *stateBuilder) findCommonSnapshot(snapshotIds []string) (snapshotId str for { lid1 := t1[len(t1)-1] if lid1 != "" { - l1, e := sb.loadChange(lid1) + l1, e := sb.loadChange(ctx, lid1) if e != nil { return "", e } @@ -250,7 +250,7 @@ func (sb *stateBuilder) findCommonSnapshot(snapshotIds []string) (snapshotId str } lid2 := t2[len(t2)-1] if lid2 != "" { - l2, e := sb.loadChange(t2[len(t2)-1]) + l2, e := sb.loadChange(ctx, t2[len(t2)-1]) if e != nil { return "", e } @@ -332,7 +332,7 @@ func (sb *stateBuilder) findCommonSnapshot(snapshotIds []string) (snapshotId str return snapshotIds[0], nil } -func (sb *stateBuilder) getActualHeads(logs []core.SmartblockLog) (heads []string, err error) { +func (sb *stateBuilder) getActualHeads(ctx context.Context, logs []core.SmartblockLog) (heads []string, err error) { sort.Slice(logs, func(i, j int) bool { return logs[i].ID < logs[j].ID }) @@ -342,7 +342,7 @@ func (sb *stateBuilder) getActualHeads(logs []core.SmartblockLog) (heads []strin if slice.FindPos(knownHeads, l.Head) != -1 { // do not scan known heads continue } - sh, err := sb.getNearSnapshot(l.Head) + sh, err := sb.getNearSnapshot(ctx, l.Head) if err != nil { log.Warnf("can't get near snapshot: %v; ignore", err) continue @@ -367,15 +367,15 @@ func (sb *stateBuilder) getActualHeads(logs []core.SmartblockLog) (heads []strin return } -func (sb *stateBuilder) getNearSnapshot(id string) (sh *Change, err error) { - ch, err := sb.loadChange(id) +func (sb *stateBuilder) getNearSnapshot(ctx context.Context, id string) (sh *Change, err error) { + ch, err := sb.loadChange(ctx, id) if err != nil { return } if ch.Snapshot != nil { return ch, nil } - sch, err := sb.loadChange(ch.LastSnapshotId) + sch, err := sb.loadChange(ctx, ch.LastSnapshotId) if err != nil { return } @@ -389,7 +389,7 @@ func (sb *stateBuilder) makeVirtualSnapshotId(s1, s2 string) string { return virtualChangeBasePrefix + base64.RawStdEncoding.EncodeToString([]byte(s1+virtualChangeBaseSeparator+s2)) } -func (sb *stateBuilder) makeChangeFromVirtualId(id string) (*Change, error) { +func (sb *stateBuilder) makeChangeFromVirtualId(ctx context.Context, id string) (*Change, error) { dataB, err := base64.RawStdEncoding.DecodeString(id[len(virtualChangeBasePrefix):]) if err != nil { return nil, fmt.Errorf("invalid virtual id format: %s", err.Error()) @@ -400,11 +400,11 @@ func (sb *stateBuilder) makeChangeFromVirtualId(id string) (*Change, error) { return nil, fmt.Errorf("invalid virtual id format: %v", id) } - ch1, err := sb.loadChange(ids[0]) + ch1, err := sb.loadChange(context.Background(), ids[0]) if err != nil { return nil, err } - ch2, err := sb.loadChange(ids[1]) + ch2, err := sb.loadChange(ctx, ids[1]) if err != nil { return nil, err } @@ -418,12 +418,12 @@ func (sb *stateBuilder) makeChangeFromVirtualId(id string) (*Change, error) { } -func (sb *stateBuilder) loadChange(id string) (ch *Change, err error) { +func (sb *stateBuilder) loadChange(ctx context.Context, id string) (ch *Change, err error) { if ch, ok := sb.cache[id]; ok { return ch, nil } if strings.HasPrefix(id, virtualChangeBasePrefix) { - ch, err = sb.makeChangeFromVirtualId(id) + ch, err = sb.makeChangeFromVirtualId(ctx, id) if err != nil { return nil, err } @@ -434,8 +434,7 @@ func (sb *stateBuilder) loadChange(id string) (ch *Change, err error) { return nil, fmt.Errorf("no smarblock in builder") } st := time.Now() - ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) - defer cancel() + sr, err := sb.smartblock.GetRecord(ctx, id) s := time.Since(st) if err != nil { diff --git a/change/builder_test.go b/change/builder_test.go index bb11bba9d..fc0cfbbf6 100644 --- a/change/builder_test.go +++ b/change/builder_test.go @@ -1,6 +1,7 @@ package change import ( + "context" "encoding/base64" "encoding/json" "testing" @@ -47,7 +48,7 @@ var ( func TestStateBuilder_Build(t *testing.T) { t.Run("empty", func(t *testing.T) { - _, _, err := BuildTree(NewTestSmartBlock()) + _, _, err := BuildTree(context.Background(), NewTestSmartBlock()) assert.Equal(t, ErrEmpty, err) }) t.Run("linear - one snapshot", func(t *testing.T) { @@ -57,7 +58,7 @@ func TestStateBuilder_Build(t *testing.T) { newSnapshot("s0", "", nil), ) b := new(stateBuilder) - err := b.Build(sb) + err := b.Build(context.Background(), sb) require.NoError(t, err) require.NotNil(t, b.tree) assert.Equal(t, "s0", b.tree.RootId()) @@ -72,7 +73,7 @@ func TestStateBuilder_Build(t *testing.T) { newChange("c0", "s0", "s0"), ) b := new(stateBuilder) - err := b.Build(sb) + err := b.Build(context.Background(), sb) require.NoError(t, err) require.NotNil(t, b.tree) assert.Equal(t, "s0", b.tree.RootId()) @@ -92,7 +93,7 @@ func TestStateBuilder_Build(t *testing.T) { newChange("c2", "s0", "c1"), ) b := new(stateBuilder) - err := b.Build(sb) + err := b.Build(context.Background(), sb) require.NoError(t, err) require.NotNil(t, b.tree) assert.Equal(t, "s0", b.tree.RootId()) @@ -114,7 +115,7 @@ func TestStateBuilder_Build(t *testing.T) { newChange("c3", "s1", "s1"), ) b := new(stateBuilder) - err := b.Build(sb) + err := b.Build(context.Background(), sb) require.NoError(t, err) require.NotNil(t, b.tree) assert.Equal(t, "s1", b.tree.RootId()) @@ -143,7 +144,7 @@ func TestStateBuilder_Build(t *testing.T) { newChange("c3.3", "s1.1", "s1.1"), ) b := new(stateBuilder) - err := b.Build(sb) + err := b.Build(context.Background(), sb) require.NoError(t, err) require.NotNil(t, b.tree) assert.Equal(t, "s0", b.tree.RootId()) @@ -177,7 +178,7 @@ func TestStateBuilder_Build(t *testing.T) { newChange("c4", "s2", "s2"), ) b := new(stateBuilder) - err := b.Build(sb) + err := b.Build(context.Background(), sb) require.NoError(t, err) require.NotNil(t, b.tree) assert.Equal(t, "s2", b.tree.RootId()) @@ -201,7 +202,7 @@ func TestStateBuilder_Build(t *testing.T) { }, } b := new(stateBuilder) - err := b.Build(sb) + err := b.Build(context.Background(), sb) require.NoError(t, err) require.NotNil(t, b.tree) assert.Equal(t, "s0", b.tree.RootId()) @@ -213,12 +214,12 @@ func TestStateBuilder_Build(t *testing.T) { func TestStateBuilder_findCommonSnapshot(t *testing.T) { t.Run("error for empty", func(t *testing.T) { b := new(stateBuilder) - _, err := b.findCommonSnapshot(nil) + _, err := b.findCommonSnapshot(context.Background(), nil) require.Error(t, err) }) t.Run("one snapshot", func(t *testing.T) { b := new(stateBuilder) - id, err := b.findCommonSnapshot([]string{"one"}) + id, err := b.findCommonSnapshot(context.Background(), []string{"one"}) require.NoError(t, err) assert.Equal(t, "one", id) }) @@ -253,7 +254,7 @@ func TestStateBuilder_findCommonSnapshot(t *testing.T) { newSnapshot("s1.5", "s2.4", nil, "s2.4"), ) b := new(stateBuilder) - err := b.Build(sb) + err := b.Build(context.Background(), sb) require.NoError(t, err) assert.Equal(t, "s0", b.tree.RootId()) }) @@ -268,7 +269,7 @@ func TestStateBuilder_findCommonSnapshot(t *testing.T) { newSnapshot("s1.1", "", nil), ) b := new(stateBuilder) - err := b.Build(sb) + err := b.Build(context.Background(), sb) require.NoError(t, err) id := "_virtual:" + base64.RawStdEncoding.EncodeToString([]byte("s0.1+s1.1")) assert.Equal(t, id, b.tree.RootId()) @@ -289,7 +290,7 @@ func TestStateBuilder_findCommonSnapshot(t *testing.T) { newSnapshot("s2.1", "", nil), ) b := new(stateBuilder) - err := b.Build(sb) + err := b.Build(context.Background(), sb) require.NoError(t, err) id2 := "_virtual:" + base64.RawStdEncoding.EncodeToString([]byte("s1.1+s2.1")) id := "_virtual:" + base64.RawStdEncoding.EncodeToString([]byte(id2+"+s0.1")) @@ -312,7 +313,7 @@ func TestBuildDetailsTree(t *testing.T) { newDetailsChange("c5", "s0", "c4", "c4", false), newDetailsChange("c6", "s0", "c5", "c4", false), ) - tr, _, err := BuildMetaTree(sb) + tr, _, err := BuildMetaTree(context.Background(), sb) require.NoError(t, err) assert.Equal(t, 3, tr.Len()) assert.Equal(t, "s0->c2->c4-|", tr.String()) @@ -328,12 +329,12 @@ func TestBuildTreeBefore(t *testing.T) { newSnapshot("s1", "s0", nil, "c0"), newChange("c1", "s1", "s1"), ) - tr, err := BuildTreeBefore(sb, "c1", true) + tr, err := BuildTreeBefore(context.Background(), sb, "c1", true) require.NoError(t, err) require.NotNil(t, tr) assert.Equal(t, "s1", tr.RootId()) assert.Equal(t, 2, tr.Len()) - tr, err = BuildTreeBefore(sb, "c0", true) + tr, err = BuildTreeBefore(context.Background(), sb, "c0", true) require.NoError(t, err) require.NotNil(t, tr) assert.Equal(t, "s0", tr.RootId()) diff --git a/change/change_test.go b/change/change_test.go index b5c18c1ee..e24c8b214 100644 --- a/change/change_test.go +++ b/change/change_test.go @@ -2,6 +2,7 @@ package change import ( "bytes" + "context" "encoding/gob" "fmt" "io/ioutil" @@ -69,7 +70,7 @@ func Test_Issue605Tree(t *testing.T) { Head: "bafyreidatuo2ooxzyao56ic2fm5ybyidheywbt4hgdubr3ow3lyvw7t37e", }) - tree, _, e := BuildTree(sb) + tree, _, e := BuildTree(context.Background(), sb) require.NoError(t, e) var cnt int tree.Iterate(tree.RootId(), func(c *Change) (isContinue bool) { @@ -114,7 +115,7 @@ func Test_Home_ecz5pu(t *testing.T) { Head: "bafyreifmdv6gsspodvsm7wf6orrsi5ibznib7guooqravwvtajttpp7mka", }) - tree, _, e := BuildTree(sb) + tree, _, e := BuildTree(context.Background(), sb) require.NoError(t, e) var cnt int tree.Iterate(tree.RootId(), func(c *Change) (isContinue bool) { diff --git a/change/graphviz_nix.go b/change/graphviz_nix.go index 392a208e1..df1b0d16a 100644 --- a/change/graphviz_nix.go +++ b/change/graphviz_nix.go @@ -9,6 +9,7 @@ package change import ( "bytes" + "context" "fmt" "github.com/anytypeio/go-anytype-middleware/pkg/lib/core" "github.com/anytypeio/go-anytype-middleware/pkg/lib/logging" @@ -154,7 +155,7 @@ func (t *Tree) Graphviz() (data string, err error) { // This will create SVG image of the SmartBlock (i.e a DAG) func CreateSvg(block core.SmartBlock, svgFilename string) (err error) { - t, _, err := BuildTree(block) + t, _, err := BuildTree(context.TODO(), block) if err != nil { logger.Fatal("build tree error:", err) return err diff --git a/change/state_test.go b/change/state_test.go index a82d77990..fe94c2de6 100644 --- a/change/state_test.go +++ b/change/state_test.go @@ -2,6 +2,7 @@ package change import ( "bytes" + "context" "encoding/gob" "io/ioutil" "testing" @@ -33,7 +34,7 @@ func BenchmarkOpenDoc(b *testing.B) { }) st := time.Now() - tree, _, e := BuildTree(sb) + tree, _, e := BuildTree(context.Background(), sb) require.NoError(b, e) b.Log("build tree:", time.Since(st)) b.Log(tree.Len()) @@ -48,7 +49,7 @@ func BenchmarkOpenDoc(b *testing.B) { b.Run("build tree", func(b *testing.B) { for i := 0; i < b.N; i++ { - _, _, err := BuildTree(sb) + _, _, err := BuildTree(context.Background(), sb) require.NoError(b, err) } }) diff --git a/change/tree.go b/change/tree.go index 0b8c28d96..1dbcb7641 100644 --- a/change/tree.go +++ b/change/tree.go @@ -2,6 +2,7 @@ package change import ( "bytes" + "context" "crypto/md5" "fmt" "sort" @@ -279,7 +280,7 @@ func (t *Tree) Get(id string) *Change { return t.attached[id] } -func (t *Tree) LastSnapshotId() string { +func (t *Tree) LastSnapshotId(ctx context.Context) string { var sIds []string for _, hid := range t.headIds { hd := t.attached[hid] @@ -299,7 +300,7 @@ func (t *Tree) LastSnapshotId() string { b := &stateBuilder{ cache: t.attached, } - sId, err := b.findCommonSnapshot(sIds) + sId, err := b.findCommonSnapshot(ctx, sIds) if err != nil { log.Errorf("can't find common snapshot: %v", err) } diff --git a/change/tree_test.go b/change/tree_test.go index 99ff494e9..f625847b9 100644 --- a/change/tree_test.go +++ b/change/tree_test.go @@ -2,6 +2,7 @@ package change import ( "bytes" + "context" "encoding/gob" "fmt" "io/ioutil" @@ -262,7 +263,7 @@ func BenchmarkTree_Iterate(b *testing.B) { Head: "bafyreifmdv6gsspodvsm7wf6orrsi5ibznib7guooqravwvtajttpp7mka", }) - tree, _, e := BuildTree(sb) + tree, _, e := BuildTree(context.Background(), sb) require.NoError(b, e) b.ReportAllocs() b.ResetTimer() @@ -282,13 +283,13 @@ func TestTree_LastSnapshotId(t *testing.T) { newDetailsChange("one", "root", "root", "root", true), newDetailsChange("two", "root", "one", "one", false), )) - assert.Equal(t, "root", tr.LastSnapshotId()) + assert.Equal(t, "root", tr.LastSnapshotId(context.Background())) assert.Equal(t, Append, tr.Add(newSnapshot("three", "root", nil, "two"))) - assert.Equal(t, "three", tr.LastSnapshotId()) + assert.Equal(t, "three", tr.LastSnapshotId(context.Background())) }) t.Run("empty", func(t *testing.T) { tr := new(Tree) - assert.Equal(t, "", tr.LastSnapshotId()) + assert.Equal(t, "", tr.LastSnapshotId(context.Background())) }) t.Run("builder", func(t *testing.T) { tr := new(Tree) @@ -299,7 +300,7 @@ func TestTree_LastSnapshotId(t *testing.T) { newSnapshot("newSh", "root", nil, "one"), ) assert.Equal(t, []string{"newSh", "two"}, tr.Heads()) - assert.Equal(t, "root", tr.LastSnapshotId()) + assert.Equal(t, "root", tr.LastSnapshotId(context.Background())) }) t.Run("builder with split", func(t *testing.T) { tr := new(Tree) @@ -311,6 +312,6 @@ func TestTree_LastSnapshotId(t *testing.T) { newDetailsChange("two", "root2", "root2", "root2", false), newSnapshot("newSh", "root", nil, "one"), ) - assert.Equal(t, "b", tr.LastSnapshotId()) + assert.Equal(t, "b", tr.LastSnapshotId(context.Background())) }) } diff --git a/cmd/cli/cafe.go b/cmd/cli/cafe.go index ebb7297d0..bf5df8335 100644 --- a/cmd/cli/cafe.go +++ b/cmd/cli/cafe.go @@ -50,7 +50,7 @@ var findProfiles = &cobra.Command{ // create temp walletUtil in order to do requests to cafe appMnemonic, err = coreService.WalletGenerateMnemonic(12) appAccount, err = coreService.WalletAccountAt(appMnemonic, 0, "") - app, err := anytype.StartAccountRecoverApp(nil, appAccount) + app, err := anytype.StartAccountRecoverApp(context.Background(), nil, appAccount) if err != nil { console.Fatal("failed to start anytype: %s", err.Error()) return diff --git a/cmd/cli/debug.go b/cmd/cli/debug.go index 70e46a838..7961baa00 100644 --- a/cmd/cli/debug.go +++ b/cmd/cli/debug.go @@ -1,6 +1,7 @@ package main import ( + "context" "github.com/anytypeio/go-anytype-middleware/app" "github.com/anytypeio/go-anytype-middleware/core/anytype" "github.com/anytypeio/go-anytype-middleware/core/debug" @@ -40,7 +41,7 @@ var dumpTree = &cobra.Command{ event.NewCallbackSender(func(event *pb.Event) {}), } - app, err := anytype.StartNewApp(comps...) + app, err := anytype.StartNewApp(context.Background(), comps...) if err != nil { console.Fatal("failed to start anytype: %s", err.Error()) } @@ -70,7 +71,7 @@ var dumpLocalstore = &cobra.Command{ event.NewCallbackSender(func(event *pb.Event) {}), } - app, err := anytype.StartNewApp(comps...) + app, err := anytype.StartNewApp(context.Background(), comps...) if err != nil { console.Fatal("failed to start anytype: %s", err.Error()) } diff --git a/cmd/debugtree/debugtree.go b/cmd/debugtree/debugtree.go index 8fa696eba..f4fce791f 100644 --- a/cmd/debugtree/debugtree.go +++ b/cmd/debugtree/debugtree.go @@ -1,6 +1,7 @@ package main import ( + "context" "flag" "fmt" "io/ioutil" @@ -41,7 +42,7 @@ func main() { fmt.Println("build tree...") st := time.Now() - t, _, err := change.BuildTree(dt) + t, _, err := change.BuildTree(context.TODO(), dt) if err != nil { log.Fatal("build tree error:", err) } @@ -62,7 +63,7 @@ func main() { return true }) if id != "" { - if t, err = change.BuildTreeBefore(dt, id, true); err != nil { + if t, err = change.BuildTreeBefore(context.TODO(), dt, id, true); err != nil { log.Fatal("build tree before error:", err) } } diff --git a/core/account.go b/core/account.go index 2ecd09bd5..33a795e31 100644 --- a/core/account.go +++ b/core/account.go @@ -187,12 +187,11 @@ func (mw *Middleware) getInfo() *model.AccountInfo { } cfg := config.ConfigRequired{} - files.GetFileConfig(filepath.Join(wallet.RepoPath(), config.ConfigFileName), &cfg); + files.GetFileConfig(filepath.Join(wallet.RepoPath(), config.ConfigFileName), &cfg) if cfg.IPFSStorageAddr == "" { cfg.IPFSStorageAddr = wallet.RepoPath() } - pBlocks := at.PredefinedBlocks() return &model.AccountInfo{ HomeObjectId: pBlocks.Home, @@ -203,8 +202,8 @@ func (mw *Middleware) getInfo() *model.AccountInfo { MarketplaceTemplateObjectId: pBlocks.MarketplaceTemplate, GatewayUrl: gwAddr, DeviceId: deviceId, - LocalStoragePath: cfg.IPFSStorageAddr, - TimeZone: cfg.TimeZone, + LocalStoragePath: cfg.IPFSStorageAddr, + TimeZone: cfg.TimeZone, } } @@ -289,22 +288,11 @@ func (mw *Middleware) AccountCreate(req *pb.RpcAccountCreateRequest) *pb.RpcAcco mw.EventSender, } - if mw.app, err = anytype.StartNewApp(comps...); err != nil { + if mw.app, err = anytype.StartNewApp(context.WithValue(context.Background(), metrics.CtxKeyRequest, "account_create"), comps...); err != nil { return response(newAcc, pb.RpcAccountCreateResponseError_ACCOUNT_CREATED_BUT_FAILED_TO_START_NODE, err) } - stat := mw.app.StartStat() - if stat.SpentMsTotal > 300 { - log.Errorf("AccountCreate app start takes %dms: %v", stat.SpentMsTotal, stat.SpentMsPerComp) - } - - metrics.SharedClient.RecordEvent(metrics.AppStart{ - Type: "create", - TotalMs: stat.SpentMsTotal, - PerCompMs: stat.SpentMsPerComp}) - coreService := mw.app.MustComponent(core.CName).(core.Service) - newAcc.Name = req.Name bs := mw.app.MustComponent(block.CName).(block.Service) details := []*pb.RpcObjectSetDetailsDetail{{Key: "name", Value: pbtypes.String(req.Name)}} @@ -396,7 +384,7 @@ func (mw *Middleware) AccountRecover(_ *pb.RpcAccountRecoverRequest) *pb.RpcAcco return response(pb.RpcAccountRecoverResponseError_FAILED_TO_STOP_RUNNING_NODE, err) } - if mw.app, err = anytype.StartAccountRecoverApp(mw.EventSender, zeroAccount); err != nil { + if mw.app, err = anytype.StartAccountRecoverApp(context.WithValue(context.Background(), metrics.CtxKeyRequest, "account_recover"), mw.EventSender, zeroAccount); err != nil { return response(pb.RpcAccountRecoverResponseError_FAILED_TO_RUN_NODE, err) } @@ -550,10 +538,12 @@ func (mw *Middleware) AccountSelect(req *pb.RpcAccountSelectRequest) *pb.RpcAcco mw.rootPath = req.RootPath } + var repoWasMissing bool if _, err := os.Stat(filepath.Join(mw.rootPath, req.Id)); os.IsNotExist(err) { if mw.mnemonic == "" { return response(nil, pb.RpcAccountSelectResponseError_LOCAL_REPO_NOT_EXISTS_AND_MNEMONIC_NOT_SET, err) } + repoWasMissing = true var account wallet.Keypair for i := 0; i < 100; i++ { @@ -595,7 +585,12 @@ func (mw *Middleware) AccountSelect(req *pb.RpcAccountSelectRequest) *pb.RpcAcco } var err error - if mw.app, err = anytype.StartNewApp(comps...); err != nil { + request := "account_select" + if repoWasMissing { + // if we have created the repo, we need to highlight that we are recovering the account + request = request + "_recover" + } + if mw.app, err = anytype.StartNewApp(context.WithValue(context.Background(), metrics.CtxKeyRequest, request), comps...); err != nil { if err == core.ErrRepoCorrupted { return response(nil, pb.RpcAccountSelectResponseError_LOCAL_REPO_EXISTS_BUT_CORRUPTED, err) } @@ -607,19 +602,8 @@ func (mw *Middleware) AccountSelect(req *pb.RpcAccountSelectRequest) *pb.RpcAcco return response(nil, pb.RpcAccountSelectResponseError_FAILED_TO_RUN_NODE, err) } - stat := mw.app.StartStat() - if stat.SpentMsTotal > 300 { - log.Errorf("AccountSelect app start takes %dms: %v", stat.SpentMsTotal, stat.SpentMsPerComp) - } - acc := &model.Account{Id: req.Id} acc.Info = mw.getInfo() - - metrics.SharedClient.RecordEvent(metrics.AppStart{ - Type: "select", - TotalMs: stat.SpentMsTotal, - PerCompMs: stat.SpentMsPerComp}) - return response(acc, pb.RpcAccountSelectResponseError_NULL, nil) } diff --git a/core/anytype/bootstrap.go b/core/anytype/bootstrap.go index 215a92a37..75f55a5f8 100644 --- a/core/anytype/bootstrap.go +++ b/core/anytype/bootstrap.go @@ -1,6 +1,7 @@ package anytype import ( + "context" "github.com/anytypeio/go-anytype-middleware/core/account" "github.com/anytypeio/go-anytype-middleware/core/block/bookmark" "os" @@ -42,7 +43,7 @@ import ( "github.com/anytypeio/go-anytype-middleware/util/unsplash" ) -func StartAccountRecoverApp(eventSender event.Sender, accountPrivKey walletUtil.Keypair) (a *app.App, err error) { +func StartAccountRecoverApp(ctx context.Context, eventSender event.Sender, accountPrivKey walletUtil.Keypair) (a *app.App, err error) { a = new(app.App) device, err := walletUtil.NewRandomKeypair(walletUtil.KeypairTypeDevice) if err != nil { @@ -59,7 +60,7 @@ func StartAccountRecoverApp(eventSender event.Sender, accountPrivKey walletUtil. Register(profilefinder.New()). Register(eventSender) - if err = a.Start(); err != nil { + if err = a.Start(ctx); err != nil { return } @@ -77,12 +78,12 @@ func BootstrapWallet(rootPath, accountId string) wallet.Wallet { return wallet.NewWithAccountRepo(rootPath, accountId) } -func StartNewApp(components ...app.Component) (a *app.App, err error) { +func StartNewApp(ctx context.Context, components ...app.Component) (a *app.App, err error) { a = new(app.App) Bootstrap(a, components...) metrics.SharedClient.SetAppVersion(a.Version()) metrics.SharedClient.Run() - if err = a.Start(); err != nil { + if err = a.Start(ctx); err != nil { metrics.SharedClient.Close() a = nil return diff --git a/core/block/doc/service.go b/core/block/doc/service.go index f72a25e6e..0a0bb993c 100644 --- a/core/block/doc/service.go +++ b/core/block/doc/service.go @@ -62,7 +62,7 @@ func (l *listener) Init(a *app.App) (err error) { return } -func (l *listener) Run() (err error) { +func (l *listener) Run(context.Context) (err error) { go l.wakeupLoop() return } diff --git a/core/block/doc/service_test.go b/core/block/doc/service_test.go index fedc300d6..b62801f91 100644 --- a/core/block/doc/service_test.go +++ b/core/block/doc/service_test.go @@ -47,7 +47,7 @@ func TestService_WakeupLoop(t *testing.T) { rb := recordsbatcher.New() a := new(app.App) a.Register(rb).Register(dh).Register(New()) - require.NoError(t, a.Start()) + require.NoError(t, a.Start(context.Background())) defer a.Close() recId := func(id string) core.ThreadRecordInfo { diff --git a/core/block/editor/files.go b/core/block/editor/files.go index 60347edad..3f04debed 100644 --- a/core/block/editor/files.go +++ b/core/block/editor/files.go @@ -46,7 +46,7 @@ func (p *Files) Init(ctx *smartblock.InitContext) (err error) { if err = p.SmartBlock.Init(ctx); err != nil { return } - doc, err := ctx.Source.ReadDoc(nil, true) + doc, err := ctx.Source.ReadDoc(ctx.Ctx, nil, true) if err != nil { return err } diff --git a/core/block/editor/smartblock/smartblock.go b/core/block/editor/smartblock/smartblock.go index b48e22aae..00f405a51 100644 --- a/core/block/editor/smartblock/smartblock.go +++ b/core/block/editor/smartblock/smartblock.go @@ -135,6 +135,7 @@ type InitContext struct { Restriction restriction.Service Doc doc.Service ObjectStore objectstore.ObjectStore + Ctx context.Context } type linkSource interface { @@ -213,7 +214,11 @@ func (sb *smartBlock) Type() model.SmartBlockType { } func (sb *smartBlock) Init(ctx *InitContext) (err error) { - if sb.Doc, err = ctx.Source.ReadDoc(sb, ctx.State != nil); err != nil { + cctx := ctx.Ctx + if cctx == nil { + cctx = context.Background() + } + if sb.Doc, err = ctx.Source.ReadDoc(cctx, sb, ctx.State != nil); err != nil { return fmt.Errorf("reading document: %w", err) } diff --git a/core/block/editor/smartblock/smartblock_test.go b/core/block/editor/smartblock/smartblock_test.go index 902cb527c..1bc99ae2e 100644 --- a/core/block/editor/smartblock/smartblock_test.go +++ b/core/block/editor/smartblock/smartblock_test.go @@ -1,6 +1,7 @@ package smartblock import ( + "context" "testing" "github.com/anytypeio/go-anytype-middleware/core/block/editor/state" @@ -120,7 +121,7 @@ func (fx *fixture) init(blocks []*model.Block) { bm[b.Id] = simple.New(b) } doc := state.NewDoc(id, bm) - fx.source.EXPECT().ReadDoc(gomock.Any(), false).Return(doc, nil) + fx.source.EXPECT().ReadDoc(context.Background(), gomock.Any(), false).Return(doc, nil) fx.source.EXPECT().Id().Return(id).AnyTimes() fx.store.EXPECT().GetDetails(id).Return(&model.ObjectDetails{ Details: &types.Struct{Fields: map[string]*types.Value{}}, diff --git a/core/block/process/service.go b/core/block/process/service.go index e82266829..ada4e0b5b 100644 --- a/core/block/process/service.go +++ b/core/block/process/service.go @@ -1,6 +1,7 @@ package process import ( + "context" "errors" "fmt" "reflect" @@ -57,7 +58,7 @@ func (s *service) Name() (name string) { return CName } -func (s *service) Run() (err error) { +func (s *service) Run(context.Context) (err error) { return } diff --git a/core/block/process/service_test.go b/core/block/process/service_test.go index cee22f13c..0abedf287 100644 --- a/core/block/process/service_test.go +++ b/core/block/process/service_test.go @@ -1,6 +1,7 @@ package process import ( + "context" "testing" "time" @@ -93,6 +94,6 @@ func NewTest(se func(e *pb.Event)) Service { a.Register(&testapp.EventSender{ F: se, }).Register(s) - a.Start() + a.Start(context.Background()) return s } diff --git a/core/block/service.go b/core/block/service.go index bbf5d7324..50cc79f9e 100644 --- a/core/block/service.go +++ b/core/block/service.go @@ -311,12 +311,12 @@ func (s *service) Init(a *app.App) (err error) { return } -func (s *service) Run() (err error) { - s.initPredefinedBlocks() +func (s *service) Run(ctx context.Context) (err error) { + s.initPredefinedBlocks(ctx) return } -func (s *service) initPredefinedBlocks() { +func (s *service) initPredefinedBlocks(ctx context.Context) { ids := []string{ s.anytype.PredefinedBlocks().Account, s.anytype.PredefinedBlocks().AccountOld, @@ -334,7 +334,7 @@ func (s *service) initPredefinedBlocks() { // skip object that has been already indexed before continue } - ctx := &smartblock.InitContext{State: state.NewDoc(id, nil).(*state.State)} + ctx := &smartblock.InitContext{Ctx: ctx, State: state.NewDoc(id, nil).(*state.State)} // this is needed so that old account will create its state successfully on first launch if id == s.anytype.PredefinedBlocks().AccountOld { ctx = nil @@ -371,9 +371,9 @@ func (s *service) Anytype() core.Service { func (s *service) OpenBlock(ctx *state.Context, id string) (err error) { startTime := time.Now() - ob, err := s.getSmartblock(context.TODO(), id) + ob, err := s.getSmartblock(context.WithValue(context.TODO(), metrics.CtxKeyRequest, "object_open"), id) if err != nil { - return + return err } afterSmartBlockTime := time.Now() defer s.cache.Release(id) @@ -425,7 +425,8 @@ func (s *service) OpenBlock(ctx *state.Context, id string) (err error) { } func (s *service) ShowBlock(ctx *state.Context, id string) (err error) { - return s.Do(id, func(b smartblock.SmartBlock) error { + cctx := context.WithValue(context.TODO(), metrics.CtxKeyRequest, "object_show") + return s.DoWithContext(cctx, id, func(b smartblock.SmartBlock) error { return b.Show(ctx) }) } @@ -1169,7 +1170,7 @@ func (s *service) stateFromTemplate(templateId, name string) (st *state.State, e } func (s *service) MigrateMany(objects []threads.ThreadInfo) (migrated int, err error) { - err = s.Do(s.anytype.PredefinedBlocks().Account, func(b smartblock.SmartBlock) error { + err = s.DoWithContext(context.WithValue(context.TODO(), metrics.CtxKeyRequest, "migrate_many"), s.anytype.PredefinedBlocks().Account, func(b smartblock.SmartBlock) error { workspace, ok := b.(*editor.Workspaces) if !ok { return fmt.Errorf("incorrect object with workspace id") @@ -1185,7 +1186,7 @@ func (s *service) MigrateMany(objects []threads.ThreadInfo) (migrated int, err e } func (s *service) DoBasic(id string, apply func(b basic.Basic) error) error { - sb, release, err := s.pickBlock(context.TODO(), id) + sb, release, err := s.pickBlock(context.WithValue(context.TODO(), metrics.CtxKeyRequest, "do_basic"), id) if err != nil { return err } @@ -1218,7 +1219,7 @@ func (s *service) DoTable(id string, ctx *state.Context, apply func(st *state.St } func (s *service) DoLinksCollection(id string, apply func(b basic.Basic) error) error { - sb, release, err := s.pickBlock(context.TODO(), id) + sb, release, err := s.pickBlock(context.WithValue(context.TODO(), metrics.CtxKeyRequest, "do_links_collection"), id) if err != nil { return err } @@ -1232,7 +1233,7 @@ func (s *service) DoLinksCollection(id string, apply func(b basic.Basic) error) } func (s *service) DoClipboard(id string, apply func(b clipboard.Clipboard) error) error { - sb, release, err := s.pickBlock(context.TODO(), id) + sb, release, err := s.pickBlock(context.WithValue(context.TODO(), metrics.CtxKeyRequest, "do_clipboard"), id) if err != nil { return err } @@ -1246,7 +1247,7 @@ func (s *service) DoClipboard(id string, apply func(b clipboard.Clipboard) error } func (s *service) DoText(id string, apply func(b stext.Text) error) error { - sb, release, err := s.pickBlock(context.TODO(), id) + sb, release, err := s.pickBlock(context.WithValue(context.TODO(), metrics.CtxKeyRequest, "do_text"), id) if err != nil { return err } @@ -1260,7 +1261,7 @@ func (s *service) DoText(id string, apply func(b stext.Text) error) error { } func (s *service) DoFile(id string, apply func(b file.File) error) error { - sb, release, err := s.pickBlock(context.TODO(), id) + sb, release, err := s.pickBlock(context.WithValue(context.TODO(), metrics.CtxKeyRequest, "do_file"), id) if err != nil { return err } @@ -1274,7 +1275,7 @@ func (s *service) DoFile(id string, apply func(b file.File) error) error { } func (s *service) DoBookmark(id string, apply func(b bookmark.Bookmark) error) error { - sb, release, err := s.pickBlock(context.TODO(), id) + sb, release, err := s.pickBlock(context.WithValue(context.TODO(), metrics.CtxKeyRequest, "do_bookmark"), id) if err != nil { return err } @@ -1288,7 +1289,7 @@ func (s *service) DoBookmark(id string, apply func(b bookmark.Bookmark) error) e } func (s *service) DoFileNonLock(id string, apply func(b file.File) error) error { - sb, release, err := s.pickBlock(context.TODO(), id) + sb, release, err := s.pickBlock(context.WithValue(context.TODO(), metrics.CtxKeyRequest, "do_filenonlock"), id) if err != nil { return err } @@ -1300,7 +1301,7 @@ func (s *service) DoFileNonLock(id string, apply func(b file.File) error) error } func (s *service) DoHistory(id string, apply func(b basic.IHistory) error) error { - sb, release, err := s.pickBlock(context.TODO(), id) + sb, release, err := s.pickBlock(context.WithValue(context.TODO(), metrics.CtxKeyRequest, "do_history"), id) if err != nil { return err } @@ -1314,7 +1315,7 @@ func (s *service) DoHistory(id string, apply func(b basic.IHistory) error) error } func (s *service) DoImport(id string, apply func(b _import.Import) error) error { - sb, release, err := s.pickBlock(context.TODO(), id) + sb, release, err := s.pickBlock(context.WithValue(context.TODO(), metrics.CtxKeyRequest, "do_import"), id) if err != nil { return err } @@ -1329,7 +1330,7 @@ func (s *service) DoImport(id string, apply func(b _import.Import) error) error } func (s *service) DoDataview(id string, apply func(b dataview.Dataview) error) error { - sb, release, err := s.pickBlock(context.TODO(), id) + sb, release, err := s.pickBlock(context.WithValue(context.TODO(), metrics.CtxKeyRequest, "do_dataview"), id) if err != nil { return err } @@ -1343,7 +1344,7 @@ func (s *service) DoDataview(id string, apply func(b dataview.Dataview) error) e } func (s *service) Do(id string, apply func(b smartblock.SmartBlock) error) error { - sb, release, err := s.pickBlock(context.TODO(), id) + sb, release, err := s.pickBlock(context.WithValue(context.TODO(), metrics.CtxKeyRequest, "do"), id) if err != nil { return err } @@ -1561,7 +1562,7 @@ func (s *service) ObjectToBookmark(id string, url string) (objectId string, err } func (s *service) loadSmartblock(ctx context.Context, id string) (value ocache.Object, err error) { - sb, err := s.newSmartBlock(id, nil) + sb, err := s.newSmartBlock(id, &smartblock.InitContext{Ctx: ctx}) if err != nil { return } diff --git a/core/block/source/anytypeprofile.go b/core/block/source/anytypeprofile.go index ac8874265..960f1df30 100644 --- a/core/block/source/anytypeprofile.go +++ b/core/block/source/anytypeprofile.go @@ -64,7 +64,7 @@ func (v *anytypeProfile) getDetails() (p *types.Struct) { }} } -func (v *anytypeProfile) ReadDoc(receiver ChangeReceiver, empty bool) (doc state.Doc, err error) { +func (v *anytypeProfile) ReadDoc(ctx context.Context, receiver ChangeReceiver, empty bool) (doc state.Doc, err error) { s := state.NewDoc(v.id, nil).(*state.State) d := v.getDetails() @@ -74,7 +74,7 @@ func (v *anytypeProfile) ReadDoc(receiver ChangeReceiver, empty bool) (doc state return s, nil } -func (v *anytypeProfile) ReadMeta(_ ChangeReceiver) (doc state.Doc, err error) { +func (v *anytypeProfile) ReadMeta(ctx context.Context, _ ChangeReceiver) (doc state.Doc, err error) { s := &state.State{} d := v.getDetails() diff --git a/core/block/source/bundledobjecttype.go b/core/block/source/bundledobjecttype.go index 86e667104..568f756c0 100644 --- a/core/block/source/bundledobjecttype.go +++ b/core/block/source/bundledobjecttype.go @@ -77,7 +77,7 @@ func getDetailsForBundledObjectType(id string) (extraRels []*model.Relation, p * return extraRels, det, nil } -func (v *bundledObjectType) ReadDoc(receiver ChangeReceiver, empty bool) (doc state.Doc, err error) { +func (v *bundledObjectType) ReadDoc(ctx context.Context, receiver ChangeReceiver, empty bool) (doc state.Doc, err error) { s := state.NewDoc(v.id, nil).(*state.State) rels, d, err := getDetailsForBundledObjectType(v.id) @@ -91,7 +91,7 @@ func (v *bundledObjectType) ReadDoc(receiver ChangeReceiver, empty bool) (doc st return s, nil } -func (v *bundledObjectType) ReadMeta(_ ChangeReceiver) (doc state.Doc, err error) { +func (v *bundledObjectType) ReadMeta(ctx context.Context, _ ChangeReceiver) (doc state.Doc, err error) { s := &state.State{} rels, d, err := getDetailsForBundledObjectType(v.id) diff --git a/core/block/source/bundledrelation.go b/core/block/source/bundledrelation.go index e3a7b3627..afd9eed14 100644 --- a/core/block/source/bundledrelation.go +++ b/core/block/source/bundledrelation.go @@ -62,7 +62,7 @@ func (v *bundledRelation) getDetails(id string) (rels []*model.Relation, p *type return rels, d, nil } -func (v *bundledRelation) ReadDoc(receiver ChangeReceiver, empty bool) (doc state.Doc, err error) { +func (v *bundledRelation) ReadDoc(ctx context.Context, receiver ChangeReceiver, empty bool) (doc state.Doc, err error) { s := state.NewDoc(v.id, nil).(*state.State) rels, d, err := v.getDetails(v.id) @@ -76,7 +76,7 @@ func (v *bundledRelation) ReadDoc(receiver ChangeReceiver, empty bool) (doc stat return s, nil } -func (v *bundledRelation) ReadMeta(_ ChangeReceiver) (doc state.Doc, err error) { +func (v *bundledRelation) ReadMeta(ctx context.Context, _ ChangeReceiver) (doc state.Doc, err error) { s := &state.State{} rels, d, err := v.getDetails(v.id) diff --git a/core/block/source/date.go b/core/block/source/date.go index dd1407ce9..9e047670d 100644 --- a/core/block/source/date.go +++ b/core/block/source/date.go @@ -83,7 +83,7 @@ func (v *date) parseId() error { return nil } -func (v *date) ReadDoc(receiver ChangeReceiver, empty bool) (doc state.Doc, err error) { +func (v *date) ReadDoc(ctx context.Context, receiver ChangeReceiver, empty bool) (doc state.Doc, err error) { if err = v.parseId(); err != nil { return } @@ -95,7 +95,7 @@ func (v *date) ReadDoc(receiver ChangeReceiver, empty bool) (doc state.Doc, err return s, nil } -func (v *date) ReadMeta(_ ChangeReceiver) (doc state.Doc, err error) { +func (v *date) ReadMeta(ctx context.Context, _ ChangeReceiver) (doc state.Doc, err error) { if err = v.parseId(); err != nil { return } diff --git a/core/block/source/files.go b/core/block/source/files.go index 5b1aa6b03..8f02b0a94 100644 --- a/core/block/source/files.go +++ b/core/block/source/files.go @@ -73,7 +73,7 @@ func getDetailsForFileOrImage(ctx context.Context, a core.Service, id string) (p return d, false, nil } -func (v *files) ReadDoc(receiver ChangeReceiver, empty bool) (doc state.Doc, err error) { +func (v *files) ReadDoc(ctx context.Context, receiver ChangeReceiver, empty bool) (doc state.Doc, err error) { s := state.NewDoc(v.id, nil).(*state.State) ctx, cancel := context.WithTimeout(context.Background(), getFileTimeout) @@ -90,7 +90,7 @@ func (v *files) ReadDoc(receiver ChangeReceiver, empty bool) (doc state.Doc, err return s, nil } -func (v *files) ReadMeta(_ ChangeReceiver) (doc state.Doc, err error) { +func (v *files) ReadMeta(ctx context.Context, _ ChangeReceiver) (doc state.Doc, err error) { s := &state.State{} ctx, cancel := context.WithTimeout(context.Background(), getFileTimeout) diff --git a/core/block/source/indexedrelation.go b/core/block/source/indexedrelation.go index e020ef8e1..d79138fa3 100644 --- a/core/block/source/indexedrelation.go +++ b/core/block/source/indexedrelation.go @@ -65,7 +65,7 @@ func (v *indexedRelation) getDetails(id string) (rels []*model.Relation, p *type return rels, d, nil } -func (v *indexedRelation) ReadDoc(receiver ChangeReceiver, empty bool) (doc state.Doc, err error) { +func (v *indexedRelation) ReadDoc(ctx context.Context, receiver ChangeReceiver, empty bool) (doc state.Doc, err error) { s := state.NewDoc(v.id, nil).(*state.State) rels, d, err := v.getDetails(v.id) @@ -79,7 +79,7 @@ func (v *indexedRelation) ReadDoc(receiver ChangeReceiver, empty bool) (doc stat return s, nil } -func (v *indexedRelation) ReadMeta(_ ChangeReceiver) (doc state.Doc, err error) { +func (v *indexedRelation) ReadMeta(ctx context.Context, _ ChangeReceiver) (doc state.Doc, err error) { s := &state.State{} rels, d, err := v.getDetails(v.id) diff --git a/core/block/source/source.go b/core/block/source/source.go index b2ba3e0fd..cf086041c 100644 --- a/core/block/source/source.go +++ b/core/block/source/source.go @@ -42,8 +42,8 @@ type Source interface { LogHeads() map[string]string GetFileKeysSnapshot() []*pb.ChangeFileKeys ReadOnly() bool - ReadDoc(receiver ChangeReceiver, empty bool) (doc state.Doc, err error) - ReadMeta(receiver ChangeReceiver) (doc state.Doc, err error) + ReadDoc(ctx context.Context, receiver ChangeReceiver, empty bool) (doc state.Doc, err error) + ReadMeta(ctx context.Context, receiver ChangeReceiver) (doc state.Doc, err error) PushChange(params PushChangeParams) (id string, err error) FindFirstChange(ctx context.Context) (c *change.Change, err error) Close() (err error) @@ -161,16 +161,16 @@ func (s *source) Virtual() bool { return false } -func (s *source) ReadMeta(receiver ChangeReceiver) (doc state.Doc, err error) { +func (s *source) ReadMeta(ctx context.Context, receiver ChangeReceiver) (doc state.Doc, err error) { s.metaOnly = true - return s.readDoc(receiver, false) + return s.readDoc(ctx, receiver, false) } -func (s *source) ReadDoc(receiver ChangeReceiver, allowEmpty bool) (doc state.Doc, err error) { - return s.readDoc(receiver, allowEmpty) +func (s *source) ReadDoc(ctx context.Context, receiver ChangeReceiver, allowEmpty bool) (doc state.Doc, err error) { + return s.readDoc(ctx, receiver, allowEmpty) } -func (s *source) readDoc(receiver ChangeReceiver, allowEmpty bool) (doc state.Doc, err error) { +func (s *source) readDoc(ctx context.Context, receiver ChangeReceiver, allowEmpty bool) (doc state.Doc, err error) { var ch chan core.SmartblockRecordEnvelope batch := mb.New(0) if receiver != nil { @@ -196,21 +196,53 @@ func (s *source) readDoc(receiver ChangeReceiver, allowEmpty bool) (doc state.Do startTime := time.Now() log.With("thread", s.id). Debug("start building tree") + loadCh := make(chan struct{}) + + ctxP := core.ThreadLoadProgress{} + ctx = ctxP.DeriveContext(ctx) + + go func() { + forloop: + for { + select { + case <-loadCh: + break forloop + case <-time.After(time.Second * 5): + v := ctxP.Value() + log.With("object_id", s.id).With("records_loaded", v.RecordsLoaded).With("records_missing", v.RecordsMissingLocally).With("spent", time.Since(startTime).Seconds()).Errorf("openBlock in progress") + } + } + }() if s.metaOnly { - s.tree, s.logHeads, err = change.BuildMetaTree(s.sb) + s.tree, s.logHeads, err = change.BuildMetaTree(ctx, s.sb) } else { - s.tree, s.logHeads, err = change.BuildTree(s.sb) + s.tree, s.logHeads, err = change.BuildTree(ctx, s.sb) } + close(loadCh) treeBuildTime := time.Now().Sub(startTime).Milliseconds() log.With("object id", s.id). With("build time ms", treeBuildTime). Debug("stop building tree") + // if the build time is large enough we should record it if treeBuildTime > 100 { - metrics.SharedClient.RecordEvent(metrics.TreeBuild{ + event := metrics.TreeBuild{ TimeMs: treeBuildTime, ObjectId: s.id, - }) + Logs: len(s.logHeads), + } + ctxProgress, _ := ctx.Value(core.ThreadLoadProgressContextKey).(*core.ThreadLoadProgress) + if v := ctx.Value(metrics.CtxKeyRequest); v != nil { + event.Request = v.(string) + } + + if ctxProgress != nil { + event.RecordsLoaded = ctxProgress.RecordsLoaded + event.RecordsMissing = ctxProgress.RecordsMissingLocally + event.RecordsFailed = ctxProgress.RecordsFailedToLoad + } + + metrics.SharedClient.RecordEvent(event) } if allowEmpty && err == change.ErrEmpty { @@ -443,12 +475,12 @@ func (s *source) applyRecords(records []core.SmartblockRecordEnvelope) error { // existing or not complete return nil case change.Append: - s.lastSnapshotId = s.tree.LastSnapshotId() + s.lastSnapshotId = s.tree.LastSnapshotId(context.TODO()) return s.receiver.StateAppend(func(d state.Doc) (*state.State, error) { return change.BuildStateSimpleCRDT(d.(*state.State), s.tree) }) case change.Rebuild: - s.lastSnapshotId = s.tree.LastSnapshotId() + s.lastSnapshotId = s.tree.LastSnapshotId(context.TODO()) doc, err := s.buildState() if err != nil { diff --git a/core/block/source/static.go b/core/block/source/static.go index 7ad783b61..de971bf49 100644 --- a/core/block/source/static.go +++ b/core/block/source/static.go @@ -49,11 +49,11 @@ func (s *static) ReadOnly() bool { return true } -func (s *static) ReadDoc(receiver ChangeReceiver, empty bool) (doc state.Doc, err error) { +func (s *static) ReadDoc(ctx context.Context, receiver ChangeReceiver, empty bool) (doc state.Doc, err error) { return s.doc, nil } -func (s *static) ReadMeta(receiver ChangeReceiver) (doc state.Doc, err error) { +func (s *static) ReadMeta(ctx context.Context, receiver ChangeReceiver) (doc state.Doc, err error) { return s.doc, nil } diff --git a/core/block/source/threaddb.go b/core/block/source/threaddb.go index e4783233a..5bd4f69f5 100644 --- a/core/block/source/threaddb.go +++ b/core/block/source/threaddb.go @@ -70,7 +70,7 @@ func (v *threadDB) Virtual() bool { return true } -func (v *threadDB) ReadDoc(receiver ChangeReceiver, empty bool) (doc state.Doc, err error) { +func (v *threadDB) ReadDoc(ctx context.Context, receiver ChangeReceiver, empty bool) (doc state.Doc, err error) { threads.WorkspaceLogger. With("workspace id", v.id). Debug("reading document for workspace") @@ -87,7 +87,7 @@ func (v *threadDB) ReadDoc(receiver ChangeReceiver, empty bool) (doc state.Doc, return s, nil } -func (v *threadDB) ReadMeta(_ ChangeReceiver) (doc state.Doc, err error) { +func (v *threadDB) ReadMeta(ctx context.Context, _ ChangeReceiver) (doc state.Doc, err error) { return v.createState() } diff --git a/core/block/source/virtual.go b/core/block/source/virtual.go index b07e6fb21..d1296e8ef 100644 --- a/core/block/source/virtual.go +++ b/core/block/source/virtual.go @@ -45,11 +45,11 @@ func (v *virtual) Virtual() bool { return true } -func (v *virtual) ReadDoc(receiver ChangeReceiver, empty bool) (doc state.Doc, err error) { +func (v *virtual) ReadDoc(ctx context.Context, eceiver ChangeReceiver, empty bool) (doc state.Doc, err error) { return state.NewDoc(v.id, nil), nil } -func (v *virtual) ReadMeta(_ ChangeReceiver) (doc state.Doc, err error) { +func (v *virtual) ReadMeta(ctx context.Context, _ ChangeReceiver) (doc state.Doc, err error) { return state.NewDoc(v.id, nil), nil } diff --git a/core/configfetcher/configfetcher.go b/core/configfetcher/configfetcher.go index dd543b1c6..1b1834023 100644 --- a/core/configfetcher/configfetcher.go +++ b/core/configfetcher/configfetcher.go @@ -99,7 +99,7 @@ func New() ConfigFetcher { return &configFetcher{} } -func (c *configFetcher) Run() error { +func (c *configFetcher) Run(context.Context) error { c.ctx, c.cancel = context.WithCancel(context.Background()) go c.run() return nil diff --git a/core/debug/debugtree/debugtree.go b/core/debug/debugtree/debugtree.go index 226b4703e..c0d68f42f 100644 --- a/core/debug/debugtree/debugtree.go +++ b/core/debug/debugtree/debugtree.go @@ -199,7 +199,7 @@ func (r *debugTree) Stats() (s DebugTreeStats) { } func (r *debugTree) BuildState() (*state.State, error) { - t, _, err := change.BuildTree(r) + t, _, err := change.BuildTree(context.TODO(), r) if err != nil { return nil, err } diff --git a/core/history/history.go b/core/history/history.go index 42bc5bc4f..869d2cec6 100644 --- a/core/history/history.go +++ b/core/history/history.go @@ -1,6 +1,7 @@ package history import ( + "context" "fmt" "time" @@ -196,11 +197,11 @@ func (h *history) buildTree(pageId, versionId string, includeLastId bool) (tree return } if versionId != "" { - if tree, err = change.BuildTreeBefore(sb, versionId, includeLastId); err != nil { + if tree, err = change.BuildTreeBefore(context.TODO(), sb, versionId, includeLastId); err != nil { return } } else { - if tree, _, err = change.BuildTree(sb); err != nil { + if tree, _, err = change.BuildTree(context.TODO(), sb); err != nil { return } } diff --git a/core/history/history_test.go b/core/history/history_test.go index 784f9fa5e..fc79b2ae7 100644 --- a/core/history/history_test.go +++ b/core/history/history_test.go @@ -1,6 +1,7 @@ package history import ( + "context" "testing" "github.com/anytypeio/go-anytype-middleware/pkg/lib/threads" @@ -61,7 +62,7 @@ func newFixture(t *testing.T) *fixture { a.EXPECT().ProfileID().AnyTimes() a.EXPECT().LocalProfile().AnyTimes() testMock.RegisterMockObjectStore(ctrl, ta) - require.NoError(t, ta.Start()) + require.NoError(t, ta.Start(context.Background())) return &fixture{ History: h, ta: ta, diff --git a/core/indexer/indexer.go b/core/indexer/indexer.go index 1a30f737a..8285acf37 100644 --- a/core/indexer/indexer.go +++ b/core/indexer/indexer.go @@ -154,7 +154,7 @@ func (i *indexer) saveLatestCounters() error { return i.store.SaveChecksums(&checksums) } -func (i *indexer) Run() (err error) { +func (i *indexer) Run(context.Context) (err error) { if ftErr := i.ftInit(); ftErr != nil { log.Errorf("can't init ft: %v", ftErr) } @@ -244,7 +244,7 @@ func (i *indexer) reindexIfNeeded() error { if checksums.IdxRebuildCounter != ForceIdxRebuildCounter { reindex = math.MaxUint64 } - return i.Reindex(context.TODO(), reindex) + return i.Reindex(context.WithValue(context.TODO(), metrics.CtxKeyRequest, "reindex_forced"), reindex) } func (i *indexer) reindexOutdatedThreads() (toReindex, success int, err error) { @@ -285,19 +285,23 @@ func (i *indexer) reindexOutdatedThreads() (toReindex, success int, err error) { } } + var ctx context.Context if len(idsToReindex) > 0 { for _, id := range idsToReindex { // TODO: we should reindex it I guess at start //if i.anytype.PredefinedBlocks().IsAccount(id) { // continue //} - ctx := context.WithValue(context.Background(), ocache.CacheTimeout, cacheTimeout) + + // we do this instead of context.WithTimeout in order to continue loading in case of timeout in background + ctx = context.WithValue(context.Background(), ocache.CacheTimeout, cacheTimeout) + ctx = context.WithValue(ctx, metrics.CtxKeyRequest, "reindexOutdatedThreads") d, err := i.doc.GetDocInfo(ctx, id) if err != nil { - log.Errorf("reindexDoc failed to open %s: %s", id, err.Error()) continue } - err = i.index(context.TODO(), d) + + err = i.index(ctx, d) if err == nil { success++ } else { @@ -766,11 +770,13 @@ func (i *indexer) ftIndex() { func (i *indexer) ftIndexDoc(id string, _ time.Time) (err error) { st := time.Now() ctx := context.WithValue(context.Background(), ocache.CacheTimeout, cacheTimeout) + ctx = context.WithValue(ctx, metrics.CtxKeyRequest, "index_fulltext") info, err := i.doc.GetDocInfo(ctx, id) if err != nil { return } + sbType, err := smartblock.SmartBlockTypeFromID(info.Id) if err != nil { sbType = smartblock.SmartBlockTypePage diff --git a/core/indexer/indexer_test.go b/core/indexer/indexer_test.go index 3bc56aabf..f30d3a7d5 100644 --- a/core/indexer/indexer_test.go +++ b/core/indexer/indexer_test.go @@ -1,6 +1,7 @@ package indexer_test import ( + "context" "io" "io/ioutil" "os" @@ -52,7 +53,7 @@ func newFixture(t *testing.T) *fixture { fx.docService = mockDoc.NewMockService(fx.ctrl) fx.docService.EXPECT().Name().AnyTimes().Return(doc.CName) fx.docService.EXPECT().Init(gomock.Any()) - fx.docService.EXPECT().Run() + fx.docService.EXPECT().Run(context.Background()) fx.anytype.EXPECT().PredefinedBlocks().Times(2) fx.docService.EXPECT().Close().AnyTimes() fx.objectStore = testMock.RegisterMockObjectStore(fx.ctrl, ta) @@ -106,7 +107,7 @@ func newFixture(t *testing.T) *fixture { With(source.New()) mockStatus.RegisterMockStatus(fx.ctrl, ta) mockBuiltinTemplate.RegisterMockBuiltinTemplate(fx.ctrl, ta).EXPECT().Hash().AnyTimes() - require.NoError(t, ta.Start()) + require.NoError(t, ta.Start(context.Background())) return fx } diff --git a/core/status/service.go b/core/status/service.go index b46ec2a8c..14b2548aa 100644 --- a/core/status/service.go +++ b/core/status/service.go @@ -1,6 +1,7 @@ package status import ( + "context" "fmt" "sort" "sync" @@ -117,7 +118,7 @@ func (s *service) Init(a *app.App) (err error) { return } -func (s *service) Run() error { +func (s *service) Run(context.Context) error { s.mu.Lock() defer func() { s.isRunning = true diff --git a/core/subscription/service.go b/core/subscription/service.go index aa03bdef6..c3e9ee9f0 100644 --- a/core/subscription/service.go +++ b/core/subscription/service.go @@ -1,6 +1,7 @@ package subscription import ( + "context" "fmt" "sync" "time" @@ -80,7 +81,7 @@ func (s *service) Name() (name string) { return CName } -func (s *service) Run() (err error) { +func (s *service) Run(context.Context) (err error) { s.objectStore.SubscribeForAll(func(rec database.Record) { s.recBatch.Add(rec) }) diff --git a/core/subscription/service_test.go b/core/subscription/service_test.go index 5d5126a7f..0647e4382 100644 --- a/core/subscription/service_test.go +++ b/core/subscription/service_test.go @@ -1,6 +1,7 @@ package subscription import ( + "context" "testing" "github.com/anytypeio/go-anytype-middleware/app/testapp" @@ -254,7 +255,7 @@ func newFixture(t *testing.T) *fixture { a.Register(fx.Service) a.Register(fx.sender) fx.store.EXPECT().SubscribeForAll(gomock.Any()) - require.NoError(t, a.Start()) + require.NoError(t, a.Start(context.Background())) return fx } diff --git a/metrics/client.go b/metrics/client.go index 76ef652cb..8f323ba35 100644 --- a/metrics/client.go +++ b/metrics/client.go @@ -2,6 +2,7 @@ package metrics import ( "context" + "fmt" "github.com/cheggaaa/mb" "sync" "time" @@ -263,6 +264,8 @@ func (c *client) RecordEvent(ev EventRepresentable) { AppVersion: c.appVersion, Time: time.Now().Unix() * 1000, } + fmt.Printf("EVENT %s: %+v\n", ampEvent.EventType, ampEvent.EventProperties) + b := c.batcher c.lock.RUnlock() if b == nil { diff --git a/metrics/events.go b/metrics/events.go index be01bc86f..dd0e81577 100644 --- a/metrics/events.go +++ b/metrics/events.go @@ -4,6 +4,8 @@ import ( "fmt" ) +const CtxKeyRequest = "request" + type RecordAcceptEventAggregated struct { IsNAT bool RecordType string @@ -188,16 +190,26 @@ func (c BlockSplit) ToEvent() *Event { } type TreeBuild struct { - TimeMs int64 - ObjectId string + TimeMs int64 + ObjectId string + Logs int + Request string + RecordsLoaded int + RecordsMissing int + RecordsFailed int } func (c TreeBuild) ToEvent() *Event { return &Event{ EventType: "tree_build", EventData: map[string]interface{}{ - "object_id": c.ObjectId, - "time_ms": c.TimeMs, + "object_id": c.ObjectId, + "logs": c.Logs, + "request": c.Request, + "records_loaded": c.RecordsLoaded, + "records_missing": c.RecordsMissing, + "records_failed": c.RecordsFailed, + "time_ms": c.TimeMs, }, } } @@ -233,7 +245,7 @@ func (c StateApply) ToEvent() *Event { } type AppStart struct { - Type string + Request string TotalMs int64 PerCompMs map[string]int64 } @@ -242,7 +254,7 @@ func (c AppStart) ToEvent() *Event { return &Event{ EventType: "app_start", EventData: map[string]interface{}{ - "type": c.Type, + "request": c.Request, "time_ms": c.TotalMs, "per_comp": c.PerCompMs, }, diff --git a/pkg/lib/core/context.go b/pkg/lib/core/context.go new file mode 100644 index 000000000..a64bd618f --- /dev/null +++ b/pkg/lib/core/context.go @@ -0,0 +1,52 @@ +package core + +import ( + "context" + "sync" +) + +type ThreadLoadProgress struct { + RecordsMissingLocally int + RecordsLoaded int + RecordsFailedToLoad int + lk sync.Mutex +} + +type contextKey string + +const ThreadLoadProgressContextKey contextKey = "threadload" + +// DeriveContext returns a new context with value "progress" derived from +// the given one. +func (p *ThreadLoadProgress) DeriveContext(ctx context.Context) context.Context { + return context.WithValue(ctx, ThreadLoadProgressContextKey, p) +} + +func (p *ThreadLoadProgress) IncrementLoadedRecords() { + p.lk.Lock() + defer p.lk.Unlock() + p.RecordsLoaded++ +} + +func (p *ThreadLoadProgress) IncrementFailedRecords() { + p.lk.Lock() + defer p.lk.Unlock() + p.RecordsFailedToLoad++ +} + +func (p *ThreadLoadProgress) IncrementMissingRecord() { + p.lk.Lock() + defer p.lk.Unlock() + p.RecordsMissingLocally++ +} + +// Value returns the current progress. +func (p *ThreadLoadProgress) Value() ThreadLoadProgress { + p.lk.Lock() + defer p.lk.Unlock() + return ThreadLoadProgress{ + RecordsMissingLocally: p.RecordsMissingLocally, + RecordsLoaded: p.RecordsLoaded, + RecordsFailedToLoad: p.RecordsFailedToLoad, + } +} diff --git a/pkg/lib/core/core.go b/pkg/lib/core/core.go index 2711436ee..2af3dc883 100644 --- a/pkg/lib/core/core.go +++ b/pkg/lib/core/core.go @@ -187,12 +187,12 @@ func (a *Anytype) Device() string { return pk.Address() } -func (a *Anytype) Run() (err error) { +func (a *Anytype) Run(ctx context.Context) (err error) { if err = a.Start(); err != nil { return } - return a.EnsurePredefinedBlocks(context.TODO(), a.config.NewAccount) + return a.EnsurePredefinedBlocks(ctx, a.config.NewAccount) } func (a *Anytype) IsStarted() bool { diff --git a/pkg/lib/core/smartblock.go b/pkg/lib/core/smartblock.go index dc4d67190..94445321b 100644 --- a/pkg/lib/core/smartblock.go +++ b/pkg/lib/core/smartblock.go @@ -428,10 +428,33 @@ func (block *smartBlock) GetRecord(ctx context.Context, recordID string) (*Smart return nil, err } + ctxProgress, _ := ctx.Value(ThreadLoadProgressContextKey).(*ThreadLoadProgress) + if ctxProgress != nil { + cid, err := cid.Parse(rid) + if err != nil { + return nil, err + } + + b, err := block.node.ipfs.HasBlock(cid) + if err != nil { + return nil, err + } + + if !b { + ctxProgress.IncrementMissingRecord() + } + } + rec, err := block.node.threadService.Threads().GetRecord(ctx, block.thread.ID, rid) if err != nil { + if ctxProgress != nil { + ctxProgress.IncrementFailedRecords() + } return nil, err } + if ctxProgress != nil { + ctxProgress.IncrementLoadedRecords() + } return block.decodeRecord(ctx, rec, true) } diff --git a/pkg/lib/datastore/clientds/clientds.go b/pkg/lib/datastore/clientds/clientds.go index 00eda9aac..c60affd9b 100644 --- a/pkg/lib/datastore/clientds/clientds.go +++ b/pkg/lib/datastore/clientds/clientds.go @@ -176,7 +176,7 @@ func (r *clientds) Init(a *app.App) (err error) { return nil } -func (r *clientds) Run() error { +func (r *clientds) Run(context.Context) error { var err error litestoreOldPath := r.getRepoPath(liteOldDSDir) diff --git a/pkg/lib/gateway/gateway.go b/pkg/lib/gateway/gateway.go index 116b83090..207cf3e68 100644 --- a/pkg/lib/gateway/gateway.go +++ b/pkg/lib/gateway/gateway.go @@ -85,7 +85,7 @@ func (g *gateway) Name() string { return CName } -func (g *gateway) Run() error { +func (g *gateway) Run(context.Context) error { if g.isServerStarted { return fmt.Errorf("gateway already started") } diff --git a/pkg/lib/ipfs/ipfslite/lite.go b/pkg/lib/ipfs/ipfslite/lite.go index 71b943694..d0a78e29e 100644 --- a/pkg/lib/ipfs/ipfslite/lite.go +++ b/pkg/lib/ipfs/ipfslite/lite.go @@ -124,7 +124,7 @@ func (ln *liteNet) Init(a *app.App) (err error) { return nil } -func (ln *liteNet) Run() error { +func (ln *liteNet) Run(context.Context) error { peerDS, err := ln.ds.PeerstoreDS() if err != nil { return fmt.Errorf("peerDS: %s", err.Error()) diff --git a/pkg/lib/localstore/filestore/files.go b/pkg/lib/localstore/filestore/files.go index 5b5c9e395..4ced405d7 100644 --- a/pkg/lib/localstore/filestore/files.go +++ b/pkg/lib/localstore/filestore/files.go @@ -1,6 +1,7 @@ package filestore import ( + "context" "fmt" "github.com/anytypeio/go-anytype-middleware/app" "github.com/anytypeio/go-anytype-middleware/pkg/lib/datastore" @@ -103,7 +104,7 @@ func (ls *dsFileStore) Init(a *app.App) (err error) { return nil } -func (ls *dsFileStore) Run() (err error) { +func (ls *dsFileStore) Run(context.Context) (err error) { ls.ds, err = ls.dsIface.LocalstoreDS() return } diff --git a/pkg/lib/localstore/ftsearch/ftsearch.go b/pkg/lib/localstore/ftsearch/ftsearch.go index acc9b3a1e..b47571083 100644 --- a/pkg/lib/localstore/ftsearch/ftsearch.go +++ b/pkg/lib/localstore/ftsearch/ftsearch.go @@ -1,6 +1,7 @@ package ftsearch import ( + "context" "github.com/anytypeio/go-anytype-middleware/app" "github.com/anytypeio/go-anytype-middleware/core/wallet" "github.com/anytypeio/go-anytype-middleware/metrics" @@ -63,7 +64,7 @@ func (f *ftSearch) Name() (name string) { return CName } -func (f *ftSearch) Run() (err error) { +func (f *ftSearch) Run(context.Context) (err error) { f.index, err = bleve.Open(f.ftsPath) if err == bleve.ErrorIndexPathDoesNotExist || err == bleve.ErrorIndexMetaMissing { if f.index, err = bleve.New(f.ftsPath, f.makeMapping()); err != nil { diff --git a/pkg/lib/localstore/ftsearch/ftsearch_test.go b/pkg/lib/localstore/ftsearch/ftsearch_test.go index d2fbccf3f..2fb1a42ef 100644 --- a/pkg/lib/localstore/ftsearch/ftsearch_test.go +++ b/pkg/lib/localstore/ftsearch/ftsearch_test.go @@ -1,6 +1,7 @@ package ftsearch import ( + "context" "github.com/anytypeio/go-anytype-middleware/app/testapp" "github.com/anytypeio/go-anytype-middleware/core/wallet" "github.com/golang/mock/gomock" @@ -23,7 +24,7 @@ func newFixture(path string, t *testing.T) *fixture { With(wallet.NewWithRepoPathAndKeys(path, nil, nil)). With(ft) - require.NoError(t, ta.Start()) + require.NoError(t, ta.Start(context.Background())) return &fixture{ ft: ft, ta: ta, diff --git a/pkg/lib/localstore/objectstore/objects.go b/pkg/lib/localstore/objectstore/objects.go index 98a31cc5f..e2e086bed 100644 --- a/pkg/lib/localstore/objectstore/objects.go +++ b/pkg/lib/localstore/objectstore/objects.go @@ -1,6 +1,7 @@ package objectstore import ( + "context" "encoding/binary" "errors" "fmt" @@ -629,7 +630,7 @@ func (m *dsObjectStore) eraseLinks() (err error) { return nil } -func (m *dsObjectStore) Run() (err error) { +func (m *dsObjectStore) Run(context.Context) (err error) { m.ds, err = m.dsIface.LocalstoreDS() return } diff --git a/pkg/lib/localstore/objectstore/objects_test.go b/pkg/lib/localstore/objectstore/objects_test.go index 640255b49..31ce5d69e 100644 --- a/pkg/lib/localstore/objectstore/objects_test.go +++ b/pkg/lib/localstore/objectstore/objects_test.go @@ -1,6 +1,7 @@ package objectstore import ( + "context" "fmt" "github.com/anytypeio/go-anytype-middleware/pkg/lib/logging" "github.com/anytypeio/go-anytype-middleware/pkg/lib/schema" @@ -43,7 +44,7 @@ func TestDsObjectStore_UpdateLocalDetails(t *testing.T) { id, err := threads.ThreadCreateID(thread.AccessControlled, smartblock.SmartBlockTypePage) require.NoError(t, err) - err = app.With(&config.DefaultConfig).With(wallet.NewWithRepoPathAndKeys(tmpDir, nil, nil)).With(clientds.New()).With(ds).Start() + err = app.With(&config.DefaultConfig).With(wallet.NewWithRepoPathAndKeys(tmpDir, nil, nil)).With(clientds.New()).With(ds).Start(context.Background()) require.NoError(t, err) // bundle.RelationKeyLastOpenedDate is local relation (not stored in the changes tree) err = ds.CreateObject(id.String(), &types.Struct{ @@ -88,7 +89,7 @@ func TestDsObjectStore_IndexQueue(t *testing.T) { defer app.Close() ds := New() - err := app.With(&config.DefaultConfig).With(wallet.NewWithRepoPathAndKeys(tmpDir, nil, nil)).With(clientds.New()).With(ds).Start() + err := app.With(&config.DefaultConfig).With(wallet.NewWithRepoPathAndKeys(tmpDir, nil, nil)).With(clientds.New()).With(ds).Start(context.Background()) require.NoError(t, err) require.NoError(t, ds.AddToIndexQueue("one")) @@ -145,7 +146,7 @@ func TestDsObjectStore_Query(t *testing.T) { defer app.Close() ds := New() - err := app.With(&config.DefaultConfig).With(wallet.NewWithRepoPathAndKeys(tmpDir, nil, nil)).With(clientds.New()).With(ftsearch.New()).With(ds).Start() + err := app.With(&config.DefaultConfig).With(wallet.NewWithRepoPathAndKeys(tmpDir, nil, nil)).With(clientds.New()).With(ftsearch.New()).With(ds).Start(context.Background()) require.NoError(t, err) fts := app.MustComponent(ftsearch.CName).(ftsearch.FTSearch) @@ -262,7 +263,7 @@ func TestDsObjectStore_RelationsIndex(t *testing.T) { app := testapp.New() defer app.Close() ds := New() - err := app.With(&config.DefaultConfig).With(wallet.NewWithRepoPathAndKeys(tmpDir, nil, nil)).With(clientds.New()).With(ftsearch.New()).With(ds).Start() + err := app.With(&config.DefaultConfig).With(wallet.NewWithRepoPathAndKeys(tmpDir, nil, nil)).With(clientds.New()).With(ftsearch.New()).With(ds).Start(context.Background()) require.NoError(t, err) newDet := func(name, objtype string) *types.Struct { @@ -360,7 +361,7 @@ func Test_removeByPrefix(t *testing.T) { app := testapp.New() defer app.Close() ds := New() - err := app.With(&config.DefaultConfig).With(wallet.NewWithRepoPathAndKeys(tmpDir, nil, nil)).With(clientds.New()).With(ftsearch.New()).With(ds).Start() + err := app.With(&config.DefaultConfig).With(wallet.NewWithRepoPathAndKeys(tmpDir, nil, nil)).With(clientds.New()).With(ftsearch.New()).With(ds).Start(context.Background()) require.NoError(t, err) ds2 := ds.(*dsObjectStore) @@ -401,7 +402,7 @@ func Test_SearchRelationDistinct(t *testing.T) { app := testapp.New() defer app.Close() ds := New() - err := app.With(&config.DefaultConfig).With(wallet.NewWithRepoPathAndKeys(tmpDir, nil, nil)).With(clientds.New()).With(ftsearch.New()).With(ds).Start() + err := app.With(&config.DefaultConfig).With(wallet.NewWithRepoPathAndKeys(tmpDir, nil, nil)).With(clientds.New()).With(ftsearch.New()).With(ds).Start(context.Background()) require.NoError(t, err) id1 := getId() @@ -427,11 +428,11 @@ func Test_SearchRelationDistinct(t *testing.T) { }}, nil, "s1")) require.NoError(t, ds.CreateObject(id2, &types.Struct{Fields: map[string]*types.Value{ - "name": pbtypes.String("two"), - "type": pbtypes.StringList([]string{"_ota2"}), - "tag": pbtypes.StringList([]string{"tag1"}), - }, - }, &model.Relations{Relations: []*model.Relation{ + "name": pbtypes.String("two"), + "type": pbtypes.StringList([]string{"_ota2"}), + "tag": pbtypes.StringList([]string{"tag1"}), + }, + }, &model.Relations{Relations: []*model.Relation{ { Key: "rel1", Format: model.RelationFormat_status, @@ -467,8 +468,8 @@ func Test_SearchRelationDistinct(t *testing.T) { require.NoError(t, ds.CreateObject(id3, &types.Struct{Fields: map[string]*types.Value{ "name": pbtypes.String("three"), "type": pbtypes.StringList([]string{"_ota2"}), - "tag": pbtypes.StringList([]string{"tag1", "tag2", "tag3"}), - }}, nil, nil,"s3")) + "tag": pbtypes.StringList([]string{"tag1", "tag2", "tag3"}), + }}, nil, nil, "s3")) statusOpts, err := ds.RelationSearchDistinct("rel1", nil) require.NoError(t, err) @@ -481,4 +482,4 @@ func Test_SearchRelationDistinct(t *testing.T) { tagsOptsFilter, err := ds.RelationSearchDistinct("rel4", []*model.BlockContentDataviewFilter{{RelationKey: "name", Condition: 1, Value: pbtypes.String("three")}}) require.NoError(t, err) require.Len(t, tagsOptsFilter, 1) -} \ No newline at end of file +} diff --git a/pkg/lib/pin/service.go b/pkg/lib/pin/service.go index da2e66350..f767c0747 100644 --- a/pkg/lib/pin/service.go +++ b/pkg/lib/pin/service.go @@ -73,7 +73,7 @@ func (f *filePinService) Init(a *app.App) error { return nil } -func (f *filePinService) Run() error { +func (f *filePinService) Run(context.Context) error { if f.cafe != nil { go f.syncCafe() } else { diff --git a/pkg/lib/threads/service.go b/pkg/lib/threads/service.go index 6d7f5668c..db4f8e3bb 100644 --- a/pkg/lib/threads/service.go +++ b/pkg/lib/threads/service.go @@ -177,7 +177,7 @@ func (s *service) ObserveAccountStateUpdate(state *pb.AccountState) { s.threadQueue.UpdateSimultaneousRequestsLimit(int(state.Config.SimultaneousRequests)) } -func (s *service) Run() (err error) { +func (s *service) Run(context.Context) (err error) { s.logstoreDS, err = s.ds.LogstoreDS() if err != nil { return err diff --git a/util/builtinobjects/builtinobjects.go b/util/builtinobjects/builtinobjects.go index dff56eb54..67d4ee3b5 100644 --- a/util/builtinobjects/builtinobjects.go +++ b/util/builtinobjects/builtinobjects.go @@ -75,7 +75,7 @@ func (b *builtinObjects) Name() (name string) { return CName } -func (b *builtinObjects) Run() (err error) { +func (b *builtinObjects) Run(context.Context) (err error) { if !b.newAccount { // import only for new accounts return diff --git a/util/builtintemplate/builtintemplate.go b/util/builtintemplate/builtintemplate.go index 782ba92b8..e8a6f181a 100644 --- a/util/builtintemplate/builtintemplate.go +++ b/util/builtintemplate/builtintemplate.go @@ -3,6 +3,7 @@ package builtintemplate import ( "archive/zip" "bytes" + "context" "crypto/md5" _ "embed" "encoding/binary" @@ -64,7 +65,7 @@ func (b *builtinTemplate) Name() (name string) { return CName } -func (b *builtinTemplate) Run() (err error) { +func (b *builtinTemplate) Run(context.Context) (err error) { zr, err := zip.NewReader(bytes.NewReader(templatesZip), int64(len(templatesZip))) if err != nil { return diff --git a/util/builtintemplate/builtintemplate_test.go b/util/builtintemplate/builtintemplate_test.go index 0c149817a..9857165d4 100644 --- a/util/builtintemplate/builtintemplate_test.go +++ b/util/builtintemplate/builtintemplate_test.go @@ -1,6 +1,7 @@ package builtintemplate import ( + "context" "testing" "github.com/anytypeio/go-anytype-middleware/app/testapp" @@ -20,6 +21,6 @@ func Test_registerBuiltin(t *testing.T) { s.EXPECT().RegisterStaticSource(gomock.Any(), gomock.Any()).AnyTimes() a := testapp.New().With(s).With(New()) - require.NoError(t, a.Start()) + require.NoError(t, a.Start(context.Background())) defer a.Close() } diff --git a/util/testMock/anytype.go b/util/testMock/anytype.go index b9183ac31..658cb2707 100644 --- a/util/testMock/anytype.go +++ b/util/testMock/anytype.go @@ -4,6 +4,7 @@ package testMock import ( + "context" "github.com/anytypeio/go-anytype-middleware/app" "github.com/anytypeio/go-anytype-middleware/app/testapp" "github.com/anytypeio/go-anytype-middleware/pkg/lib/core" @@ -19,7 +20,7 @@ func RegisterMockAnytype(ctrl *gomock.Controller, ta *testapp.TestApp) *MockServ ms := NewMockService(ctrl) ms.EXPECT().Name().AnyTimes().Return(core.CName) ms.EXPECT().Init(gomock.Any()).AnyTimes() - ms.EXPECT().Run().AnyTimes() + ms.EXPECT().Run(context.Background()).AnyTimes() ms.EXPECT().Close().AnyTimes() ta.Register(ms) return ms @@ -29,7 +30,7 @@ func RegisterMockObjectStore(ctrl *gomock.Controller, ta App) *MockObjectStore { ms := NewMockObjectStore(ctrl) ms.EXPECT().Name().AnyTimes().Return(objectstore.CName) ms.EXPECT().Init(gomock.Any()).AnyTimes() - ms.EXPECT().Run().AnyTimes() + ms.EXPECT().Run(context.Background()).AnyTimes() ms.EXPECT().Close().AnyTimes() ta.Register(ms) return ms diff --git a/util/testMock/mockBuiltinTemplate/builtintemplate.go b/util/testMock/mockBuiltinTemplate/builtintemplate.go index b5f348e5f..39dfdb252 100644 --- a/util/testMock/mockBuiltinTemplate/builtintemplate.go +++ b/util/testMock/mockBuiltinTemplate/builtintemplate.go @@ -2,6 +2,7 @@ package mockBuiltinTemplate import ( + "context" "github.com/anytypeio/go-anytype-middleware/app/testapp" "github.com/anytypeio/go-anytype-middleware/util/builtintemplate" "github.com/golang/mock/gomock" @@ -11,7 +12,7 @@ func RegisterMockBuiltinTemplate(ctrl *gomock.Controller, ta *testapp.TestApp) * ms := NewMockBuiltinTemplate(ctrl) ms.EXPECT().Name().AnyTimes().Return(builtintemplate.CName) ms.EXPECT().Init(gomock.Any()).AnyTimes() - ms.EXPECT().Run().AnyTimes() + ms.EXPECT().Run(context.Background()).AnyTimes() ms.EXPECT().Close().AnyTimes() ta.Register(ms) return ms diff --git a/util/testMock/mockStatus/status.go b/util/testMock/mockStatus/status.go index 48cd1e61e..6d0287f0b 100644 --- a/util/testMock/mockStatus/status.go +++ b/util/testMock/mockStatus/status.go @@ -2,6 +2,7 @@ package mockStatus import ( + "context" "github.com/anytypeio/go-anytype-middleware/app/testapp" "github.com/anytypeio/go-anytype-middleware/core/status" "github.com/golang/mock/gomock" @@ -11,7 +12,7 @@ func RegisterMockStatus(ctrl *gomock.Controller, ta *testapp.TestApp) *MockServi ms := NewMockService(ctrl) ms.EXPECT().Name().AnyTimes().Return(status.CName) ms.EXPECT().Init(gomock.Any()).AnyTimes() - ms.EXPECT().Run().AnyTimes() + ms.EXPECT().Run(context.Background()).AnyTimes() ms.EXPECT().Close().AnyTimes() ta.Register(ms) return ms