1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-08 05:57:03 +09:00

WIP request queue debug stat

This commit is contained in:
mcrakhman 2023-11-20 19:39:18 +01:00
parent 3992935ac2
commit bdfab4bc2a
No known key found for this signature in database
GPG key ID: DED12CFEF5B8396B
5 changed files with 248 additions and 10 deletions

15
app/debugstat/models.go Normal file
View file

@ -0,0 +1,15 @@
package debugstat
type statValue struct {
Key string `json:"key"`
Value any `json:"value"`
}
type statType struct {
Type string `json:"type"`
Values []statValue `json:"values"`
}
type statSummary struct {
Stats []statType `json:"stat_types"`
}

103
app/debugstat/stat.go Normal file
View file

@ -0,0 +1,103 @@
package debugstat
import (
"context"
"sync"
"time"
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/app/logger"
"github.com/anyproto/any-sync/util/periodicsync"
)
var log = logger.NewNamed(CName)
const CName = "common.debugstat"
const (
statCollectionSecs = 200
statTimeout = time.Second * 10
)
type StatProvider interface {
ProvideStat() any
StatId() string
StatType() string
}
type StatService interface {
app.ComponentRunnable
AddProvider(provider StatProvider)
RemoveProvider(provider StatProvider)
}
type statService struct {
providers map[string]StatProvider
updater periodicsync.PeriodicSync
curStat statSummary
sync.Mutex
}
func (s *statService) AddProvider(provider StatProvider) {
s.Lock()
defer s.Unlock()
s.providers[provider.StatId()] = provider
}
func (s *statService) RemoveProvider(provider StatProvider) {
s.Lock()
defer s.Unlock()
delete(s.providers, provider.StatId())
}
func (s *statService) Init(a *app.App) (err error) {
s.providers = map[string]StatProvider{}
s.updater = periodicsync.NewPeriodicSync(statCollectionSecs, statTimeout, s.loop, log)
return nil
}
func (s *statService) Name() (name string) {
return CName
}
func (s *statService) loop(ctx context.Context) (err error) {
s.Lock()
allProviders := map[string][]StatProvider{}
for _, prov := range s.providers {
tp := prov.StatType()
provs := allProviders[tp]
provs = append(provs, prov)
allProviders[tp] = provs
}
s.Unlock()
st := statSummary{}
for tp, provs := range allProviders {
stType := statType{
Type: tp,
Values: nil,
}
for _, prov := range provs {
stat := prov.ProvideStat()
stType.Values = append(stType.Values, statValue{
Key: prov.StatId(),
Value: stat,
})
}
st.Stats = append(st.Stats, stType)
}
s.Lock()
defer s.Unlock()
s.curStat = st
return nil
}
func (s *statService) Run(ctx context.Context) (err error) {
s.updater.Run()
return nil
}
func (s *statService) Close(ctx context.Context) (err error) {
s.updater.Close()
return nil
}

View file

