mirror of
https://github.com/anyproto/any-sync.git
synced 2025-06-08 05:57:03 +09:00
150 lines
3.4 KiB
Go
150 lines
3.4 KiB
Go
package metric
|
|
|
|
import (
|
|
"context"
|
|
"net/http"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
"github.com/prometheus/client_golang/prometheus/collectors"
|
|
"github.com/prometheus/client_golang/prometheus/promhttp"
|
|
"go.uber.org/zap"
|
|
"storj.io/drpc"
|
|
|
|
"github.com/anyproto/any-sync/app"
|
|
"github.com/anyproto/any-sync/app/logger"
|
|
)
|
|
|
|
const CName = "common.metric"
|
|
|
|
var log = logger.NewNamed(CName)
|
|
|
|
func New() Metric {
|
|
return &metric{
|
|
lastCachedTimeout: time.Second * 10,
|
|
}
|
|
}
|
|
|
|
type Metric interface {
|
|
Registry() *prometheus.Registry
|
|
WrapDRPCHandler(h drpc.Handler) drpc.Handler
|
|
RequestLog(ctx context.Context, rpc string, fields ...zap.Field)
|
|
RegisterSyncMetric(spaceId string, syncMetric SyncMetric)
|
|
UnregisterSyncMetric(spaceId string)
|
|
RegisterStreamPoolSyncMetric(mtr StreamPoolMetric)
|
|
UnregisterStreamPoolSyncMetric()
|
|
app.ComponentRunnable
|
|
}
|
|
|
|
type metric struct {
|
|
registry *prometheus.Registry
|
|
rpcLog logger.CtxLogger
|
|
config Config
|
|
a *app.App
|
|
appField zap.Field
|
|
mx sync.Mutex
|
|
syncMetrics map[string]SyncMetric
|
|
streamPoolMetric StreamPoolMetric
|
|
lastCachedState SyncMetricState
|
|
lastCachedDate time.Time
|
|
lastCachedTimeout time.Duration
|
|
}
|
|
|
|
func (m *metric) RegisterStreamPoolSyncMetric(mtr StreamPoolMetric) {
|
|
m.mx.Lock()
|
|
defer m.mx.Unlock()
|
|
m.streamPoolMetric = mtr
|
|
}
|
|
|
|
func (m *metric) UnregisterStreamPoolSyncMetric() {
|
|
m.mx.Lock()
|
|
defer m.mx.Unlock()
|
|
m.streamPoolMetric = nil
|
|
}
|
|
|
|
func (m *metric) RegisterSyncMetric(spaceId string, syncMetric SyncMetric) {
|
|
m.mx.Lock()
|
|
defer m.mx.Unlock()
|
|
m.syncMetrics[spaceId] = syncMetric
|
|
}
|
|
|
|
func (m *metric) UnregisterSyncMetric(spaceId string) {
|
|
m.mx.Lock()
|
|
defer m.mx.Unlock()
|
|
delete(m.syncMetrics, spaceId)
|
|
}
|
|
|
|
func (m *metric) Init(a *app.App) (err error) {
|
|
m.a = a
|
|
m.syncMetrics = make(map[string]SyncMetric)
|
|
m.registry = prometheus.NewRegistry()
|
|
m.config = a.MustComponent("config").(configSource).GetMetric()
|
|
m.rpcLog = logger.NewNamed("rpcLog")
|
|
m.appField = App(a.Version())
|
|
return nil
|
|
}
|
|
|
|
func (m *metric) Name() string {
|
|
return CName
|
|
}
|
|
|
|
func (m *metric) Run(ctx context.Context) (err error) {
|
|
if err = m.registry.Register(collectors.NewBuildInfoCollector()); err != nil {
|
|
return err
|
|
}
|
|
if err = m.registry.Register(collectors.NewGoCollector()); err != nil {
|
|
return err
|
|
}
|
|
if err = m.registry.Register(newVersionsCollector(m.a)); err != nil {
|
|
return
|
|
}
|
|
if err = m.registerSyncMetrics(); err != nil {
|
|
return
|
|
}
|
|
if m.config.Addr != "" {
|
|
var errCh = make(chan error)
|
|
http.Handle("/metrics", promhttp.HandlerFor(m.registry, promhttp.HandlerOpts{}))
|
|
go func() {
|
|
errCh <- http.ListenAndServe(m.config.Addr, nil)
|
|
}()
|
|
select {
|
|
case err = <-errCh:
|
|
case <-time.After(time.Second / 5):
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
func (m *metric) Registry() *prometheus.Registry {
|
|
return m.registry
|
|
}
|
|
|
|
func (m *metric) WrapDRPCHandler(h drpc.Handler) drpc.Handler {
|
|
if m == nil {
|
|
return h
|
|
}
|
|
histVec := prometheus.NewSummaryVec(prometheus.SummaryOpts{
|
|
Namespace: "drpc",
|
|
Subsystem: "server",
|
|
Name: "duration_seconds",
|
|
Objectives: map[float64]float64{
|
|
0.5: 0.5,
|
|
0.85: 0.01,
|
|
0.95: 0.0005,
|
|
0.99: 0.0001,
|
|
},
|
|
}, []string{"rpc"})
|
|
if err := m.Registry().Register(histVec); err != nil {
|
|
log.Warn("can't register prometheus drpc metric", zap.Error(err))
|
|
return h
|
|
}
|
|
return &prometheusDRPC{
|
|
Handler: h,
|
|
SummaryVec: histVec,
|
|
}
|
|
}
|
|
|
|
func (m *metric) Close(ctx context.Context) (err error) {
|
|
return
|
|
}
|