1
0
Fork 0
mirror of https://github.com/anyproto/anytype-heart.git synced 2025-06-08 13:57:12 +09:00
anytype-heart/metrics/client.go
2025-02-27 19:14:07 +01:00

174 lines
3.4 KiB
Go

package metrics
import (
"context"
"errors"
"sync"
"time"
"github.com/cheggaaa/mb/v3"
"github.com/anyproto/anytype-heart/metrics/anymetry"
)
var (
sendingTimeLimit = time.Minute
sendingQueueLimitMin = 10
sendingQueueLimitMax = 100
)
type client struct {
lock sync.RWMutex
batcherLock sync.RWMutex
telemetry anymetry.Service
aggregatableMap map[string]SamplableEvent
aggregatableChan chan SamplableEvent
ctx context.Context
cancel context.CancelFunc
batcher *mb.MB[anymetry.Event]
}
func (c *client) startAggregating() {
ctx := c.ctx
go func() {
ticker := time.NewTicker(sendInterval)
for {
select {
case <-ticker.C:
c.recordAggregatedData()
case ev := <-c.aggregatableChan:
c.lock.Lock()
other, ok := c.aggregatableMap[ev.Key()]
var newEv SamplableEvent
if !ok {
newEv = ev
} else {
newEv = ev.Aggregate(other)
}
c.aggregatableMap[ev.Key()] = newEv
c.lock.Unlock()
case <-ctx.Done():
c.recordAggregatedData()
// we close here so that we are sure that we don't lose the aggregated data
clientBatcher := c.getBatcher()
if clientBatcher != nil {
err := clientBatcher.Close()
if err != nil {
clientMetricsLog.Errorf("failed to close batcher")
}
}
return
}
}
}()
}
func (c *client) startSendingBatchMessages(info anymetry.AppInfoProvider) {
ctx := c.ctx
attempt := 0
for {
clientBatcher := c.getBatcher()
if clientBatcher == nil {
return
}
ctx = mb.CtxWithTimeLimit(ctx, sendingTimeLimit)
msgs, err := clientBatcher.NewCond().
WithMin(sendingQueueLimitMin).
WithMax(sendingQueueLimitMax).
Wait(ctx)
if errors.Is(err, mb.ErrClosed) {
return
}
err = c.sendNextBatch(info, clientBatcher, msgs)
timeout := time.Second * 2
if err == nil {
attempt = 0
} else {
timeout = time.Second * 5 * time.Duration(attempt+1)
if timeout > maxTimeout {
timeout = maxTimeout
}
attempt++
}
select {
case <-c.ctx.Done():
return
case <-time.After(timeout):
}
}
}
func (c *client) Close() {
if c.getBatcher() == nil {
return
}
c.cancel()
c.setBatcher(nil)
}
func (c *client) getBatcher() *mb.MB[anymetry.Event] {
defer c.batcherLock.RUnlock()
c.batcherLock.RLock()
return c.batcher
}
func (c *client) setBatcher(batcher *mb.MB[anymetry.Event]) {
defer c.batcherLock.Unlock()
c.batcherLock.Lock()
c.batcher = batcher
}
func (c *client) sendNextBatch(info anymetry.AppInfoProvider, batcher *mb.MB[anymetry.Event], msgs []anymetry.Event) (err error) {
clientBatcher := c.getBatcher()
if clientBatcher == nil || len(msgs) == 0 {
return nil
}
err = c.telemetry.SendEvents(msgs, info)
if err != nil {
if batcher != nil {
_ = batcher.TryAdd(msgs...) //nolint:errcheck
}
} else {
clientMetricsLog.
With("user-id", info.GetUserId()).
With("messages sent", len(msgs)).
Debug("events sent to telemetry")
}
return
}
func (c *client) recordAggregatedData() {
c.lock.Lock()
toSend := c.aggregatableMap
c.aggregatableMap = make(map[string]SamplableEvent)
c.lock.Unlock()
for _, ev := range toSend {
c.send(ev)
}
}
func (c *client) sendSampled(ev SamplableEvent) {
select {
case c.aggregatableChan <- ev:
default:
}
}
func (c *client) send(e anymetry.Event) {
if e == nil {
return
}
e.SetTimestamp()
clientBatcher := c.getBatcher()
if clientBatcher == nil {
return
}
_ = clientBatcher.TryAdd(e) //nolint:errcheck
}