mirror of
https://github.com/anyproto/any-sync.git
synced 2025-06-08 05:57:03 +09:00
52 lines
1.1 KiB
Go
52 lines
1.1 KiB
Go
package streampool
|
|
|
|
import (
|
|
"sync/atomic"
|
|
|
|
"go.uber.org/zap"
|
|
"storj.io/drpc"
|
|
)
|
|
|
|
// SizeableMessage is implemented by messages that can report their own size.
// The stream statistics in this file type-assert incoming messages against it
// and, when the assertion succeeds, account the reported size.
type SizeableMessage interface {
	// Size returns the size of the message. The unit is whatever the
	// implementation reports (presumably bytes — confirm with implementers).
	Size() int
}
|
|
|
|
type streamPoolStat struct {
|
|
TotalSize int64 `json:"total_size"`
|
|
Streams []streamStat `json:"streams,omitempty"`
|
|
}
|
|
|
|
// streamStat carries the statistics of a single stream.
//
// It has two groups of fields: exported ones, which are what encoding/json
// serializes, and unexported atomic counters, which AddMessage and
// RemoveMessage update. Because the struct embeds atomic values it must not
// be copied after first use; access it through a pointer.
type streamStat struct {
	// Exported, JSON-visible fields.
	// NOTE(review): MsgCount and TotalSize are presumably filled from the
	// atomic counters below when a report is built — that code is not in
	// this file; confirm against the producer.
	PeerId    string `json:"peer_id"`
	MsgCount  int    `json:"msg_count"`
	TotalSize int64  `json:"total_size"`

	// Live counters. They are unexported and therefore ignored by
	// encoding/json.
	msgCount  atomic.Int32
	totalSize atomic.Int64
}
|
|
|
|
func newStreamStat(peerId string) streamStat {
|
|
return streamStat{
|
|
PeerId: peerId,
|
|
}
|
|
}
|
|
|
|
func (st *streamStat) AddMessage(msg drpc.Message) {
|
|
if sizeable, ok := msg.(SizeableMessage); ok {
|
|
st.totalSize.Add(int64(sizeable.Size()))
|
|
}
|
|
st.msgCount.Add(1)
|
|
}
|
|
|
|
func (st *streamStat) RemoveMessage(msg drpc.Message) {
|
|
if sizeable, ok := msg.(SizeableMessage); ok {
|
|
size := sizeable.Size()
|
|
// TODO: find the real problem :-)
|
|
if st.totalSize.Load() >= int64(size) {
|
|
st.totalSize.Add(-int64(size))
|
|
} else {
|
|
log.Error("streamStat.RemoveMessage: totalSize is less than message size", zap.Int("size", size), zap.Int64("totalSize", st.totalSize.Load()))
|
|
st.totalSize.Store(0)
|
|
}
|
|
}
|
|
st.msgCount.Add(-1)
|
|
}
|