mirror of
https://github.com/anyproto/any-sync.git
synced 2025-06-08 05:57:03 +09:00
132 lines
3 KiB
Go
132 lines
3 KiB
Go
package sync
|
|
|
|
import (
|
|
"sync"
|
|
|
|
"golang.org/x/exp/slices"
|
|
|
|
"github.com/anyproto/any-sync/app/debugstat"
|
|
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
|
|
)
|
|
|
|
type requestStat struct {
|
|
sync.Mutex
|
|
peerStats map[string]peerStat
|
|
spaceId string
|
|
}
|
|
|
|
func newRequestStat(spaceId string) *requestStat {
|
|
return &requestStat{
|
|
peerStats: make(map[string]peerStat),
|
|
spaceId: spaceId,
|
|
}
|
|
}
|
|
|
|
type spaceQueueStat struct {
|
|
SpaceId string `json:"space_id"`
|
|
TotalSize int64 `json:"total_size"`
|
|
PeerStats []peerStat `json:"peer_stats,omitempty"`
|
|
}
|
|
|
|
type summaryStat struct {
|
|
TotalSize int64 `json:"total_size"`
|
|
QueueStats []spaceQueueStat `json:"sorted_stats,omitempty"`
|
|
}
|
|
|
|
type peerStat struct {
|
|
QueueCount int `json:"queue_count"`
|
|
SyncCount int `json:"sync_count"`
|
|
QueueSize int64 `json:"queue_size"`
|
|
SyncSize int64 `json:"sync_size"`
|
|
PeerId string `json:"peer_id"`
|
|
}
|
|
|
|
func (r *requestStat) AddQueueRequest(peerId string, req *spacesyncproto.ObjectSyncMessage) {
|
|
r.Lock()
|
|
defer r.Unlock()
|
|
stat := r.peerStats[peerId]
|
|
stat.QueueCount++
|
|
stat.QueueSize += int64(req.Size())
|
|
r.peerStats[peerId] = stat
|
|
}
|
|
|
|
func (r *requestStat) AddSyncRequest(peerId string, req *spacesyncproto.ObjectSyncMessage) {
|
|
r.Lock()
|
|
defer r.Unlock()
|
|
stat := r.peerStats[peerId]
|
|
stat.SyncCount++
|
|
stat.SyncSize += int64(req.Size())
|
|
r.peerStats[peerId] = stat
|
|
}
|
|
|
|
func (r *requestStat) RemoveSyncRequest(peerId string, req *spacesyncproto.ObjectSyncMessage) {
|
|
r.Lock()
|
|
defer r.Unlock()
|
|
stat := r.peerStats[peerId]
|
|
stat.SyncCount--
|
|
stat.SyncSize -= int64(req.Size())
|
|
r.peerStats[peerId] = stat
|
|
}
|
|
|
|
func (r *requestStat) RemoveQueueRequest(peerId string, req *spacesyncproto.ObjectSyncMessage) {
|
|
r.Lock()
|
|
defer r.Unlock()
|
|
stat := r.peerStats[peerId]
|
|
stat.QueueCount--
|
|
stat.QueueSize -= int64(req.Size())
|
|
r.peerStats[peerId] = stat
|
|
}
|
|
|
|
func (r *requestStat) QueueStat() spaceQueueStat {
|
|
r.Lock()
|
|
defer r.Unlock()
|
|
var totalSize int64
|
|
var peerStats []peerStat
|
|
for peerId, stat := range r.peerStats {
|
|
totalSize += stat.QueueSize
|
|
stat.PeerId = peerId
|
|
peerStats = append(peerStats, stat)
|
|
}
|
|
slices.SortFunc(peerStats, func(first, second peerStat) int {
|
|
firstTotalSize := first.QueueSize + first.SyncSize
|
|
secondTotalSize := second.QueueSize + second.SyncSize
|
|
if firstTotalSize > secondTotalSize {
|
|
return -1
|
|
} else if firstTotalSize == secondTotalSize {
|
|
return 0
|
|
} else {
|
|
return 1
|
|
}
|
|
})
|
|
return spaceQueueStat{
|
|
SpaceId: r.spaceId,
|
|
TotalSize: totalSize,
|
|
PeerStats: peerStats,
|
|
}
|
|
}
|
|
|
|
func (r *requestStat) Aggregate(values []debugstat.StatValue) summaryStat {
|
|
var totalSize int64
|
|
var stats []spaceQueueStat
|
|
for _, v := range values {
|
|
stat, ok := v.Value.(spaceQueueStat)
|
|
if !ok {
|
|
continue
|
|
}
|
|
totalSize += stat.TotalSize
|
|
stats = append(stats, stat)
|
|
}
|
|
slices.SortFunc(stats, func(first, second spaceQueueStat) int {
|
|
if first.TotalSize > second.TotalSize {
|
|
return -1
|
|
} else if first.TotalSize == second.TotalSize {
|
|
return 0
|
|
} else {
|
|
return 1
|
|
}
|
|
})
|
|
return summaryStat{
|
|
TotalSize: totalSize,
|
|
QueueStats: stats,
|
|
}
|
|
}
|