1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-07 21:47:02 +09:00
any-sync/metric/metric.go
2024-08-15 08:28:51 +02:00

150 lines
3.4 KiB
Go

package metric
import (
"context"
"net/http"
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.uber.org/zap"
"storj.io/drpc"
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/app/logger"
)
const CName = "common.metric"
var log = logger.NewNamed(CName)
func New() Metric {
return &metric{
lastCachedTimeout: time.Second * 10,
}
}
type Metric interface {
Registry() *prometheus.Registry
WrapDRPCHandler(h drpc.Handler) drpc.Handler
RequestLog(ctx context.Context, rpc string, fields ...zap.Field)
RegisterSyncMetric(spaceId string, syncMetric SyncMetric)
UnregisterSyncMetric(spaceId string)
RegisterStreamPoolSyncMetric(mtr StreamPoolMetric)
UnregisterStreamPoolSyncMetric()
app.ComponentRunnable
}
type metric struct {
registry *prometheus.Registry
rpcLog logger.CtxLogger
config Config
a *app.App
appField zap.Field
mx sync.Mutex
syncMetrics map[string]SyncMetric
streamPoolMetric StreamPoolMetric
lastCachedState SyncMetricState
lastCachedDate time.Time
lastCachedTimeout time.Duration
}
func (m *metric) RegisterStreamPoolSyncMetric(mtr StreamPoolMetric) {
m.mx.Lock()
defer m.mx.Unlock()
m.streamPoolMetric = mtr
}
func (m *metric) UnregisterStreamPoolSyncMetric() {
m.mx.Lock()
defer m.mx.Unlock()
m.streamPoolMetric = nil
}
func (m *metric) RegisterSyncMetric(spaceId string, syncMetric SyncMetric) {
m.mx.Lock()
defer m.mx.Unlock()
m.syncMetrics[spaceId] = syncMetric
}
func (m *metric) UnregisterSyncMetric(spaceId string) {
m.mx.Lock()
defer m.mx.Unlock()
delete(m.syncMetrics, spaceId)
}
func (m *metric) Init(a *app.App) (err error) {
m.a = a
m.syncMetrics = make(map[string]SyncMetric)
m.registry = prometheus.NewRegistry()
m.config = a.MustComponent("config").(configSource).GetMetric()
m.rpcLog = logger.NewNamed("rpcLog")
m.appField = App(a.Version())
return nil
}
func (m *metric) Name() string {
return CName
}
func (m *metric) Run(ctx context.Context) (err error) {
if err = m.registry.Register(collectors.NewBuildInfoCollector()); err != nil {
return err
}
if err = m.registry.Register(collectors.NewGoCollector()); err != nil {
return err
}
if err = m.registry.Register(newVersionsCollector(m.a)); err != nil {
return
}
if err = m.registerSyncMetrics(); err != nil {
return
}
if m.config.Addr != "" {
var errCh = make(chan error)
http.Handle("/metrics", promhttp.HandlerFor(m.registry, promhttp.HandlerOpts{}))
go func() {
errCh <- http.ListenAndServe(m.config.Addr, nil)
}()
select {
case err = <-errCh:
case <-time.After(time.Second / 5):
}
}
return
}
func (m *metric) Registry() *prometheus.Registry {
return m.registry
}
func (m *metric) WrapDRPCHandler(h drpc.Handler) drpc.Handler {
if m == nil {
return h
}
histVec := prometheus.NewSummaryVec(prometheus.SummaryOpts{
Namespace: "drpc",
Subsystem: "server",
Name: "duration_seconds",
Objectives: map[float64]float64{
0.5: 0.5,
0.85: 0.01,
0.95: 0.0005,
0.99: 0.0001,
},
}, []string{"rpc"})
if err := m.Registry().Register(histVec); err != nil {
log.Warn("can't register prometheus drpc metric", zap.Error(err))
return h
}
return &prometheusDRPC{
Handler: h,
SummaryVec: histVec,
}
}
func (m *metric) Close(ctx context.Context) (err error) {
return
}