mirror of https://github.com/anyproto/any-sync.git synced 2025-06-08 05:57:03 +09:00

WIP queue metrics

mcrakhman 2024-06-24 21:29:16 +02:00
parent 88c12436a6
commit f37064e0d7
31 changed files with 307 additions and 57 deletions

View file

@@ -11,6 +11,15 @@ type InnerHeadUpdate struct {
root *consensusproto.RawRecordWithId
}
func (h InnerHeadUpdate) MsgSize() uint64 {
size := uint64(len(h.head))
for _, record := range h.records {
size += uint64(len(record.Id))
size += uint64(len(record.Payload))
}
// h.head is already counted in size above; add the root id and payload to complete the estimate
return size + uint64(len(h.root.Id)) + uint64(len(h.root.Payload))
}
func (h InnerHeadUpdate) Marshall(data objectmessages.ObjectMeta) ([]byte, error) {
treeMsg := consensusproto.WrapHeadUpdate(&consensusproto.LogHeadUpdate{
Head: h.head,

View file

@@ -10,6 +10,13 @@ type InnerRequest struct {
root *consensusproto.RawRecordWithId
}
func (r *InnerRequest) MsgSize() uint64 {
size := uint64(len(r.head))
size += uint64(len(r.root.Id))
size += uint64(len(r.root.Payload))
return size
}
func NewRequest(peerId, objectId, spaceId, head string, root *consensusproto.RawRecordWithId) *objectmessages.Request {
return objectmessages.NewRequest(peerId, spaceId, objectId, &InnerRequest{
head: head,

View file

@@ -17,6 +17,15 @@ type Response struct {
root *consensusproto.RawRecordWithId
}
func (r *Response) MsgSize() uint64 {
size := uint64(len(r.head))
for _, record := range r.records {
size += uint64(len(record.Id))
size += uint64(len(record.Payload))
}
return size + uint64(len(r.spaceId)) + uint64(len(r.objectId))
}
func (r *Response) ProtoMessage() (proto.Message, error) {
resp := &consensusproto.LogFullSyncResponse{
Head: r.head,

View file

@@ -75,7 +75,7 @@ func (s *syncAclHandler) HandleHeadUpdate(ctx context.Context, statusUpdater syn
return s.syncClient.CreateFullSyncRequest(peerId, s.aclList), nil
}
func (s *syncAclHandler) HandleStreamRequest(ctx context.Context, rq syncdeps.Request, send func(resp proto.Message) error) (syncdeps.Request, error) {
func (s *syncAclHandler) HandleStreamRequest(ctx context.Context, rq syncdeps.Request, updater syncdeps.QueueSizeUpdater, send func(resp proto.Message) error) (syncdeps.Request, error) {
req, ok := rq.(*objectmessages.Request)
if !ok {
return nil, ErrUnexpectedRequestType
@@ -100,6 +100,9 @@ func (s *syncAclHandler) HandleStreamRequest(ctx context.Context, rq syncdeps.Re
return nil, err
}
s.aclList.Unlock()
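// count the response in the sent-response metric while it is marshalled and sent; the deferred call releases it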
size := resp.MsgSize()
updater.UpdateQueueSize(size, syncdeps.MsgTypeSentResponse, true)
defer updater.UpdateQueueSize(size, syncdeps.MsgTypeSentResponse, false)
protoMsg, err := resp.ProtoMessage()
if err != nil {
return nil, err

View file

@@ -15,6 +15,16 @@ type InnerHeadUpdate struct {
root *treechangeproto.RawTreeChangeWithId
}
func (h InnerHeadUpdate) MsgSize() uint64 {
// h.heads and h.snapshotPath are string slices, so len() alone would only count
// entries, not bytes; sum the actual string lengths instead
var size uint64
for _, head := range h.heads {
size += uint64(len(head))
}
for _, path := range h.snapshotPath {
size += uint64(len(path))
}
for _, change := range h.changes {
size += uint64(len(change.Id))
size += uint64(len(change.RawChange))
}
return size
}
func (h InnerHeadUpdate) Marshall(data objectmessages.ObjectMeta) ([]byte, error) {
changes := h.changes
if slices.Contains(h.opts.EmptyPeers, data.PeerId) {

View file

@@ -11,6 +11,14 @@ type InnerRequest struct {
root *treechangeproto.RawTreeChangeWithId
}
func (r *InnerRequest) MsgSize() uint64 {
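// 59 is presumably a rough per-id byte estimate (any-sync ids are 59-character CIDs), so heads and snapshot path are estimated rather than summed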
size := uint64(len(r.heads)+len(r.snapshotPath)) * 59
if r.root != nil {
size += uint64(len(r.root.Id) + len(r.root.RawChange))
}
return size
}
func NewRequest(peerId, spaceId, objectId string, heads []string, snapshotPath []string, root *treechangeproto.RawTreeChangeWithId) *objectmessages.Request {
return objectmessages.NewRequest(peerId, spaceId, objectId, &InnerRequest{
heads: heads,

View file

@@ -18,6 +18,16 @@ type Response struct {
root *treechangeproto.RawTreeChangeWithId
}
func (r *Response) MsgSize() uint64 {
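// the same assumed 59-byte-per-id estimate as in InnerRequest.MsgSize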
size := uint64(len(r.spaceId)+len(r.objectId)) * 59
size += uint64(len(r.snapshotPath)) * 59
for _, change := range r.changes {
size += uint64(len(change.Id))
size += uint64(len(change.RawChange))
}
return size + uint64(len(r.heads))*59
}
func (r *Response) ProtoMessage() (proto.Message, error) {
resp := &treechangeproto.TreeFullSyncResponse{
Heads: r.heads,

View file

@@ -77,7 +77,7 @@ func (s *syncHandler) HandleHeadUpdate(ctx context.Context, statusUpdater syncst
return nil, nil
}
func (s *syncHandler) HandleStreamRequest(ctx context.Context, rq syncdeps.Request, send func(resp proto.Message) error) (syncdeps.Request, error) {
func (s *syncHandler) HandleStreamRequest(ctx context.Context, rq syncdeps.Request, updater syncdeps.QueueSizeUpdater, send func(resp proto.Message) error) (syncdeps.Request, error) {
req, ok := rq.(*objectmessages.Request)
if !ok {
return nil, ErrUnexpectedRequestType
@@ -116,11 +116,15 @@ func (s *syncHandler) HandleStreamRequest(ctx context.Context, rq syncdeps.Reque
if len(batch.changes) == 0 {
break
}
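// each response batch is counted against the sent-response metric from marshalling until its send completes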
size := batch.MsgSize()
updater.UpdateQueueSize(size, syncdeps.MsgTypeSentResponse, true)
protoBatch, err := batch.ProtoMessage()
if err != nil {
updater.UpdateQueueSize(size, syncdeps.MsgTypeSentResponse, false)
return nil, err
}
err = send(protoBatch)
updater.UpdateQueueSize(size, syncdeps.MsgTypeSentResponse, false)
if err != nil {
return nil, err
}

View file

@@ -272,8 +272,10 @@ func (s *spaceProcess) update(ctx context.Context) error {
}
tr.Lock()
defer tr.Unlock()
bytes := make([]byte, 1024)
_, _ = rand.Read(bytes)
_, err = tr.AddContent(ctx, objecttree.SignableChangeContent{
Data: nil,
Data: bytes,
Key: s.accountService.Account().SignKey,
IsSnapshot: snapshot,
IsEncrypted: true,

View file

@@ -0,0 +1,23 @@
package sync
import (
"sync"
"go.uber.org/zap"
)
type mockMetric struct {
sync.Mutex
totalSize uint64
}
func (m *mockMetric) UpdateQueueSize(size uint64, msgType int, add bool) {
m.Lock()
defer m.Unlock()
if add {
m.totalSize += size
} else {
m.totalSize -= size
}
log.Debug("total msg size", zap.Uint64("size", m.totalSize), zap.Uint64("delta", size), zap.Int("msgType", msgType), zap.Bool("add", add))
}
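For a non-debug setup, the same single-method contract could be backed by a real metrics library. A minimal sketch assuming the Prometheus Go client (github.com/prometheus/client_golang); the type name and registration here are illustrative, not part of this commit:

package sync

import (
	"strconv"

	"github.com/prometheus/client_golang/prometheus"
)

// prometheusMetric is a hypothetical QueueSizeUpdater that exports the
// per-type queue sizes as a gauge instead of logging them.
type prometheusMetric struct {
	queueSize *prometheus.GaugeVec
}

func newPrometheusMetric(reg prometheus.Registerer) *prometheusMetric {
	m := &prometheusMetric{
		queueSize: prometheus.NewGaugeVec(prometheus.GaugeOpts{
			Name: "sync_queue_size_bytes",
			Help: "approximate bytes currently queued, by message type",
		}, []string{"msg_type"}),
	}
	reg.MustRegister(m.queueSize)
	return m
}

func (m *prometheusMetric) UpdateQueueSize(size uint64, msgType int, add bool) {
	delta := float64(size)
	if !add {
		delta = -delta
	}
	m.queueSize.WithLabelValues(strconv.Itoa(msgType)).Add(delta)
}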

View file

@@ -15,6 +15,7 @@ type BroadcastOptions struct {
type InnerHeadUpdate interface {
Marshall(data ObjectMeta) ([]byte, error)
Heads() []string
MsgSize() uint64
}
type ObjectMeta struct {
@@ -29,6 +30,16 @@ type HeadUpdate struct {
Update InnerHeadUpdate
}
func (h *HeadUpdate) MsgSize() uint64 {
var byteSize uint64
if h.Update != nil {
byteSize += h.Update.MsgSize()
} else {
byteSize += uint64(len(h.Bytes))
}
return byteSize + uint64(len(h.Meta.PeerId)) + uint64(len(h.Meta.ObjectId)) + uint64(len(h.Meta.SpaceId))
}
func (h *HeadUpdate) SetPeerId(peerId string) {
h.Meta.PeerId = peerId
}

View file

@@ -8,6 +8,7 @@ import (
type InnerRequest interface {
Marshall() ([]byte, error)
MsgSize() uint64
}
type Request struct {
@@ -18,6 +19,16 @@ type Request struct {
Bytes []byte
}
func (r *Request) MsgSize() uint64 {
var byteSize uint64
if r.Inner != nil {
byteSize += r.Inner.MsgSize()
} else {
byteSize += uint64(len(r.Bytes))
}
return byteSize + uint64(len(r.peerId)) + uint64(len(r.objectId)) + uint64(len(r.spaceId))
}
func NewByteRequest(peerId, spaceId, objectId string, message []byte) *Request {
return &Request{
peerId: peerId,

View file

@@ -20,6 +20,7 @@ import (
"github.com/anyproto/any-sync/commonspace/syncstatus"
"github.com/anyproto/any-sync/net/peer"
"github.com/anyproto/any-sync/net/pool"
"github.com/anyproto/any-sync/util/multiqueue"
)
var ErrUnexpectedHeadUpdateType = errors.New("unexpected head update type")
@@ -71,7 +72,7 @@ func (o *objectSync) HandleHeadUpdate(ctx context.Context, headUpdate drpc.Messa
return objHandler.HandleHeadUpdate(ctx, o.status, update)
}
func (o *objectSync) HandleStreamRequest(ctx context.Context, rq syncdeps.Request, sendResponse func(resp proto.Message) error) (syncdeps.Request, error) {
func (o *objectSync) HandleStreamRequest(ctx context.Context, rq syncdeps.Request, updater syncdeps.QueueSizeUpdater, sendResponse func(resp proto.Message) error) (syncdeps.Request, error) {
obj, err := o.manager.GetTree(context.Background(), o.spaceId, rq.ObjectId())
if err != nil {
return synctree.NewRequest(rq.PeerId(), o.spaceId, rq.ObjectId(), nil, nil, nil), treechangeproto.ErrGetTree
@@ -80,7 +81,7 @@ func (o *objectSync) HandleStreamRequest(ctx context.Context, rq syncdeps.Reques
if !ok {
return nil, fmt.Errorf("object %s does not support sync", obj.Id())
}
return objHandler.HandleStreamRequest(ctx, rq, sendResponse)
return objHandler.HandleStreamRequest(ctx, rq, updater, sendResponse)
}
func (o *objectSync) ApplyRequest(ctx context.Context, rq syncdeps.Request, requestSender syncdeps.RequestSender) error {
@@ -99,7 +100,7 @@ func (o *objectSync) ApplyRequest(ctx context.Context, rq syncdeps.Request, requ
return err
}
func (o *objectSync) TryAddMessage(ctx context.Context, peerId string, msg drpc.Message, q *mb.MB[drpc.Message]) error {
func (o *objectSync) TryAddMessage(ctx context.Context, peerId string, msg multiqueue.Sizeable, q *mb.MB[multiqueue.Sizeable]) error {
settable, ok := msg.(peerIdSettable)
if ok {
settable.SetPeerId(peerId)

View file

@@ -30,13 +30,15 @@ type requestManager struct {
requestPool RequestPool
incomingGuard *guard
handler syncdeps.SyncHandler
metric syncdeps.QueueSizeUpdater
}
func NewRequestManager(handler syncdeps.SyncHandler) RequestManager {
func NewRequestManager(handler syncdeps.SyncHandler, metric syncdeps.QueueSizeUpdater) RequestManager {
return &requestManager{
requestPool: NewRequestPool(),
handler: handler,
incomingGuard: newGuard(),
metric: metric,
}
}
@@ -52,7 +54,10 @@ func (r *requestManager) SendRequest(ctx context.Context, rq syncdeps.Request, c
}
return err
}
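// a received response is counted against the metric only while the collector is processing it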
size := resp.MsgSize()
r.metric.UpdateQueueSize(size, syncdeps.MsgTypeReceivedResponse, true)
err = collector.CollectResponse(ctx, rq.PeerId(), rq.ObjectId(), resp)
r.metric.UpdateQueueSize(size, syncdeps.MsgTypeReceivedResponse, false)
if err != nil {
return err
}
@@ -62,20 +67,27 @@
}
func (r *requestManager) QueueRequest(rq syncdeps.Request) error {
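// an outgoing request is counted from the moment it is queued until the pool runs, replaces, or drops it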
size := rq.MsgSize()
r.metric.UpdateQueueSize(size, syncdeps.MsgTypeOutgoingRequest, true)
return r.requestPool.QueueRequestAction(rq.PeerId(), rq.ObjectId(), func(ctx context.Context) {
err := r.handler.ApplyRequest(ctx, rq, r)
if err != nil {
log.Error("failed to apply request", zap.Error(err))
}
}, func() {
r.metric.UpdateQueueSize(size, syncdeps.MsgTypeOutgoingRequest, false)
})
}
func (r *requestManager) HandleStreamRequest(ctx context.Context, rq syncdeps.Request, stream drpc.Stream) error {
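// an incoming request stays in the metric for the whole time its handler runs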
size := rq.MsgSize()
r.metric.UpdateQueueSize(size, syncdeps.MsgTypeIncomingRequest, true)
defer r.metric.UpdateQueueSize(size, syncdeps.MsgTypeIncomingRequest, false)
if !r.incomingGuard.TryTake(fullId(rq.PeerId(), rq.ObjectId())) {
return nil
}
defer r.incomingGuard.Release(fullId(rq.PeerId(), rq.ObjectId()))
newRq, err := r.handler.HandleStreamRequest(ctx, rq, func(resp proto.Message) error {
newRq, err := r.handler.HandleStreamRequest(ctx, rq, r.metric, func(resp proto.Message) error {
return stream.MsgSend(resp, streampool.EncodingProto)
})
// here is a little bit non-standard decision, because we can return error but still can queue the request

View file

@@ -8,7 +8,7 @@ import (
type RequestPool interface {
TryTake(peerId, objectId string) bool
Release(peerId, objectId string)
QueueRequestAction(peerId, objectId string, action func(ctx context.Context)) (err error)
QueueRequestAction(peerId, objectId string, action func(ctx context.Context), remove func()) (err error)
Close()
}
@@ -45,7 +45,7 @@ func (rp *requestPool) Release(peerId, objectId string) {
rp.peerGuard.Release(fullId(peerId, objectId))
}
func (rp *requestPool) QueueRequestAction(peerId, objectId string, action func(ctx context.Context)) (err error) {
func (rp *requestPool) QueueRequestAction(peerId, objectId string, action func(ctx context.Context), remove func()) (err error) {
rp.mu.Lock()
if rp.isClosed {
rp.mu.Unlock()
@@ -65,13 +65,12 @@ func (rp *requestPool) QueueRequestAction(peerId, objectId string, action func(c
var wrappedAction func()
wrappedAction = func() {
if !rp.TryTake(peerId, objectId) {
pool.TryAdd(objectId, wrappedAction, func() {})
return
}
action(rp.ctx)
rp.Release(peerId, objectId)
}
pool.Replace(objectId, wrappedAction, func() {})
pool.Replace(objectId, wrappedAction, remove)
return nil
}

View file

@@ -32,19 +32,24 @@ type SyncService interface {
}
type syncService struct {
sendQueueProvider multiqueue.QueueProvider[drpc.Message]
sendQueueProvider multiqueue.QueueProvider[multiqueue.Sizeable]
receiveQueue multiqueue.MultiQueue[msgCtx]
manager RequestManager
streamPool streampool.StreamPool
peerManager peermanager.PeerManager
handler syncdeps.SyncHandler
metric syncdeps.QueueSizeUpdater
ctx context.Context
cancel context.CancelFunc
}
type msgCtx struct {
ctx context.Context
drpc.Message
multiqueue.Sizeable
}
func (m msgCtx) MsgSize() uint64 {
return m.Sizeable.MsgSize()
}
func NewSyncService() SyncService {
@@ -56,13 +61,14 @@ func (s *syncService) Name() (name string) {
}
func (s *syncService) Init(a *app.App) (err error) {
s.metric = &mockMetric{}
s.handler = a.MustComponent(syncdeps.CName).(syncdeps.SyncHandler)
s.sendQueueProvider = multiqueue.NewQueueProvider[drpc.Message](100, s.handleOutgoingMessage)
s.receiveQueue = multiqueue.New[msgCtx](s.handleIncomingMessage, 100)
s.sendQueueProvider = multiqueue.NewQueueProvider[multiqueue.Sizeable](100, syncdeps.MsgTypeOutgoing, s.metric, s.handleOutgoingMessage)
s.receiveQueue = multiqueue.New[msgCtx](s.handleIncomingMessage, s.metric, syncdeps.MsgTypeIncoming, 100)
s.peerManager = a.MustComponent(peermanager.CName).(peermanager.PeerManager)
s.streamPool = a.MustComponent(streampool.CName).(streampool.StreamPool)
s.streamPool.SetSyncDelegate(s)
s.manager = NewRequestManager(s.handler)
s.manager = NewRequestManager(s.handler, s.metric)
s.ctx, s.cancel = context.WithCancel(context.Background())
return nil
}
@@ -85,12 +91,12 @@ func (s *syncService) BroadcastMessage(ctx context.Context, msg drpc.Message) er
return s.peerManager.BroadcastMessage(ctx, msg, s.streamPool)
}
func (s *syncService) handleOutgoingMessage(id string, msg drpc.Message, q *mb.MB[drpc.Message]) error {
func (s *syncService) handleOutgoingMessage(id string, msg multiqueue.Sizeable, q *mb.MB[multiqueue.Sizeable]) error {
return s.handler.TryAddMessage(s.ctx, id, msg, q)
}
func (s *syncService) handleIncomingMessage(msg msgCtx) {
req, err := s.handler.HandleHeadUpdate(msg.ctx, msg.Message)
req, err := s.handler.HandleHeadUpdate(msg.ctx, msg.Sizeable)
if err != nil {
log.Error("failed to handle head update", zap.Error(err))
}
@@ -109,7 +115,7 @@ func (s *syncService) handleIncomingMessage(msg msgCtx) {
//)...)
}
func (s *syncService) GetQueue(peerId string) *multiqueue.Queue[drpc.Message] {
func (s *syncService) GetQueue(peerId string) *multiqueue.Queue[multiqueue.Sizeable] {
queue := s.sendQueueProvider.GetQueue(peerId)
return queue
}
@@ -129,8 +135,8 @@ func (s *syncService) HandleMessage(ctx context.Context, peerId string, msg drpc
}
objectId := idMsg.ObjectId()
err := s.receiveQueue.Add(ctx, objectId, msgCtx{
ctx: ctx,
Message: msg,
ctx: ctx,
Sizeable: idMsg,
})
if errors.Is(err, mb.ErrOverflowed) {
log.Info("queue overflowed", zap.String("objectId", objectId))

View file

@@ -2,4 +2,5 @@ package syncdeps
type Message interface {
ObjectId() string
MsgSize() uint64
}

View file

@@ -0,0 +1,10 @@
package syncdeps
const (
MsgTypeIncoming = iota
MsgTypeOutgoing
MsgTypeIncomingRequest
MsgTypeOutgoingRequest
MsgTypeReceivedResponse
MsgTypeSentResponse
)
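A metrics backend will likely want readable label values rather than raw iota integers. A small hypothetical helper that could sit next to these constants in package syncdeps (not part of the commit):

// MsgTypeName maps the iota constants above to stable, readable labels.
func MsgTypeName(t int) string {
	switch t {
	case MsgTypeIncoming:
		return "incoming"
	case MsgTypeOutgoing:
		return "outgoing"
	case MsgTypeIncomingRequest:
		return "incoming_request"
	case MsgTypeOutgoingRequest:
		return "outgoing_request"
	case MsgTypeReceivedResponse:
		return "received_response"
	case MsgTypeSentResponse:
		return "sent_response"
	default:
		return "unknown"
	}
}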

View file

@@ -6,4 +6,5 @@ type Request interface {
PeerId() string
ObjectId() string
Proto() (proto.Message, error)
MsgSize() uint64
}

View file

@@ -1,3 +1,9 @@
package syncdeps
type Response interface{}
import (
"github.com/anyproto/any-sync/util/multiqueue"
)
type Response interface {
multiqueue.Sizeable
}

View file

@@ -9,6 +9,7 @@ import (
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/commonspace/syncstatus"
"github.com/anyproto/any-sync/util/multiqueue"
)
const CName = "common.sync.syncdeps"
@@ -24,17 +25,21 @@ type RequestSender interface {
type ObjectSyncHandler interface {
HandleHeadUpdate(ctx context.Context, statusUpdater syncstatus.StatusUpdater, headUpdate drpc.Message) (Request, error)
HandleStreamRequest(ctx context.Context, rq Request, send func(resp proto.Message) error) (Request, error)
HandleStreamRequest(ctx context.Context, rq Request, updater QueueSizeUpdater, send func(resp proto.Message) error) (Request, error)
HandleResponse(ctx context.Context, peerId, objectId string, resp Response) error
ResponseCollector() ResponseCollector
}
type QueueSizeUpdater interface {
UpdateQueueSize(size uint64, msgType int, add bool)
}
type SyncHandler interface {
app.Component
HandleHeadUpdate(ctx context.Context, headUpdate drpc.Message) (Request, error)
HandleStreamRequest(ctx context.Context, rq Request, sendResponse func(resp proto.Message) error) (Request, error)
HandleStreamRequest(ctx context.Context, rq Request, updater QueueSizeUpdater, sendResponse func(resp proto.Message) error) (Request, error)
ApplyRequest(ctx context.Context, rq Request, requestSender RequestSender) error
TryAddMessage(ctx context.Context, peerId string, msg drpc.Message, q *mb.MB[drpc.Message]) error
TryAddMessage(ctx context.Context, peerId string, msg multiqueue.Sizeable, q *mb.MB[multiqueue.Sizeable]) error
SendStreamRequest(ctx context.Context, rq Request, receive func(stream drpc.Stream) error) (err error)
NewMessage() drpc.Message
}

View file

@@ -11,6 +11,14 @@ type CounterRequest struct {
*synctestproto.CounterRequest
}
func (c CounterRequest) MsgSize() uint64 {
if c.CounterRequest != nil {
return uint64(proto.Size(c.CounterRequest))
} else {
return 0
}
}
func (c CounterRequest) Proto() (proto.Message, error) {
return c.CounterRequest, nil
}

View file

@@ -9,6 +9,7 @@ import (
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/commonspace/sync/syncdeps"
"github.com/anyproto/any-sync/util/multiqueue"
)
type CounterSyncHandler struct {
@@ -31,11 +32,11 @@ func (c *CounterSyncHandler) HandleHeadUpdate(ctx context.Context, headUpdate dr
return c.updateHandler.HandleHeadUpdate(ctx, headUpdate)
}
func (c *CounterSyncHandler) TryAddMessage(ctx context.Context, id string, msg drpc.Message, q *mb.MB[drpc.Message]) error {
func (c *CounterSyncHandler) TryAddMessage(ctx context.Context, id string, msg multiqueue.Sizeable, q *mb.MB[multiqueue.Sizeable]) error {
return q.TryAdd(msg)
}
func (c *CounterSyncHandler) HandleStreamRequest(ctx context.Context, rq syncdeps.Request, send func(resp proto.Message) error) (syncdeps.Request, error) {
func (c *CounterSyncHandler) HandleStreamRequest(ctx context.Context, rq syncdeps.Request, updater syncdeps.QueueSizeUpdater, send func(resp proto.Message) error) (syncdeps.Request, error) {
return c.requestHandler.HandleStreamRequest(ctx, rq, send)
}

View file

@@ -6,22 +6,19 @@ import (
"storj.io/drpc"
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/commonspace/sync/syncdeps"
"github.com/anyproto/any-sync/commonspace/sync/synctestproto"
"github.com/anyproto/any-sync/net/peer"
"github.com/anyproto/any-sync/net/rpc/rpctest"
"github.com/anyproto/any-sync/net/rpc/server"
"github.com/anyproto/any-sync/net/streampool"
"github.com/anyproto/any-sync/util/multiqueue"
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/commonspace/sync/synctestproto"
)
const RpcName = "rpcserver"
type SyncService interface {
app.Component
GetQueue(peerId string) *multiqueue.Queue[drpc.Message]
HandleMessage(ctx context.Context, peerId string, msg drpc.Message) error
HandleStreamRequest(ctx context.Context, req syncdeps.Request, stream drpc.Stream) error
QueueRequest(ctx context.Context, rq syncdeps.Request) error

View file

@@ -0,0 +1,11 @@
package synctestproto
import "github.com/gogo/protobuf/proto"
func (c *CounterIncrease) MsgSize() uint64 {
if c != nil {
return uint64(proto.Size(c))
} else {
return 0
}
}
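Unlike the hand-rolled byte estimates in the tree and consensus MsgSize implementations above, proto.Size reports the exact encoded length of the message, at the cost of walking the message to compute it.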

View file

@@ -3,6 +3,7 @@ package sync
import (
"context"
"sync"
"sync/atomic"
"github.com/cheggaaa/mb/v3"
"go.uber.org/zap"
@@ -11,6 +12,7 @@ import (
type entry struct {
call func()
onRemove func()
cnt uint64
}
func newTryAddQueue(workers, maxSize int) *tryAddQueue {
@@ -29,52 +31,76 @@ type tryAddQueue struct {
ctx context.Context
cancel context.CancelFunc
workers int
cnt atomic.Uint64
entries map[string]entry
batch *mb.MB[string]
mx sync.Mutex
}
func (rp *tryAddQueue) Replace(id string, call, remove func()) {
curCnt := rp.cnt.Add(1) // single atomic op: a separate Load-then-Add could hand two callers the same generation
rp.mx.Lock()
if prevEntry, ok := rp.entries[id]; ok {
rp.entries[id] = entry{
call: call,
onRemove: remove,
cnt: curCnt,
}
rp.mx.Unlock()
prevEntry.onRemove()
return
}
rp.entries[id] = entry{
ent := entry{
call: call,
onRemove: remove,
cnt: curCnt,
}
rp.entries[id] = ent
rp.mx.Unlock()
err := rp.batch.TryAdd(id)
if err != nil {
rp.mx.Lock()
curEntry := rp.entries[id]
delete(rp.entries, id)
if curEntry.cnt == curCnt {
delete(rp.entries, id)
}
rp.mx.Unlock()
if curEntry.onRemove != nil {
curEntry.onRemove()
if ent.onRemove != nil {
ent.onRemove()
}
}
}
func (rp *tryAddQueue) TryAdd(id string, call, remove func()) bool {
curCnt := rp.cnt.Add(1) // atomic reservation, same as in Replace
rp.mx.Lock()
if _, ok := rp.entries[id]; ok {
rp.mx.Unlock()
if remove != nil {
remove()
}
return false
}
rp.entries[id] = entry{
ent := entry{
call: call,
onRemove: remove,
cnt: curCnt,
}
rp.entries[id] = ent
rp.mx.Unlock()
if err := rp.batch.TryAdd(id); err != nil {
err := rp.batch.TryAdd(id)
if err != nil {
rp.mx.Lock()
curEntry := rp.entries[id]
if curEntry.cnt == curCnt {
delete(rp.entries, id)
}
rp.mx.Unlock()
if ent.onRemove != nil {
ent.onRemove()
}
return false
}
return true
@@ -99,7 +125,9 @@ func (rp *tryAddQueue) sendLoop() {
rp.mx.Unlock()
if curEntry.call != nil {
curEntry.call()
curEntry.onRemove()
if curEntry.onRemove != nil {
curEntry.onRemove()
}
}
}
}

View file

@@ -20,14 +20,14 @@ type stream struct {
streamId uint32
closed atomic.Bool
l logger.CtxLogger
queue *multiqueue.Queue[drpc.Message]
queue *multiqueue.Queue[multiqueue.Sizeable]
stats streamStat
syncDelegate StreamSyncDelegate
}
func (sr *stream) write(msg drpc.Message) (err error) {
sr.stats.AddMessage(msg)
err = sr.queue.TryAdd(msg)
err = sr.queue.TryAdd(msg.(multiqueue.Sizeable))
if err != nil {
sr.stats.RemoveMessage(msg)
}

View file

@@ -22,13 +22,17 @@ type configGetter interface {
GetStreamConfig() StreamConfig
}
type sizeable interface {
MsgSize() uint64
}
type StreamSyncDelegate interface {
// HandleMessage handles incoming message
HandleMessage(ctx context.Context, peerId string, msg drpc.Message) (err error)
// NewReadMessage creates new empty message for unmarshalling into it
NewReadMessage() drpc.Message
// GetQueue returns queue for outgoing messages
GetQueue(peerId string) *multiqueue.Queue[drpc.Message]
GetQueue(peerId string) *multiqueue.Queue[multiqueue.Sizeable]
// RemoveQueue removes queue for outgoing messages
RemoveQueue(peerId string) error
}

View file

@@ -13,9 +13,11 @@ var (
ErrClosed = errors.New("multiQueue: closed")
)
func New[T any](h HandleFunc[T], maxThreadSize int) MultiQueue[T] {
func New[T Sizeable](h HandleFunc[T], updater sizeUpdater, msgType int, maxThreadSize int) MultiQueue[T] {
return &multiQueue[T]{
handler: h,
updater: updater,
msgType: msgType,
threads: make(map[string]*mb.MB[T]),
queueMaxSize: maxThreadSize,
}
@@ -23,16 +25,27 @@
type HandleFunc[T any] func(msg T)
type MultiQueue[T any] interface {
type Sizeable interface {
MsgSize() uint64
}
type sizeUpdater interface {
UpdateQueueSize(size uint64, msgType int, add bool)
}
type MultiQueue[T Sizeable] interface {
Add(ctx context.Context, threadId string, msg T) (err error)
CloseThread(threadId string) (err error)
ThreadIds() []string
Close() (err error)
}
type multiQueue[T any] struct {
type multiQueue[T Sizeable] struct {
handler HandleFunc[T]
updater sizeUpdater
totalMsgSize uint64
queueMaxSize int
msgType int
threads map[string]*mb.MB[T]
mu sync.Mutex
closed bool
@@ -59,7 +72,12 @@ func (m *multiQueue[T]) Add(ctx context.Context, threadId string, msg T) (err er
q = m.startThread(threadId)
}
m.mu.Unlock()
return q.TryAdd(msg)
err = q.TryAdd(msg)
if err != nil {
return
}
m.updateSize(msg, true)
return
}
func (m *multiQueue[T]) startThread(id string) *mb.MB[T] {
@@ -75,10 +93,23 @@ func (m *multiQueue[T]) threadLoop(q *mb.MB[T]) {
if err != nil {
return
}
m.updateSize(msg, false)
m.handler(msg)
}
}
func (m *multiQueue[T]) updateSize(msg T, add bool) {
// call MsgSize once and reuse the result under the lock
size := msg.MsgSize()
m.mu.Lock()
if add {
m.totalMsgSize += size
} else {
m.totalMsgSize -= size
}
m.mu.Unlock()
m.updater.UpdateQueueSize(size, m.msgType, add)
}
func (m *multiQueue[T]) CloseThread(threadId string) (err error) {
m.mu.Lock()
if m.closed {

View file

@@ -6,30 +6,48 @@ import (
"github.com/cheggaaa/mb/v3"
)
type QueueHandler[T any] func(id string, msg T, q *mb.MB[T]) error
type QueueHandler[T Sizeable] func(id string, msg T, q *mb.MB[T]) error
type Queue[T any] struct {
type Queue[T Sizeable] struct {
id string
q *mb.MB[T]
msgType int
updater sizeUpdater
handler QueueHandler[T]
}
func NewQueue[T any](id string, size int, h QueueHandler[T]) *Queue[T] {
func NewQueue[T Sizeable](id string, size, msgType int, updater sizeUpdater, h QueueHandler[T]) *Queue[T] {
return &Queue[T]{
id: id,
updater: updater,
msgType: msgType,
q: mb.New[T](size),
handler: h,
}
}
func (q *Queue[T]) TryAdd(msg T) error {
return q.handler(q.id, msg, q.q)
err := q.handler(q.id, msg, q.q)
if err != nil {
return err
}
q.updateSize(msg, true)
return nil
}
func (q *Queue[T]) WaitOne(ctx context.Context) (T, error) {
return q.q.WaitOne(ctx)
res, err := q.q.WaitOne(ctx)
if err != nil {
return res, err
}
q.updateSize(res, false)
return res, nil
}
func (q *Queue[T]) Close() error {
return q.q.Close()
}
func (q *Queue[T]) updateSize(msg T, add bool) {
q.updater.UpdateQueueSize(msg.MsgSize(), q.msgType, add)
}
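To see the accounting round-trip: TryAdd reports the message bytes as added only after the handler accepts the message, and WaitOne reports them as removed once a consumer takes it. A minimal, self-contained sketch; the message type, updater, and wiring are illustrative, not part of this commit:

package main

import (
	"context"
	"fmt"

	"github.com/cheggaaa/mb/v3"

	"github.com/anyproto/any-sync/util/multiqueue"
)

// testMsg and printUpdater are stand-ins for real messages and metrics.
type testMsg string

func (m testMsg) MsgSize() uint64 { return uint64(len(m)) }

type printUpdater struct{}

func (printUpdater) UpdateQueueSize(size uint64, msgType int, add bool) {
	fmt.Println("queue delta:", size, "msgType:", msgType, "add:", add)
}

func main() {
	// the handler decides how a message enters the underlying batcher
	handler := func(id string, msg testMsg, q *mb.MB[testMsg]) error {
		return q.TryAdd(msg)
	}
	q := multiqueue.NewQueue("peer-1", 100, 0, printUpdater{}, handler)
	_ = q.TryAdd(testMsg("hello"))            // adds 5 bytes to the metric
	msg, _ := q.WaitOne(context.Background()) // removes the same 5 bytes
	fmt.Println("got:", msg)
}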

View file

@@ -5,24 +5,28 @@ import (
"sync"
)
type QueueProvider[T any] interface {
type QueueProvider[T Sizeable] interface {
GetQueue(id string) *Queue[T]
RemoveQueue(id string) error
Close() error
}
type queueProvider[T any] struct {
type queueProvider[T Sizeable] struct {
queues map[string]*Queue[T]
mx sync.Mutex
closed bool
size int
msgType int
handler QueueHandler[T]
updater sizeUpdater
}
func NewQueueProvider[T any](size int, handler QueueHandler[T]) QueueProvider[T] {
func NewQueueProvider[T Sizeable](size, msgType int, updater sizeUpdater, handler QueueHandler[T]) QueueProvider[T] {
return &queueProvider[T]{
queues: make(map[string]*Queue[T]),
updater: updater,
size: size,
msgType: msgType,
handler: handler,
}
}
@@ -35,7 +39,7 @@ func (p *queueProvider[T]) GetQueue(id string) *Queue[T] {
}
q, ok := p.queues[id]
if !ok {
q = NewQueue(id, p.size, p.handler)
q = NewQueue(id, p.size, p.msgType, p.updater, p.handler)
p.queues[id] = q
}
return q