@ -4,16 +4,19 @@ import (
"context" "context"
"sync" "sync"
"go.uber.org/zap"
"storj.io/drpc"
"github.com/anyproto/any-sync/app" "github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/app/debugstat"
"github.com/anyproto/any-sync/app/logger" "github.com/anyproto/any-sync/app/logger"
"github.com/anyproto/any-sync/commonspace/objectsync" "github.com/anyproto/any-sync/commonspace/objectsync"
"github.com/anyproto/any-sync/commonspace/spacestate"
"github.com/anyproto/any-sync/commonspace/spacesyncproto" "github.com/anyproto/any-sync/commonspace/spacesyncproto"
"github.com/anyproto/any-sync/net/peer" "github.com/anyproto/any-sync/net/peer"
"github.com/anyproto/any-sync/net/pool" "github.com/anyproto/any-sync/net/pool"
"github.com/anyproto/any-sync/net/rpc/rpcerr" "github.com/anyproto/any-sync/net/rpc/rpcerr"
"github.com/anyproto/any-sync/net/streampool" "github.com/anyproto/any-sync/net/streampool"
"go.uber.org/zap"
"storj.io/drpc"
) )
const CName = "common.commonspace.requestmanager" const CName = "common.commonspace.requestmanager"
@ -48,10 +51,29 @@ type requestManager struct {
ctx context.Context ctx context.Context
cancel context.CancelFunc cancel context.CancelFunc
clientFactory spacesyncproto.ClientFactory clientFactory spacesyncproto.ClientFactory
statService debugstat.StatService
reqStat *requestStat
spaceId string
}
func (r *requestManager) ProvideStat() any {
return r.reqStat.QueueStat()
}
func (r *requestManager) StatId() string {
return r.spaceId
}
func (r *requestManager) StatType() string {
return CName
} }
func (r *requestManager) Init(a *app.App) (err error) { func (r *requestManager) Init(a *app.App) (err error) {
r.ctx, r.cancel = context.WithCancel(context.Background()) r.ctx, r.cancel = context.WithCancel(context.Background())
spaceState := a.MustComponent(spacestate.CName).(*spacestate.SpaceState)
r.statService = a.MustComponent(debugstat.CName).(debugstat.StatService)
r.reqStat = newRequestStat()
r.spaceId = spaceState.SpaceId
r.handler = a.MustComponent(objectsync.CName).(MessageHandler) r.handler = a.MustComponent(objectsync.CName).(MessageHandler)
r.peerPool = a.MustComponent(pool.CName).(pool.Pool) r.peerPool = a.MustComponent(pool.CName).(pool.Pool)
r.clientFactory = spacesyncproto.ClientFactoryFunc(spacesyncproto.NewDRPCSpaceSyncClient) r.clientFactory = spacesyncproto.ClientFactoryFunc(spacesyncproto.NewDRPCSpaceSyncClient)
@ -63,6 +85,7 @@ func (r *requestManager) Name() (name string) {
} }
func (r *requestManager) Run(ctx context.Context) (err error) { func (r *requestManager) Run(ctx context.Context) (err error) {
r.statService.AddProvider(r)
return nil return nil
} }
@ -70,6 +93,7 @@ func (r *requestManager) Close(ctx context.Context) (err error) {
r.Lock() r.Lock()
defer r.Unlock() defer r.Unlock()
r.cancel() r.cancel()
r.statService.RemoveProvider(r)
for _, p := range r.pools { for _, p := range r.pools {
_ = p.Close() _ = p.Close()
} }
@ -78,6 +102,11 @@ func (r *requestManager) Close(ctx context.Context) (err error) {
func (r *requestManager) SendRequest(ctx context.Context, peerId string, req *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) { func (r *requestManager) SendRequest(ctx context.Context, peerId string, req *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) {
// TODO: limit concurrent sends? // TODO: limit concurrent sends?
size := r.reqStat.CalcSize(req)
r.reqStat.AddSyncRequest(peerId, size)
defer func() {
r.reqStat.RemoveSyncRequest(peerId, size)
}()
return r.doRequest(ctx, peerId, req) return r.doRequest(ctx, peerId, req)
} }
@ -90,10 +119,13 @@ func (r *requestManager) QueueRequest(peerId string, req *spacesyncproto.ObjectS
r.pools[peerId] = pl r.pools[peerId] = pl
pl.Run() pl.Run()
} }
size := r.reqStat.CalcSize(req)
r.reqStat.AddQueueRequest(peerId, size)
// TODO: for later think when many clients are there, // TODO: for later think when many clients are there,
// we need to close pools for inactive clients // we need to close pools for inactive clients
return pl.TryAdd(func() { return pl.TryAdd(func() {
doRequestAndHandle(r, peerId, req) doRequestAndHandle(r, peerId, req)
r.reqStat.RemoveQueueRequest(peerId, size)
}) })
} }

View file

