From 2ba07d2b115f078709ef6343845e9a7edfc7dabf Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Thu, 15 Aug 2024 09:18:06 +0200 Subject: [PATCH] Update counting msg size --- commonspace/object/tree/synctree/headupdate.go | 10 +++++++--- commonspace/sync/syncmetric.go | 13 ++++++++++--- util/multiqueue/multiqueue.go | 3 ++- 3 files changed, 19 insertions(+), 7 deletions(-) diff --git a/commonspace/object/tree/synctree/headupdate.go b/commonspace/object/tree/synctree/headupdate.go index 72700a7b..b6a526c9 100644 --- a/commonspace/object/tree/synctree/headupdate.go +++ b/commonspace/object/tree/synctree/headupdate.go @@ -15,9 +15,13 @@ type InnerHeadUpdate struct { root *treechangeproto.RawTreeChangeWithId } -func (h InnerHeadUpdate) MsgSize() uint64 { - size := uint64(len(h.heads)) - size += uint64(len(h.snapshotPath)) +func (h InnerHeadUpdate) MsgSize() (size uint64) { + for _, head := range h.heads { + size += uint64(len(head)) + } + for _, snapshotId := range h.snapshotPath { + size += uint64(len(snapshotId)) + } for _, change := range h.changes { size += uint64(len(change.Id)) size += uint64(len(change.RawChange)) diff --git a/commonspace/sync/syncmetric.go b/commonspace/sync/syncmetric.go index 665b4d0b..feed96a8 100644 --- a/commonspace/sync/syncmetric.go +++ b/commonspace/sync/syncmetric.go @@ -73,8 +73,15 @@ func (m *syncMetric) UpdateQueueSize(size uint64, msgType int, add bool) { atCount.Add(1) m.totalSize.Add(intSize) } else { - atSize.Add(-intSize) - atCount.Add(-1) - m.totalSize.Add(-intSize) + curCount := atCount.Load() + curSize := atSize.Load() + if curCount != 0 { + atCount.Add(-1) + } + // TODO: fix the root cause :-) + if curSize > intSize { + atSize.Add(-intSize) + m.totalSize.Add(-intSize) + } } } diff --git a/util/multiqueue/multiqueue.go b/util/multiqueue/multiqueue.go index 9554d488..e112618b 100644 --- a/util/multiqueue/multiqueue.go +++ b/util/multiqueue/multiqueue.go @@ -71,11 +71,12 @@ func (m *multiQueue[T]) Add(ctx context.Context, threadId string, msg T) (err er q = m.startThread(threadId) } m.mu.Unlock() + m.updateSize(msg, true) err = q.TryAdd(msg) if err != nil { + m.updateSize(msg, false) return } - m.updateSize(msg, true) return }