From 6ed21a94d3c001fc6d8b63c37fa08fd32e2a9654 Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Fri, 21 Apr 2023 17:00:35 +0200 Subject: [PATCH] move rpc prometheus config to metric service --- metric/drpc.go | 4 ++-- metric/metric.go | 33 +++++++++++++++++++++++++++++++++ net/rpc/server/drpcserver.go | 20 +------------------- 3 files changed, 36 insertions(+), 21 deletions(-) diff --git a/metric/drpc.go b/metric/drpc.go index 97c01f8a..b81ed863 100644 --- a/metric/drpc.go +++ b/metric/drpc.go @@ -6,12 +6,12 @@ import ( "time" ) -type PrometheusDRPC struct { +type prometheusDRPC struct { drpc.Handler SummaryVec *prometheus.SummaryVec } -func (ph *PrometheusDRPC) HandleRPC(stream drpc.Stream, rpc string) (err error) { +func (ph *prometheusDRPC) HandleRPC(stream drpc.Stream, rpc string) (err error) { st := time.Now() defer func() { ph.SummaryVec.WithLabelValues(rpc).Observe(time.Since(st).Seconds()) diff --git a/metric/metric.go b/metric/metric.go index f3ddfae6..d64e81da 100644 --- a/metric/metric.go +++ b/metric/metric.go @@ -3,32 +3,40 @@ package metric import ( "context" "github.com/anytypeio/any-sync/app" + "github.com/anytypeio/any-sync/app/logger" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/collectors" "github.com/prometheus/client_golang/prometheus/promhttp" + "go.uber.org/zap" "net/http" + "storj.io/drpc" "time" ) const CName = "common.metric" +var log = logger.NewNamed(CName) + func New() Metric { return new(metric) } type Metric interface { Registry() *prometheus.Registry + WrapDRPCHandler(h drpc.Handler) drpc.Handler app.ComponentRunnable } type metric struct { registry *prometheus.Registry + rpcLog logger.CtxLogger config Config } func (m *metric) Init(a *app.App) (err error) { m.registry = prometheus.NewRegistry() m.config = a.MustComponent("config").(configSource).GetMetric() + m.rpcLog = logger.NewNamed("rpcLog") return nil } @@ -61,6 +69,31 @@ func (m *metric) Registry() *prometheus.Registry { return m.registry } +func (m *metric) WrapDRPCHandler(h drpc.Handler) drpc.Handler { + if m == nil { + return h + } + histVec := prometheus.NewSummaryVec(prometheus.SummaryOpts{ + Namespace: "drpc", + Subsystem: "server", + Name: "duration_seconds", + Objectives: map[float64]float64{ + 0.5: 0.5, + 0.85: 0.01, + 0.95: 0.0005, + 0.99: 0.0001, + }, + }, []string{"rpc"}) + if err := m.Registry().Register(histVec); err != nil { + log.Warn("can't register prometheus drpc metric", zap.Error(err)) + return h + } + return &prometheusDRPC{ + Handler: h, + SummaryVec: histVec, + } +} + func (m *metric) Close(ctx context.Context) (err error) { return } diff --git a/net/rpc/server/drpcserver.go b/net/rpc/server/drpcserver.go index 4d9d54ac..e97ea0d3 100644 --- a/net/rpc/server/drpcserver.go +++ b/net/rpc/server/drpcserver.go @@ -8,7 +8,6 @@ import ( anyNet "github.com/anytypeio/any-sync/net" "github.com/anytypeio/any-sync/net/secureservice" "github.com/libp2p/go-libp2p/core/sec" - "github.com/prometheus/client_golang/prometheus" "net" "storj.io/drpc" "time" @@ -46,29 +45,12 @@ func (s *drpcServer) Name() (name string) { } func (s *drpcServer) Run(ctx context.Context) (err error) { - histVec := prometheus.NewSummaryVec(prometheus.SummaryOpts{ - Namespace: "drpc", - Subsystem: "server", - Name: "duration_seconds", - Objectives: map[float64]float64{ - 0.5: 0.5, - 0.85: 0.01, - 0.95: 0.0005, - 0.99: 0.0001, - }, - }, []string{"rpc"}) - if err = s.metric.Registry().Register(histVec); err != nil { - return - } params := Params{ BufferSizeMb: s.config.Stream.MaxMsgSizeMb, TimeoutMillis: s.config.Stream.TimeoutMilliseconds, ListenAddrs: s.config.Server.ListenAddrs, Wrapper: func(handler drpc.Handler) drpc.Handler { - return &metric.PrometheusDRPC{ - Handler: handler, - SummaryVec: histVec, - } + return s.metric.WrapDRPCHandler(handler) }, Handshake: func(conn net.Conn) (cCtx context.Context, sc sec.SecureConn, err error) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)