mirror of
https://github.com/anyproto/any-sync.git
synced 2025-06-09 09:35:03 +09:00
move rpc prometheus config to metric service
This commit is contained in:
parent
01abaf6f81
commit
6ed21a94d3
3 changed files with 36 additions and 21 deletions
|
@ -6,12 +6,12 @@ import (
|
|||
"time"
|
||||
)
|
||||
|
||||
type PrometheusDRPC struct {
|
||||
type prometheusDRPC struct {
|
||||
drpc.Handler
|
||||
SummaryVec *prometheus.SummaryVec
|
||||
}
|
||||
|
||||
func (ph *PrometheusDRPC) HandleRPC(stream drpc.Stream, rpc string) (err error) {
|
||||
func (ph *prometheusDRPC) HandleRPC(stream drpc.Stream, rpc string) (err error) {
|
||||
st := time.Now()
|
||||
defer func() {
|
||||
ph.SummaryVec.WithLabelValues(rpc).Observe(time.Since(st).Seconds())
|
||||
|
|
|
@ -3,32 +3,40 @@ package metric
|
|||
import (
|
||||
"context"
|
||||
"github.com/anytypeio/any-sync/app"
|
||||
"github.com/anytypeio/any-sync/app/logger"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/collectors"
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
"go.uber.org/zap"
|
||||
"net/http"
|
||||
"storj.io/drpc"
|
||||
"time"
|
||||
)
|
||||
|
||||
const CName = "common.metric"
|
||||
|
||||
var log = logger.NewNamed(CName)
|
||||
|
||||
func New() Metric {
|
||||
return new(metric)
|
||||
}
|
||||
|
||||
type Metric interface {
|
||||
Registry() *prometheus.Registry
|
||||
WrapDRPCHandler(h drpc.Handler) drpc.Handler
|
||||
app.ComponentRunnable
|
||||
}
|
||||
|
||||
type metric struct {
|
||||
registry *prometheus.Registry
|
||||
rpcLog logger.CtxLogger
|
||||
config Config
|
||||
}
|
||||
|
||||
func (m *metric) Init(a *app.App) (err error) {
|
||||
m.registry = prometheus.NewRegistry()
|
||||
m.config = a.MustComponent("config").(configSource).GetMetric()
|
||||
m.rpcLog = logger.NewNamed("rpcLog")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -61,6 +69,31 @@ func (m *metric) Registry() *prometheus.Registry {
|
|||
return m.registry
|
||||
}
|
||||
|
||||
func (m *metric) WrapDRPCHandler(h drpc.Handler) drpc.Handler {
|
||||
if m == nil {
|
||||
return h
|
||||
}
|
||||
histVec := prometheus.NewSummaryVec(prometheus.SummaryOpts{
|
||||
Namespace: "drpc",
|
||||
Subsystem: "server",
|
||||
Name: "duration_seconds",
|
||||
Objectives: map[float64]float64{
|
||||
0.5: 0.5,
|
||||
0.85: 0.01,
|
||||
0.95: 0.0005,
|
||||
0.99: 0.0001,
|
||||
},
|
||||
}, []string{"rpc"})
|
||||
if err := m.Registry().Register(histVec); err != nil {
|
||||
log.Warn("can't register prometheus drpc metric", zap.Error(err))
|
||||
return h
|
||||
}
|
||||
return &prometheusDRPC{
|
||||
Handler: h,
|
||||
SummaryVec: histVec,
|
||||
}
|
||||
}
|
||||
|
||||
func (m *metric) Close(ctx context.Context) (err error) {
|
||||
return
|
||||
}
|
||||
|
|
|
@ -8,7 +8,6 @@ import (
|
|||
anyNet "github.com/anytypeio/any-sync/net"
|
||||
"github.com/anytypeio/any-sync/net/secureservice"
|
||||
"github.com/libp2p/go-libp2p/core/sec"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"net"
|
||||
"storj.io/drpc"
|
||||
"time"
|
||||
|
@ -46,29 +45,12 @@ func (s *drpcServer) Name() (name string) {
|
|||
}
|
||||
|
||||
func (s *drpcServer) Run(ctx context.Context) (err error) {
|
||||
histVec := prometheus.NewSummaryVec(prometheus.SummaryOpts{
|
||||
Namespace: "drpc",
|
||||
Subsystem: "server",
|
||||
Name: "duration_seconds",
|
||||
Objectives: map[float64]float64{
|
||||
0.5: 0.5,
|
||||
0.85: 0.01,
|
||||
0.95: 0.0005,
|
||||
0.99: 0.0001,
|
||||
},
|
||||
}, []string{"rpc"})
|
||||
if err = s.metric.Registry().Register(histVec); err != nil {
|
||||
return
|
||||
}
|
||||
params := Params{
|
||||
BufferSizeMb: s.config.Stream.MaxMsgSizeMb,
|
||||
TimeoutMillis: s.config.Stream.TimeoutMilliseconds,
|
||||
ListenAddrs: s.config.Server.ListenAddrs,
|
||||
Wrapper: func(handler drpc.Handler) drpc.Handler {
|
||||
return &metric.PrometheusDRPC{
|
||||
Handler: handler,
|
||||
SummaryVec: histVec,
|
||||
}
|
||||
return s.metric.WrapDRPCHandler(handler)
|
||||
},
|
||||
Handshake: func(conn net.Conn) (cCtx context.Context, sc sec.SecureConn, err error) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue