1
0
Fork 0
mirror of https://github.com/anyproto/anytype-heart.git synced 2025-06-09 09:35:00 +09:00

Merge pull request #935 from anytypeio/new-threads_processing-limiter

New threads processing limiter
This commit is contained in:
Roman Khafizianov 2021-03-04 17:01:23 +03:00 committed by GitHub
commit d65a8f5045
Signed by: github
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 127 additions and 69 deletions

View file

@ -15,8 +15,7 @@
"editable": true,
"gnetId": 9186,
"graphTooltip": 0,
"id": 3,
"iteration": 1614439793691,
"iteration": 1614862047771,
"links": [],
"panels": [
{
@ -842,7 +841,8 @@
},
"yaxes": [
{
"format": "short",
"$$hashKey": "object:342",
"format": "reqps",
"label": null,
"logBase": 1,
"max": null,
@ -850,7 +850,8 @@
"show": true
},
{
"format": "Bps",
"$$hashKey": "object:343",
"format": "reqps",
"label": null,
"logBase": 1,
"max": null,
@ -1014,7 +1015,7 @@
"steppedLine": false,
"targets": [
{
"expr": "avg(rate(grpc_client_handling_seconds_sum{job=\"$job\",grpc_service=\"net.pb.Service\",grpc_type=\"unary\"}[$interval])) by (grpc_method)",
"expr": "histogram_quantile(0.5, sum(rate(grpc_client_handling_seconds_bucket{job=\"$job\",grpc_service=\"net.pb.Service\",grpc_type=\"unary\"}[$interval])) by (grpc_method,le))",
"format": "time_series",
"interval": "",
"intervalFactor": 1,
@ -1042,7 +1043,8 @@
},
"yaxes": [
{
"format": "ms",
"$$hashKey": "object:284",
"format": "s",
"label": null,
"logBase": 1,
"max": null,
@ -1050,6 +1052,7 @@
"show": true
},
{
"$$hashKey": "object:285",
"format": "none",
"label": null,
"logBase": 1,
@ -1171,7 +1174,8 @@
},
"yaxes": [
{
"format": "decbytes",
"$$hashKey": "object:94",
"format": "reqps",
"label": null,
"logBase": 1,
"max": null,
@ -1179,12 +1183,13 @@
"show": true
},
{
"$$hashKey": "object:95",
"format": "Bps",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
"show": false
}
],
"yaxis": {
@ -1277,7 +1282,8 @@
},
"yaxes": [
{
"format": "short",
"$$hashKey": "object:198",
"format": "percentunit",
"label": null,
"logBase": 1,
"max": null,
@ -1285,6 +1291,7 @@
"show": true
},
{
"$$hashKey": "object:199",
"format": "percentunit",
"label": null,
"logBase": 1,
@ -1383,7 +1390,8 @@
},
"yaxes": [
{
"format": "short",
"$$hashKey": "object:255",
"format": "s",
"label": null,
"logBase": 1,
"max": null,
@ -1391,6 +1399,7 @@
"show": true
},
{
"$$hashKey": "object:256",
"format": "short",
"label": null,
"logBase": 1,
@ -1489,7 +1498,8 @@
},
"yaxes": [
{
"format": "short",
"$$hashKey": "object:313",
"format": "percent",
"label": null,
"logBase": 1,
"max": null,
@ -1497,7 +1507,8 @@
"show": true
},
{
"format": "short",
"$$hashKey": "object:314",
"format": "percent",
"label": null,
"logBase": 1,
"max": null,
@ -1740,5 +1751,5 @@
"timezone": "",
"title": "Threads gRPC client",
"uid": "7_VAvbQac",
"version": 3
"version": 7
}

View file

@ -15,8 +15,7 @@
"editable": true,
"gnetId": 9186,
"graphTooltip": 0,
"id": 3,
"iteration": 1614361132065,
"iteration": 1614862047771,
"links": [],
"panels": [
{
@ -816,6 +815,7 @@
{
"expr": "sum(rate(grpc_server_handled_total{job=\"$job\",grpc_service=\"net.pb.Service\",grpc_type=\"unary\"}[$interval])) by (grpc_code)",
"format": "time_series",
"interval": "",
"intervalFactor": 1,
"legendFormat": "{{grpc_code}}",
"refId": "A"
@ -841,7 +841,8 @@
},
"yaxes": [
{
"format": "short",
"$$hashKey": "object:342",
"format": "reqps",
"label": null,
"logBase": 1,
"max": null,
@ -849,7 +850,8 @@
"show": true
},
{
"format": "Bps",
"$$hashKey": "object:343",
"format": "reqps",
"label": null,
"logBase": 1,
"max": null,
@ -913,10 +915,11 @@
"steppedLine": false,
"targets": [
{
"expr": "sum(irate(grpc_server_started_total{job=\"$job\",grpc_service=\"net.pb.Service\"}[$interval])) by (grpc_service)",
"expr": "sum(irate(grpc_server_started_total{job=\"$job\",grpc_service=\"net.pb.Service\"}[$interval])) by (grpc_method)",
"format": "time_series",
"interval": "",
"intervalFactor": 1,
"legendFormat": "{{grpc_service}}",
"legendFormat": "{{grpc_method}}",
"refId": "A"
}
],
@ -1012,8 +1015,9 @@
"steppedLine": false,
"targets": [
{
"expr": "avg(rate(grpc_server_handling_seconds_sum{job=\"$job\",grpc_service=\"net.pb.Service\",grpc_type=\"unary\"}[$interval])) by (grpc_method)",
"expr": "histogram_quantile(0.5, sum(rate(grpc_server_handling_seconds_bucket{job=\"$job\",grpc_service=\"net.pb.Service\",grpc_type=\"unary\"}[$interval])) by (grpc_method,le))",
"format": "time_series",
"interval": "",
"intervalFactor": 1,
"legendFormat": "{{grpc_service}}",
"refId": "A"
@ -1023,7 +1027,7 @@
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "Average handling time",
"title": "Average response time",
"tooltip": {
"shared": true,
"sort": 0,
@ -1039,7 +1043,8 @@
},
"yaxes": [
{
"format": "ms",
"$$hashKey": "object:284",
"format": "s",
"label": null,
"logBase": 1,
"max": null,
@ -1047,6 +1052,7 @@
"show": true
},
{
"$$hashKey": "object:285",
"format": "none",
"label": null,
"logBase": 1,
@ -1168,7 +1174,8 @@
},
"yaxes": [
{
"format": "decbytes",
"$$hashKey": "object:94",
"format": "reqps",
"label": null,
"logBase": 1,
"max": null,
@ -1176,12 +1183,13 @@
"show": true
},
{
"$$hashKey": "object:95",
"format": "Bps",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
"show": false
}
],
"yaxis": {
@ -1274,7 +1282,8 @@
},
"yaxes": [
{
"format": "short",
"$$hashKey": "object:198",
"format": "percentunit",
"label": null,
"logBase": 1,
"max": null,
@ -1282,6 +1291,7 @@
"show": true
},
{
"$$hashKey": "object:199",
"format": "percentunit",
"label": null,
"logBase": 1,
@ -1380,7 +1390,8 @@
},
"yaxes": [
{
"format": "short",
"$$hashKey": "object:255",
"format": "s",
"label": null,
"logBase": 1,
"max": null,
@ -1388,6 +1399,7 @@
"show": true
},
{
"$$hashKey": "object:256",
"format": "short",
"label": null,
"logBase": 1,
@ -1486,7 +1498,8 @@
},
"yaxes": [
{
"format": "short",
"$$hashKey": "object:313",
"format": "percent",
"label": null,
"logBase": 1,
"max": null,
@ -1494,7 +1507,8 @@
"show": true
},
{
"format": "short",
"$$hashKey": "object:314",
"format": "percent",
"label": null,
"logBase": 1,
"max": null,

View file

@ -15,8 +15,7 @@
"editable": true,
"gnetId": 9186,
"graphTooltip": 0,
"id": 3,
"iteration": 1614361132065,
"iteration": 1614862047771,
"links": [],
"panels": [
{
@ -816,6 +815,7 @@
{
"expr": "sum(rate(grpc_server_handled_total{job=\"$job\",grpc_service=\"anytype.ClientCommands\",grpc_type=\"unary\"}[$interval])) by (grpc_code)",
"format": "time_series",
"interval": "",
"intervalFactor": 1,
"legendFormat": "{{grpc_code}}",
"refId": "A"
@ -841,7 +841,8 @@
},
"yaxes": [
{
"format": "short",
"$$hashKey": "object:342",
"format": "reqps",
"label": null,
"logBase": 1,
"max": null,
@ -849,7 +850,8 @@
"show": true
},
{
"format": "Bps",
"$$hashKey": "object:343",
"format": "reqps",
"label": null,
"logBase": 1,
"max": null,
@ -913,10 +915,11 @@
"steppedLine": false,
"targets": [
{
"expr": "sum(irate(grpc_server_started_total{job=\"$job\",grpc_service=\"anytype.ClientCommands\"}[$interval])) by (grpc_service)",
"expr": "sum(irate(grpc_server_started_total{job=\"$job\",grpc_service=\"anytype.ClientCommands\"}[$interval])) by (grpc_method)",
"format": "time_series",
"interval": "",
"intervalFactor": 1,
"legendFormat": "{{grpc_service}}",
"legendFormat": "{{grpc_method}}",
"refId": "A"
}
],
@ -1012,8 +1015,9 @@
"steppedLine": false,
"targets": [
{
"expr": "avg(rate(grpc_server_handling_seconds_sum{job=\"$job\",grpc_service=\"anytype.ClientCommands\",grpc_type=\"unary\"}[$interval])) by (grpc_method)",
"expr": "histogram_quantile(0.5, sum(rate(grpc_server_handling_seconds_bucket{job=\"$job\",grpc_service=\"anytype.ClientCommands\",grpc_type=\"unary\"}[$interval])) by (grpc_method,le))",
"format": "time_series",
"interval": "",
"intervalFactor": 1,
"legendFormat": "{{grpc_service}}",
"refId": "A"
@ -1023,7 +1027,7 @@
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "Average handling time",
"title": "Average response time",
"tooltip": {
"shared": true,
"sort": 0,
@ -1039,7 +1043,8 @@
},
"yaxes": [
{
"format": "ms",
"$$hashKey": "object:284",
"format": "s",
"label": null,
"logBase": 1,
"max": null,
@ -1047,6 +1052,7 @@
"show": true
},
{
"$$hashKey": "object:285",
"format": "none",
"label": null,
"logBase": 1,
@ -1168,7 +1174,8 @@
},
"yaxes": [
{
"format": "decbytes",
"$$hashKey": "object:94",
"format": "reqps",
"label": null,
"logBase": 1,
"max": null,
@ -1176,12 +1183,13 @@
"show": true
},
{
"$$hashKey": "object:95",
"format": "Bps",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
"show": false
}
],
"yaxis": {
@ -1274,7 +1282,8 @@
},
"yaxes": [
{
"format": "short",
"$$hashKey": "object:198",
"format": "percentunit",
"label": null,
"logBase": 1,
"max": null,
@ -1282,6 +1291,7 @@
"show": true
},
{
"$$hashKey": "object:199",
"format": "percentunit",
"label": null,
"logBase": 1,
@ -1380,7 +1390,8 @@
},
"yaxes": [
{
"format": "short",
"$$hashKey": "object:255",
"format": "s",
"label": null,
"logBase": 1,
"max": null,
@ -1388,6 +1399,7 @@
"show": true
},
{
"$$hashKey": "object:256",
"format": "short",
"label": null,
"logBase": 1,
@ -1486,7 +1498,8 @@
},
"yaxes": [
{
"format": "short",
"$$hashKey": "object:313",
"format": "percent",
"label": null,
"logBase": 1,
"max": null,
@ -1494,7 +1507,8 @@
"show": true
},
{
"format": "short",
"$$hashKey": "object:314",
"format": "percent",
"label": null,
"logBase": 1,
"max": null,

View file

@ -117,7 +117,10 @@ func (s *service) threadsDbListen() error {
metrics.ExternalThreadReceivedCounter.Inc()
go func() {
s.processNewExternalThreadUntilSuccess(tid, ti)
if s.processNewExternalThreadUntilSuccess(tid, ti) != nil {
log.With("thread", tid.String()).Error("processNewExternalThreadUntilSuccess failed: %s", err.Error())
return
}
ch := s.getNewThreadChan()
if ch != nil {
@ -137,7 +140,7 @@ func (s *service) threadsDbListen() error {
// processNewExternalThreadUntilSuccess tries to add the new thread from remote peer until success
// supposed to be run in goroutine
func (s *service) processNewExternalThreadUntilSuccess(tid thread.ID, ti threadInfo) {
func (s *service) processNewExternalThreadUntilSuccess(tid thread.ID, ti threadInfo) error {
log := log.With("thread", tid.String())
log.With("threadAddrs", ti.Addrs).Info("got new thread")
start := time.Now()
@ -145,18 +148,21 @@ func (s *service) processNewExternalThreadUntilSuccess(tid thread.ID, ti threadI
for {
metrics.ExternalThreadHandlingAttempts.Inc()
attempt++
<-s.newThreadProcessingLimiter
err := s.processNewExternalThread(tid, ti)
if err != nil {
s.newThreadProcessingLimiter <- struct{}{}
log.Errorf("processNewExternalThread failed after %d attempt: %s", attempt, err.Error())
} else {
s.newThreadProcessingLimiter <- struct{}{}
metrics.ServedThreads.Inc()
metrics.ExternalThreadHandlingDuration.Observe(time.Since(start).Seconds())
log.Debugf("processNewExternalThread succeed after %d attempt", attempt)
return
return nil
}
select {
case <-s.ctx.Done():
return
return context.Canceled
case <-time.After(time.Duration(5*attempt) * time.Second):
continue
}

View file

@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"github.com/anytypeio/go-anytype-middleware/metrics"
"strings"
"time"
@ -68,15 +69,27 @@ func (s *service) addMissingThreadsFromCollection() error {
}
if _, err = s.t.GetThread(context.Background(), tid); err != nil && errors.Is(err, logstore.ErrThreadNotFound) {
metrics.ExternalThreadReceivedCounter.Add(1)
missingThreadsAdded++
go func() {
s.processNewExternalThreadUntilSuccess(tid, ti)
if s.processNewExternalThreadUntilSuccess(tid, ti) != nil {
log.With("thread", tid.String()).Error("processNewExternalThreadUntilSuccess failed: %s", err.Error())
return
}
ch := s.getNewThreadChan()
if ch != nil {
select {
case <-s.ctx.Done():
case ch <- tid.String():
}
}
}()
}
}
if missingThreadsAdded > 0 {
log.Warnf("addMissingThreadsFromCollection: adding %d missing threads in background...", missingThreadsAdded)
log.Warnf("addMissingThreadsFromCollection: processing %d missing threads in background...", missingThreadsAdded)
}
return nil
}

View file

@ -23,18 +23,18 @@ import (
var log = logging.Logger("anytype-threads")
const simultaneousRequests = 10
const simultaneousRequests = 20
type service struct {
t net2.NetBoostrapper
db *db.DB
threadsCollection *db.Collection
threadsGetter ThreadsGetter
device wallet.Keypair
account wallet.Keypair
repoRootPath string
newHeadProcessor func(id thread.ID) error
newThreadChan chan<- string
t net2.NetBoostrapper
db *db.DB
threadsCollection *db.Collection
threadsGetter ThreadsGetter
device wallet.Keypair
account wallet.Keypair
repoRootPath string
newHeadProcessor func(id thread.ID) error
newThreadChan chan<- string
newThreadProcessingLimiter chan struct{}
replicatorAddr ma.Multiaddr
@ -51,16 +51,16 @@ func New(threadsAPI net2.NetBoostrapper, threadsGetter ThreadsGetter, repoRootPa
}
return &service{
t: threadsAPI,
threadsGetter: threadsGetter,
device: deviceKeypair,
repoRootPath: repoRootPath,
account: accountKeypair,
newHeadProcessor: newHeadProcessor,
replicatorAddr: replicatorAddr,
newThreadProcessingLimiter: limiter,
ctx: ctx,
ctxCancel: cancel,
t: threadsAPI,
threadsGetter: threadsGetter,
device: deviceKeypair,
repoRootPath: repoRootPath,
account: accountKeypair,
newHeadProcessor: newHeadProcessor,
replicatorAddr: replicatorAddr,
newThreadProcessingLimiter: limiter,
ctx: ctx,
ctxCancel: cancel,
}
}