@ -5,6 +5,11 @@ import (
"sync" "sync"
"testing" "testing"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"storj.io/drpc"
"storj.io/drpc/drpcconn"
"github.com/anyproto/any-sync/commonspace/objectsync" "github.com/anyproto/any-sync/commonspace/objectsync"
"github.com/anyproto/any-sync/commonspace/objectsync/mock_objectsync" "github.com/anyproto/any-sync/commonspace/objectsync/mock_objectsync"
"github.com/anyproto/any-sync/commonspace/spacesyncproto" "github.com/anyproto/any-sync/commonspace/spacesyncproto"
@ -12,10 +17,6 @@ import (
"github.com/anyproto/any-sync/net/peer" "github.com/anyproto/any-sync/net/peer"
"github.com/anyproto/any-sync/net/peer/mock_peer" "github.com/anyproto/any-sync/net/peer/mock_peer"
"github.com/anyproto/any-sync/net/pool/mock_pool" "github.com/anyproto/any-sync/net/pool/mock_pool"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"storj.io/drpc"
"storj.io/drpc/drpcconn"
) )
type fixture struct { type fixture struct {
@ -58,7 +59,7 @@ func TestRequestManager_SyncRequest(t *testing.T) {
fx := newFixture(t) fx := newFixture(t)
defer fx.stop() defer fx.stop()
peerId := "peerId" peerId := "PeerId"
peerMock := mock_peer.NewMockPeer(fx.ctrl) peerMock := mock_peer.NewMockPeer(fx.ctrl)
conn := &drpcconn.Conn{} conn := &drpcconn.Conn{}
msg := &spacesyncproto.ObjectSyncMessage{} msg := &spacesyncproto.ObjectSyncMessage{}
@ -78,7 +79,7 @@ func TestRequestManager_SyncRequest(t *testing.T) {
defer fx.stop() defer fx.stop()
ctx = fx.requestManager.ctx ctx = fx.requestManager.ctx
peerId := "peerId" peerId := "PeerId"
peerMock := mock_peer.NewMockPeer(fx.ctrl) peerMock := mock_peer.NewMockPeer(fx.ctrl)
conn := &drpcconn.Conn{} conn := &drpcconn.Conn{}
msg := &spacesyncproto.ObjectSyncMessage{} msg := &spacesyncproto.ObjectSyncMessage{}
@ -119,7 +120,7 @@ func TestRequestManager_QueueRequest(t *testing.T) {
otherMsg1 := &spacesyncproto.ObjectSyncMessage{ObjectId: "otherId1"} otherMsg1 := &spacesyncproto.ObjectSyncMessage{ObjectId: "otherId1"}
// sending requests to first peer // sending requests to first peer
peerId := "peerId" peerId := "PeerId"
err := fx.requestManager.QueueRequest(peerId, msg1) err := fx.requestManager.QueueRequest(peerId, msg1)
require.NoError(t, err) require.NoError(t, err)
err = fx.requestManager.QueueRequest(peerId, msg2) err = fx.requestManager.QueueRequest(peerId, msg2)
@ -165,7 +166,7 @@ func TestRequestManager_QueueRequest(t *testing.T) {
msg2 := &spacesyncproto.ObjectSyncMessage{ObjectId: "id2"} msg2 := &spacesyncproto.ObjectSyncMessage{ObjectId: "id2"}
// sending requests to first peer // sending requests to first peer
peerId := "peerId" peerId := "PeerId"
err := fx.requestManager.QueueRequest(peerId, msg1) err := fx.requestManager.QueueRequest(peerId, msg1)
require.NoError(t, err) require.NoError(t, err)
err = fx.requestManager.QueueRequest(peerId, msg2) err = fx.requestManager.QueueRequest(peerId, msg2)

View file

@ -0,0 +1,87 @@
package requestmanager
import (
"sync"
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
)
type requestStat struct {
sync.Mutex
peerStats map[string]peerStat
}
func newRequestStat() *requestStat {
return &requestStat{
peerStats: make(map[string]peerStat),
}
}
type spaceQueueStat struct {
TotalSize int `json:"total_size"`
PeerStats []peerStat `json:"peer_stats"`
}
type peerStat struct {
QueueCount int `json:"queue_count"`
QueueSize int `json:"queue_size"`
SyncSize int `json:"sync_size"`
SyncCount int `json:"sync_count"`
PeerId string `json:"peer_id"`
}
func (r *requestStat) AddQueueRequest(peerId string, size int) {
r.Lock()
defer r.Unlock()
stat := r.peerStats[peerId]
stat.QueueCount++
stat.QueueSize += size
r.peerStats[peerId] = stat
}
func (r *requestStat) AddSyncRequest(peerId string, size int) {
r.Lock()
defer r.Unlock()
stat := r.peerStats[peerId]
stat.SyncCount++
stat.SyncSize += size
r.peerStats[peerId] = stat
}
func (r *requestStat) RemoveSyncRequest(peerId string, size int) {
r.Lock()
defer r.Unlock()
stat := r.peerStats[peerId]
stat.SyncCount--
stat.SyncSize -= size
r.peerStats[peerId] = stat
}
func (r *requestStat) RemoveQueueRequest(peerId string, size int) {
r.Lock()
defer r.Unlock()
stat := r.peerStats[peerId]
stat.QueueCount--
stat.QueueSize -= size
r.peerStats[peerId] = stat
}
func (r *requestStat) CalcSize(msg *spacesyncproto.ObjectSyncMessage) int {
return len(msg.Payload)
}
func (r *requestStat) QueueStat() spaceQueueStat {
r.Lock()
defer r.Unlock()
var totalSize int
var peerStats []peerStat
for peerId, stat := range r.peerStats {
totalSize += stat.QueueSize
stat.PeerId = peerId
peerStats = append(peerStats, stat)
}
return spaceQueueStat{
TotalSize: totalSize,
PeerStats: peerStats,
}
}