diff --git a/commonspace/sync/objectsync/objectmessages/headupdate.go b/commonspace/sync/objectsync/objectmessages/headupdate.go index 60b014ee..92d07092 100644 --- a/commonspace/sync/objectsync/objectmessages/headupdate.go +++ b/commonspace/sync/objectsync/objectmessages/headupdate.go @@ -62,6 +62,10 @@ func (h *HeadUpdate) MsgSize() uint64 { return byteSize + uint64(len(h.Meta.PeerId)) + uint64(len(h.Meta.ObjectId)) + uint64(len(h.Meta.SpaceId)) } +func (h *HeadUpdate) Size() int { + return int(h.MsgSize()) +} + func (h *HeadUpdate) SetPeerId(peerId string) { h.Meta.PeerId = peerId } diff --git a/commonspace/sync/sync.go b/commonspace/sync/sync.go index bda37c1c..a111e71f 100644 --- a/commonspace/sync/sync.go +++ b/commonspace/sync/sync.go @@ -93,7 +93,7 @@ func (s *syncService) Run(ctx context.Context) (err error) { func (s *syncService) Close(ctx context.Context) (err error) { err = s.receiveQueue.Close() if s.commonMetric != nil { - s.commonMetric.UnregisterSyncMetric(s.spaceId, s.metric) + s.commonMetric.UnregisterSyncMetric(s.spaceId) } s.manager.Close() return diff --git a/metric/metric.go b/metric/metric.go index d13647b4..cc798049 100644 --- a/metric/metric.go +++ b/metric/metric.go @@ -31,7 +31,9 @@ type Metric interface { WrapDRPCHandler(h drpc.Handler) drpc.Handler RequestLog(ctx context.Context, rpc string, fields ...zap.Field) RegisterSyncMetric(spaceId string, syncMetric SyncMetric) - UnregisterSyncMetric(spaceId string, syncMetric SyncMetric) + UnregisterSyncMetric(spaceId string) + RegisterStreamPoolSyncMetric(mtr StreamPoolMetric) + UnregisterStreamPoolSyncMetric() app.ComponentRunnable } @@ -43,18 +45,31 @@ type metric struct { appField zap.Field mx sync.Mutex syncMetrics map[string]SyncMetric + streamPoolMetric StreamPoolMetric lastCachedState SyncMetricState lastCachedDate time.Time lastCachedTimeout time.Duration } +func (m *metric) RegisterStreamPoolSyncMetric(mtr StreamPoolMetric) { + m.mx.Lock() + defer m.mx.Unlock() + m.streamPoolMetric = mtr +} + +func (m *metric) UnregisterStreamPoolSyncMetric() { + m.mx.Lock() + defer m.mx.Unlock() + m.streamPoolMetric = nil +} + func (m *metric) RegisterSyncMetric(spaceId string, syncMetric SyncMetric) { m.mx.Lock() defer m.mx.Unlock() m.syncMetrics[spaceId] = syncMetric } -func (m *metric) UnregisterSyncMetric(spaceId string, syncMetric SyncMetric) { +func (m *metric) UnregisterSyncMetric(spaceId string) { m.mx.Lock() defer m.mx.Unlock() delete(m.syncMetrics, spaceId) diff --git a/metric/syncmetric.go b/metric/syncmetric.go index 69d25283..d4b5e2ed 100644 --- a/metric/syncmetric.go +++ b/metric/syncmetric.go @@ -10,6 +10,10 @@ type SyncMetric interface { SyncMetricState() SyncMetricState } +type StreamPoolMetric interface { + OutgoingMsg() (count uint32, size uint64) +} + type SyncMetricState struct { IncomingMsgCount uint32 IncomingMsgSize uint64 @@ -28,13 +32,11 @@ type SyncMetricState struct { func (st *SyncMetricState) Append(other SyncMetricState) { st.IncomingMsgSize += other.IncomingMsgSize - st.OutgoingMsgSize += other.OutgoingMsgSize st.IncomingReqSize += other.IncomingReqSize st.OutgoingReqSize += other.OutgoingReqSize st.ReceivedRespSize += other.ReceivedRespSize st.SentRespSize += other.SentRespSize st.IncomingMsgCount += other.IncomingMsgCount - st.OutgoingMsgCount += other.OutgoingMsgCount st.IncomingReqCount += other.IncomingReqCount st.OutgoingReqCount += other.OutgoingReqCount st.ReceivedRespCount += other.ReceivedRespCount @@ -58,6 +60,9 @@ func (m *metric) getLastCached() SyncMetricState { for _, mtr := range allMetrics { lastCached.Append(mtr.SyncMetricState()) } + if m.streamPoolMetric != nil { + lastCached.OutgoingMsgCount, lastCached.OutgoingMsgSize = m.streamPoolMetric.OutgoingMsg() + } m.mx.Lock() defer m.mx.Unlock() m.lastCachedState = lastCached diff --git a/net/streampool/streampool.go b/net/streampool/streampool.go index 120093a3..8b0515ec 100644 --- a/net/streampool/streampool.go +++ b/net/streampool/streampool.go @@ -81,6 +81,16 @@ type streamPool struct { lastStreamId uint32 } +func (s *streamPool) OutgoingMsg() (count uint32, size uint64) { + s.mu.Lock() + defer s.mu.Unlock() + for _, st := range s.streams { + count += uint32(st.stats.msgCount.Load()) + size += uint64(st.stats.totalSize.Load()) + } + return +} + func (s *streamPool) Init(a *app.App) (err error) { s.metric, _ = a.Component(metric.CName).(metric.Metric) s.handler = a.MustComponent(streamhandler.CName).(streamhandler.StreamHandler) @@ -92,6 +102,7 @@ func (s *streamPool) Init(a *app.App) (err error) { s.streamConfig = a.MustComponent("config").(configGetter).GetStreamConfig() s.statService.AddProvider(s) if s.metric != nil { + s.metric.RegisterStreamPoolSyncMetric(s) registerMetrics(s.metric.Registry(), s, "") } return nil @@ -440,6 +451,7 @@ func (s *streamPool) removeStream(streamId uint32) { func (s *streamPool) Close(ctx context.Context) (err error) { s.statService.RemoveProvider(s) + s.metric.UnregisterStreamPoolSyncMetric() return s.dial.Close() }