From cb2af70690631235a65b52cefd49dc937d247263 Mon Sep 17 00:00:00 2001 From: AnastasiaShemyakinskaya Date: Wed, 9 Oct 2024 17:03:48 +0200 Subject: [PATCH] GO-2828: add peer stat Signed-off-by: AnastasiaShemyakinskaya --- net/peer/peer.go | 22 ++++++ net/pool/pool.go | 39 ++++++++++- net/pool/pool_test.go | 146 +++++++++++++++++++++++++++++++++++----- net/pool/poolservice.go | 8 +++ 4 files changed, 197 insertions(+), 18 deletions(-) diff --git a/net/peer/peer.go b/net/peer/peer.go index b29e8e1b..1316496a 100644 --- a/net/peer/peer.go +++ b/net/peer/peer.go @@ -54,6 +54,16 @@ func NewPeer(mc transport.MultiConn, ctrl connCtrl) (p Peer, err error) { return pr, nil } +type Stat struct { + PeerId string `json:"peerId"` + SubConnections []*subConn `json:"subConnections"` + Created time.Time `json:"created"` +} + +type StatProvider interface { + ProvideStat() *Stat +} + type Peer interface { Id() string Context() context.Context @@ -320,3 +330,15 @@ func (p *peer) Close() (err error) { log.Debug("peer close", zap.String("peerId", p.id)) return p.MultiConn.Close() } + +func (p *peer) ProvideStat() *Stat { + connections := make([]*subConn, 0, len(p.active)) + for c := range p.active { + connections = append(connections, c) + } + return &Stat{ + PeerId: p.id, + SubConnections: connections, + Created: p.created, + } +} diff --git a/net/pool/pool.go b/net/pool/pool.go index 9f2a529d..af55ed12 100644 --- a/net/pool/pool.go +++ b/net/pool/pool.go @@ -8,6 +8,7 @@ import ( "go.uber.org/zap" + "github.com/anyproto/any-sync/app/debugstat" "github.com/anyproto/any-sync/app/ocache" "github.com/anyproto/any-sync/net" "github.com/anyproto/any-sync/net/peer" @@ -26,9 +27,14 @@ type Pool interface { Pick(ctx context.Context, id string) (pr peer.Peer, err error) } +type poolStats struct { + PeerStats []*peer.Stat `json:"peerStats"` +} + type pool struct { - outgoing ocache.OCache - incoming ocache.OCache + outgoing ocache.OCache + incoming ocache.OCache + statService debugstat.StatService } func (p *pool) Name() (name string) { @@ -135,3 +141,32 @@ func (p *pool) pick(ctx context.Context, source ocache.OCache, id string) (peer. } return nil, fmt.Errorf("failed to pick connection with peer: peer not found") } + +func (p *pool) AddStatProvider() { + p.statService.AddProvider(p) +} + +func (p *pool) ProvideStat() any { + peerStats := make([]*peer.Stat, 0) + p.outgoing.ForEach(func(v ocache.Object) (isContinue bool) { + if p, ok := v.(peer.StatProvider); ok { + peerStats = append(peerStats, p.ProvideStat()) + } + return true + }) + p.incoming.ForEach(func(v ocache.Object) (isContinue bool) { + if p, ok := v.(peer.StatProvider); ok { + peerStats = append(peerStats, p.ProvideStat()) + } + return true + }) + return &poolStats{PeerStats: peerStats} +} + +func (p *pool) StatId() string { + return CName +} + +func (p *pool) StatType() string { + return CName +} diff --git a/net/pool/pool_test.go b/net/pool/pool_test.go index 6c3f2c28..d9e64b24 100644 --- a/net/pool/pool_test.go +++ b/net/pool/pool_test.go @@ -36,7 +36,7 @@ func TestPool_Get(t *testing.T) { fx := newFixture(t) defer fx.Finish() fx.Dialer.dial = func(ctx context.Context, peerId string) (peer peer.Peer, err error) { - return newTestPeer("1"), nil + return newTestPeer("1", time.Now()), nil } p, err := fx.Get(ctx, "1") assert.NoError(t, err) @@ -49,7 +49,7 @@ func TestPool_Get(t *testing.T) { t.Run("retry for closed", func(t *testing.T) { fx := newFixture(t) defer fx.Finish() - tp := newTestPeer("1") + tp := newTestPeer("1", time.Now()) fx.Dialer.dial = func(ctx context.Context, peerId string) (peer peer.Peer, err error) { return tp, nil } @@ -57,7 +57,7 @@ func TestPool_Get(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, p) p.Close() - tp2 := newTestPeer("1") + tp2 := newTestPeer("1", time.Now()) fx.Dialer.dial = func(ctx context.Context, peerId string) (peer peer.Peer, err error) { return tp2, nil } @@ -80,7 +80,7 @@ func TestPool_GetOneOf(t *testing.T) { t.Run("from cache", func(t *testing.T) { fx := newFixture(t) defer fx.Finish() - tp1 := newTestPeer("1") + tp1 := newTestPeer("1", time.Now()) addToCache(t, fx, tp1) p, err := fx.GetOneOf(ctx, []string{"3", "2", "1"}) require.NoError(t, err) @@ -89,10 +89,10 @@ func TestPool_GetOneOf(t *testing.T) { t.Run("from cache - skip closed", func(t *testing.T) { fx := newFixture(t) defer fx.Finish() - tp2 := newTestPeer("2") + tp2 := newTestPeer("2", time.Now()) addToCache(t, fx, tp2) tp2.Close() - tp1 := newTestPeer("1") + tp1 := newTestPeer("1", time.Now()) addToCache(t, fx, tp1) p, err := fx.GetOneOf(ctx, []string{"3", "2", "1"}) require.NoError(t, err) @@ -107,7 +107,7 @@ func TestPool_GetOneOf(t *testing.T) { return nil, fmt.Errorf("not expected call") } called = true - return newTestPeer(peerId), nil + return newTestPeer(peerId, time.Now()), nil } p, err := fx.GetOneOf(ctx, []string{"3", "2", "1"}) require.NoError(t, err) @@ -139,12 +139,12 @@ func TestPool_AddPeer(t *testing.T) { t.Run("success", func(t *testing.T) { fx := newFixture(t) defer fx.Finish() - require.NoError(t, fx.AddPeer(ctx, newTestPeer("p1"))) + require.NoError(t, fx.AddPeer(ctx, newTestPeer("p1", time.Now()))) }) t.Run("two peers", func(t *testing.T) { fx := newFixture(t) defer fx.Finish() - p1, p2 := newTestPeer("p1"), newTestPeer("p1") + p1, p2 := newTestPeer("p1", time.Now()), newTestPeer("p1", time.Now()) require.NoError(t, fx.AddPeer(ctx, p1)) require.NoError(t, fx.AddPeer(ctx, p2)) select { @@ -167,7 +167,7 @@ func TestPool_Pick(t *testing.T) { t.Run("success", func(t *testing.T) { fx := newFixture(t) defer fx.Finish() - p1 := newTestPeer("p1") + p1 := newTestPeer("p1", time.Now()) require.NoError(t, fx.AddPeer(ctx, p1)) p, err := fx.Pick(ctx, "p1") @@ -179,7 +179,7 @@ func TestPool_Pick(t *testing.T) { t.Run("peer is closed", func(t *testing.T) { fx := newFixture(t) defer fx.Finish() - p1 := newTestPeer("p1") + p1 := newTestPeer("p1", time.Now()) require.NoError(t, fx.AddPeer(ctx, p1)) require.NoError(t, p1.Close()) p, err := fx.Pick(ctx, "p1") @@ -189,6 +189,111 @@ func TestPool_Pick(t *testing.T) { }) } +func TestProvideStat_NoPeers(t *testing.T) { + t.Run("only incoming peers", func(t *testing.T) { + // given + fx := newFixture(t) + defer fx.Finish() + created := time.Now() + p1 := newTestPeer("p1", created) + require.NoError(t, fx.AddPeer(ctx, p1)) + + statProvider, ok := fx.Service.(*poolService) + assert.True(t, ok) + + // when + stat := statProvider.ProvideStat() + + // then + assert.NotNil(t, stat) + poolStat, ok := stat.(*poolStats) + assert.True(t, ok) + + assert.Len(t, poolStat.PeerStats, 1) + assert.Equal(t, p1.id, poolStat.PeerStats[0].PeerId) + assert.Equal(t, p1.created, poolStat.PeerStats[0].Created) + }) + t.Run("outgoing and incoming peers", func(t *testing.T) { + // given + fx := newFixture(t) + defer fx.Finish() + created := time.Now() + created1 := time.Now() + p1 := newTestPeer("p1", created) + require.NoError(t, fx.AddPeer(ctx, p1)) + + fx.Dialer.dial = func(ctx context.Context, peerId string) (peer peer.Peer, err error) { + return newTestPeer(peerId, created1), nil + } + + peerId := "p2" + _, err := fx.Get(ctx, peerId) + require.NoError(t, err) + + statProvider, ok := fx.Service.(*poolService) + assert.True(t, ok) + + // when + stat := statProvider.ProvideStat() + + // then + assert.NotNil(t, stat) + poolStat, ok := stat.(*poolStats) + assert.True(t, ok) + + assert.Len(t, poolStat.PeerStats, 2) + assert.Equal(t, peerId, poolStat.PeerStats[0].PeerId) + assert.Equal(t, created1, poolStat.PeerStats[0].Created) + assert.Equal(t, p1.id, poolStat.PeerStats[1].PeerId) + assert.Equal(t, p1.created, poolStat.PeerStats[1].Created) + }) + t.Run("only outcoming peers", func(t *testing.T) { + // given + peerId := "p1" + fx := newFixture(t) + defer fx.Finish() + created := time.Now() + fx.Dialer.dial = func(ctx context.Context, peerId string) (peer peer.Peer, err error) { + return newTestPeer(peerId, created), nil + } + + _, err := fx.Get(ctx, peerId) + require.NoError(t, err) + + statProvider, ok := fx.Service.(*poolService) + assert.True(t, ok) + + // when + stat := statProvider.ProvideStat() + + // then + assert.NotNil(t, stat) + poolStat, ok := stat.(*poolStats) + assert.True(t, ok) + + assert.Len(t, poolStat.PeerStats, 1) + assert.Equal(t, peerId, poolStat.PeerStats[0].PeerId) + assert.Equal(t, created, poolStat.PeerStats[0].Created) + }) + t.Run("no peers", func(t *testing.T) { + // given + fx := newFixture(t) + defer fx.Finish() + + statProvider, ok := fx.Service.(*poolService) + assert.True(t, ok) + + // when + stat := statProvider.ProvideStat() + + // then + assert.NotNil(t, stat) + poolStat, ok := stat.(*poolStats) + assert.True(t, ok) + assert.Len(t, poolStat.PeerStats, 0) + }) +} + func newFixture(t *testing.T) *fixture { fx := &fixture{ Service: New(), @@ -240,16 +345,25 @@ func (d *dialerMock) Name() (name string) { return "net.peerservice" } -func newTestPeer(id string) *testPeer { +func newTestPeer(id string, created time.Time) *testPeer { return &testPeer{ - id: id, - closed: make(chan struct{}), + id: id, + closed: make(chan struct{}), + created: created, } } type testPeer struct { - id string - closed chan struct{} + id string + closed chan struct{} + created time.Time +} + +func (t *testPeer) ProvideStat() *peer.Stat { + return &peer.Stat{ + PeerId: t.id, + Created: t.created, + } } func (t *testPeer) SetTTL(ttl time.Duration) { diff --git a/net/pool/poolservice.go b/net/pool/poolservice.go index efa07977..b858e295 100644 --- a/net/pool/poolservice.go +++ b/net/pool/poolservice.go @@ -8,6 +8,7 @@ import ( "go.uber.org/zap" "github.com/anyproto/any-sync/app" + "github.com/anyproto/any-sync/app/debugstat" "github.com/anyproto/any-sync/app/logger" "github.com/anyproto/any-sync/app/ocache" "github.com/anyproto/any-sync/metric" @@ -64,6 +65,12 @@ func (p *poolService) Init(a *app.App) (err error) { ocache.WithTTL(time.Minute), ocache.WithPrometheus(p.metricReg, "netpool", "incoming"), ) + comp, ok := a.Component(debugstat.CName).(debugstat.StatService) + if !ok { + comp = debugstat.NewNoOp() + } + p.pool.statService = comp + p.pool.AddStatProvider() return nil } @@ -72,6 +79,7 @@ func (p *pool) Run(ctx context.Context) (err error) { } func (p *pool) Close(ctx context.Context) (err error) { + p.statService.RemoveProvider(p) if e := p.incoming.Close(); e != nil { log.Warn("close incoming cache error", zap.Error(e)) }