mirror of
https://github.com/anyproto/any-sync.git
synced 2025-06-08 05:57:03 +09:00
GO-2828: add peer stat
Signed-off-by: AnastasiaShemyakinskaya <shem98a@mail.ru>
This commit is contained in:
parent
7896963d6d
commit
cb2af70690
4 changed files with 197 additions and 18 deletions
|
@ -54,6 +54,16 @@ func NewPeer(mc transport.MultiConn, ctrl connCtrl) (p Peer, err error) {
|
|||
return pr, nil
|
||||
}
|
||||
|
||||
type Stat struct {
|
||||
PeerId string `json:"peerId"`
|
||||
SubConnections []*subConn `json:"subConnections"`
|
||||
Created time.Time `json:"created"`
|
||||
}
|
||||
|
||||
type StatProvider interface {
|
||||
ProvideStat() *Stat
|
||||
}
|
||||
|
||||
type Peer interface {
|
||||
Id() string
|
||||
Context() context.Context
|
||||
|
@ -320,3 +330,15 @@ func (p *peer) Close() (err error) {
|
|||
log.Debug("peer close", zap.String("peerId", p.id))
|
||||
return p.MultiConn.Close()
|
||||
}
|
||||
|
||||
func (p *peer) ProvideStat() *Stat {
|
||||
connections := make([]*subConn, 0, len(p.active))
|
||||
for c := range p.active {
|
||||
connections = append(connections, c)
|
||||
}
|
||||
return &Stat{
|
||||
PeerId: p.id,
|
||||
SubConnections: connections,
|
||||
Created: p.created,
|
||||
}
|
||||
}
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/anyproto/any-sync/app/debugstat"
|
||||
"github.com/anyproto/any-sync/app/ocache"
|
||||
"github.com/anyproto/any-sync/net"
|
||||
"github.com/anyproto/any-sync/net/peer"
|
||||
|
@ -26,9 +27,14 @@ type Pool interface {
|
|||
Pick(ctx context.Context, id string) (pr peer.Peer, err error)
|
||||
}
|
||||
|
||||
type poolStats struct {
|
||||
PeerStats []*peer.Stat `json:"peerStats"`
|
||||
}
|
||||
|
||||
type pool struct {
|
||||
outgoing ocache.OCache
|
||||
incoming ocache.OCache
|
||||
outgoing ocache.OCache
|
||||
incoming ocache.OCache
|
||||
statService debugstat.StatService
|
||||
}
|
||||
|
||||
func (p *pool) Name() (name string) {
|
||||
|
@ -135,3 +141,32 @@ func (p *pool) pick(ctx context.Context, source ocache.OCache, id string) (peer.
|
|||
}
|
||||
return nil, fmt.Errorf("failed to pick connection with peer: peer not found")
|
||||
}
|
||||
|
||||
func (p *pool) AddStatProvider() {
|
||||
p.statService.AddProvider(p)
|
||||
}
|
||||
|
||||
func (p *pool) ProvideStat() any {
|
||||
peerStats := make([]*peer.Stat, 0)
|
||||
p.outgoing.ForEach(func(v ocache.Object) (isContinue bool) {
|
||||
if p, ok := v.(peer.StatProvider); ok {
|
||||
peerStats = append(peerStats, p.ProvideStat())
|
||||
}
|
||||
return true
|
||||
})
|
||||
p.incoming.ForEach(func(v ocache.Object) (isContinue bool) {
|
||||
if p, ok := v.(peer.StatProvider); ok {
|
||||
peerStats = append(peerStats, p.ProvideStat())
|
||||
}
|
||||
return true
|
||||
})
|
||||
return &poolStats{PeerStats: peerStats}
|
||||
}
|
||||
|
||||
func (p *pool) StatId() string {
|
||||
return CName
|
||||
}
|
||||
|
||||
func (p *pool) StatType() string {
|
||||
return CName
|
||||
}
|
||||
|
|
|
@ -36,7 +36,7 @@ func TestPool_Get(t *testing.T) {
|
|||
fx := newFixture(t)
|
||||
defer fx.Finish()
|
||||
fx.Dialer.dial = func(ctx context.Context, peerId string) (peer peer.Peer, err error) {
|
||||
return newTestPeer("1"), nil
|
||||
return newTestPeer("1", time.Now()), nil
|
||||
}
|
||||
p, err := fx.Get(ctx, "1")
|
||||
assert.NoError(t, err)
|
||||
|
@ -49,7 +49,7 @@ func TestPool_Get(t *testing.T) {
|
|||
t.Run("retry for closed", func(t *testing.T) {
|
||||
fx := newFixture(t)
|
||||
defer fx.Finish()
|
||||
tp := newTestPeer("1")
|
||||
tp := newTestPeer("1", time.Now())
|
||||
fx.Dialer.dial = func(ctx context.Context, peerId string) (peer peer.Peer, err error) {
|
||||
return tp, nil
|
||||
}
|
||||
|
@ -57,7 +57,7 @@ func TestPool_Get(t *testing.T) {
|
|||
assert.NoError(t, err)
|
||||
assert.NotNil(t, p)
|
||||
p.Close()
|
||||
tp2 := newTestPeer("1")
|
||||
tp2 := newTestPeer("1", time.Now())
|
||||
fx.Dialer.dial = func(ctx context.Context, peerId string) (peer peer.Peer, err error) {
|
||||
return tp2, nil
|
||||
}
|
||||
|
@ -80,7 +80,7 @@ func TestPool_GetOneOf(t *testing.T) {
|
|||
t.Run("from cache", func(t *testing.T) {
|
||||
fx := newFixture(t)
|
||||
defer fx.Finish()
|
||||
tp1 := newTestPeer("1")
|
||||
tp1 := newTestPeer("1", time.Now())
|
||||
addToCache(t, fx, tp1)
|
||||
p, err := fx.GetOneOf(ctx, []string{"3", "2", "1"})
|
||||
require.NoError(t, err)
|
||||
|
@ -89,10 +89,10 @@ func TestPool_GetOneOf(t *testing.T) {
|
|||
t.Run("from cache - skip closed", func(t *testing.T) {
|
||||
fx := newFixture(t)
|
||||
defer fx.Finish()
|
||||
tp2 := newTestPeer("2")
|
||||
tp2 := newTestPeer("2", time.Now())
|
||||
addToCache(t, fx, tp2)
|
||||
tp2.Close()
|
||||
tp1 := newTestPeer("1")
|
||||
tp1 := newTestPeer("1", time.Now())
|
||||
addToCache(t, fx, tp1)
|
||||
p, err := fx.GetOneOf(ctx, []string{"3", "2", "1"})
|
||||
require.NoError(t, err)
|
||||
|
@ -107,7 +107,7 @@ func TestPool_GetOneOf(t *testing.T) {
|
|||
return nil, fmt.Errorf("not expected call")
|
||||
}
|
||||
called = true
|
||||
return newTestPeer(peerId), nil
|
||||
return newTestPeer(peerId, time.Now()), nil
|
||||
}
|
||||
p, err := fx.GetOneOf(ctx, []string{"3", "2", "1"})
|
||||
require.NoError(t, err)
|
||||
|
@ -139,12 +139,12 @@ func TestPool_AddPeer(t *testing.T) {
|
|||
t.Run("success", func(t *testing.T) {
|
||||
fx := newFixture(t)
|
||||
defer fx.Finish()
|
||||
require.NoError(t, fx.AddPeer(ctx, newTestPeer("p1")))
|
||||
require.NoError(t, fx.AddPeer(ctx, newTestPeer("p1", time.Now())))
|
||||
})
|
||||
t.Run("two peers", func(t *testing.T) {
|
||||
fx := newFixture(t)
|
||||
defer fx.Finish()
|
||||
p1, p2 := newTestPeer("p1"), newTestPeer("p1")
|
||||
p1, p2 := newTestPeer("p1", time.Now()), newTestPeer("p1", time.Now())
|
||||
require.NoError(t, fx.AddPeer(ctx, p1))
|
||||
require.NoError(t, fx.AddPeer(ctx, p2))
|
||||
select {
|
||||
|
@ -167,7 +167,7 @@ func TestPool_Pick(t *testing.T) {
|
|||
t.Run("success", func(t *testing.T) {
|
||||
fx := newFixture(t)
|
||||
defer fx.Finish()
|
||||
p1 := newTestPeer("p1")
|
||||
p1 := newTestPeer("p1", time.Now())
|
||||
require.NoError(t, fx.AddPeer(ctx, p1))
|
||||
|
||||
p, err := fx.Pick(ctx, "p1")
|
||||
|
@ -179,7 +179,7 @@ func TestPool_Pick(t *testing.T) {
|
|||
t.Run("peer is closed", func(t *testing.T) {
|
||||
fx := newFixture(t)
|
||||
defer fx.Finish()
|
||||
p1 := newTestPeer("p1")
|
||||
p1 := newTestPeer("p1", time.Now())
|
||||
require.NoError(t, fx.AddPeer(ctx, p1))
|
||||
require.NoError(t, p1.Close())
|
||||
p, err := fx.Pick(ctx, "p1")
|
||||
|
@ -189,6 +189,111 @@ func TestPool_Pick(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func TestProvideStat_NoPeers(t *testing.T) {
|
||||
t.Run("only incoming peers", func(t *testing.T) {
|
||||
// given
|
||||
fx := newFixture(t)
|
||||
defer fx.Finish()
|
||||
created := time.Now()
|
||||
p1 := newTestPeer("p1", created)
|
||||
require.NoError(t, fx.AddPeer(ctx, p1))
|
||||
|
||||
statProvider, ok := fx.Service.(*poolService)
|
||||
assert.True(t, ok)
|
||||
|
||||
// when
|
||||
stat := statProvider.ProvideStat()
|
||||
|
||||
// then
|
||||
assert.NotNil(t, stat)
|
||||
poolStat, ok := stat.(*poolStats)
|
||||
assert.True(t, ok)
|
||||
|
||||
assert.Len(t, poolStat.PeerStats, 1)
|
||||
assert.Equal(t, p1.id, poolStat.PeerStats[0].PeerId)
|
||||
assert.Equal(t, p1.created, poolStat.PeerStats[0].Created)
|
||||
})
|
||||
t.Run("outgoing and incoming peers", func(t *testing.T) {
|
||||
// given
|
||||
fx := newFixture(t)
|
||||
defer fx.Finish()
|
||||
created := time.Now()
|
||||
created1 := time.Now()
|
||||
p1 := newTestPeer("p1", created)
|
||||
require.NoError(t, fx.AddPeer(ctx, p1))
|
||||
|
||||
fx.Dialer.dial = func(ctx context.Context, peerId string) (peer peer.Peer, err error) {
|
||||
return newTestPeer(peerId, created1), nil
|
||||
}
|
||||
|
||||
peerId := "p2"
|
||||
_, err := fx.Get(ctx, peerId)
|
||||
require.NoError(t, err)
|
||||
|
||||
statProvider, ok := fx.Service.(*poolService)
|
||||
assert.True(t, ok)
|
||||
|
||||
// when
|
||||
stat := statProvider.ProvideStat()
|
||||
|
||||
// then
|
||||
assert.NotNil(t, stat)
|
||||
poolStat, ok := stat.(*poolStats)
|
||||
assert.True(t, ok)
|
||||
|
||||
assert.Len(t, poolStat.PeerStats, 2)
|
||||
assert.Equal(t, peerId, poolStat.PeerStats[0].PeerId)
|
||||
assert.Equal(t, created1, poolStat.PeerStats[0].Created)
|
||||
assert.Equal(t, p1.id, poolStat.PeerStats[1].PeerId)
|
||||
assert.Equal(t, p1.created, poolStat.PeerStats[1].Created)
|
||||
})
|
||||
t.Run("only outcoming peers", func(t *testing.T) {
|
||||
// given
|
||||
peerId := "p1"
|
||||
fx := newFixture(t)
|
||||
defer fx.Finish()
|
||||
created := time.Now()
|
||||
fx.Dialer.dial = func(ctx context.Context, peerId string) (peer peer.Peer, err error) {
|
||||
return newTestPeer(peerId, created), nil
|
||||
}
|
||||
|
||||
_, err := fx.Get(ctx, peerId)
|
||||
require.NoError(t, err)
|
||||
|
||||
statProvider, ok := fx.Service.(*poolService)
|
||||
assert.True(t, ok)
|
||||
|
||||
// when
|
||||
stat := statProvider.ProvideStat()
|
||||
|
||||
// then
|
||||
assert.NotNil(t, stat)
|
||||
poolStat, ok := stat.(*poolStats)
|
||||
assert.True(t, ok)
|
||||
|
||||
assert.Len(t, poolStat.PeerStats, 1)
|
||||
assert.Equal(t, peerId, poolStat.PeerStats[0].PeerId)
|
||||
assert.Equal(t, created, poolStat.PeerStats[0].Created)
|
||||
})
|
||||
t.Run("no peers", func(t *testing.T) {
|
||||
// given
|
||||
fx := newFixture(t)
|
||||
defer fx.Finish()
|
||||
|
||||
statProvider, ok := fx.Service.(*poolService)
|
||||
assert.True(t, ok)
|
||||
|
||||
// when
|
||||
stat := statProvider.ProvideStat()
|
||||
|
||||
// then
|
||||
assert.NotNil(t, stat)
|
||||
poolStat, ok := stat.(*poolStats)
|
||||
assert.True(t, ok)
|
||||
assert.Len(t, poolStat.PeerStats, 0)
|
||||
})
|
||||
}
|
||||
|
||||
func newFixture(t *testing.T) *fixture {
|
||||
fx := &fixture{
|
||||
Service: New(),
|
||||
|
@ -240,16 +345,25 @@ func (d *dialerMock) Name() (name string) {
|
|||
return "net.peerservice"
|
||||
}
|
||||
|
||||
func newTestPeer(id string) *testPeer {
|
||||
func newTestPeer(id string, created time.Time) *testPeer {
|
||||
return &testPeer{
|
||||
id: id,
|
||||
closed: make(chan struct{}),
|
||||
id: id,
|
||||
closed: make(chan struct{}),
|
||||
created: created,
|
||||
}
|
||||
}
|
||||
|
||||
type testPeer struct {
|
||||
id string
|
||||
closed chan struct{}
|
||||
id string
|
||||
closed chan struct{}
|
||||
created time.Time
|
||||
}
|
||||
|
||||
func (t *testPeer) ProvideStat() *peer.Stat {
|
||||
return &peer.Stat{
|
||||
PeerId: t.id,
|
||||
Created: t.created,
|
||||
}
|
||||
}
|
||||
|
||||
func (t *testPeer) SetTTL(ttl time.Duration) {
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
"go.uber.org/zap"
|
||||
|
||||
"github.com/anyproto/any-sync/app"
|
||||
"github.com/anyproto/any-sync/app/debugstat"
|
||||
"github.com/anyproto/any-sync/app/logger"
|
||||
"github.com/anyproto/any-sync/app/ocache"
|
||||
"github.com/anyproto/any-sync/metric"
|
||||
|
@ -64,6 +65,12 @@ func (p *poolService) Init(a *app.App) (err error) {
|
|||
ocache.WithTTL(time.Minute),
|
||||
ocache.WithPrometheus(p.metricReg, "netpool", "incoming"),
|
||||
)
|
||||
comp, ok := a.Component(debugstat.CName).(debugstat.StatService)
|
||||
if !ok {
|
||||
comp = debugstat.NewNoOp()
|
||||
}
|
||||
p.pool.statService = comp
|
||||
p.pool.AddStatProvider()
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -72,6 +79,7 @@ func (p *pool) Run(ctx context.Context) (err error) {
|
|||
}
|
||||
|
||||
func (p *pool) Close(ctx context.Context) (err error) {
|
||||
p.statService.RemoveProvider(p)
|
||||
if e := p.incoming.Close(); e != nil {
|
||||
log.Warn("close incoming cache error", zap.Error(e))
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue