1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-07 21:47:02 +09:00

Update counting msg size

This commit is contained in:
mcrakhman 2024-08-15 09:18:06 +02:00
parent 7e9839336c
commit 2ba07d2b11
No known key found for this signature in database
GPG key ID: DED12CFEF5B8396B
3 changed files with 19 additions and 7 deletions

View file

@ -15,9 +15,13 @@ type InnerHeadUpdate struct {
root *treechangeproto.RawTreeChangeWithId
}
func (h InnerHeadUpdate) MsgSize() uint64 {
size := uint64(len(h.heads))
size += uint64(len(h.snapshotPath))
func (h InnerHeadUpdate) MsgSize() (size uint64) {
for _, head := range h.heads {
size += uint64(len(head))
}
for _, snapshotId := range h.snapshotPath {
size += uint64(len(snapshotId))
}
for _, change := range h.changes {
size += uint64(len(change.Id))
size += uint64(len(change.RawChange))

View file

@ -73,8 +73,15 @@ func (m *syncMetric) UpdateQueueSize(size uint64, msgType int, add bool) {
atCount.Add(1)
m.totalSize.Add(intSize)
} else {
atSize.Add(-intSize)
atCount.Add(-1)
m.totalSize.Add(-intSize)
curCount := atCount.Load()
curSize := atSize.Load()
if curCount != 0 {
atCount.Add(-1)
}
// TODO: fix the root cause :-)
if curSize > intSize {
atSize.Add(-intSize)
m.totalSize.Add(-intSize)
}
}
}

View file

@ -71,11 +71,12 @@ func (m *multiQueue[T]) Add(ctx context.Context, threadId string, msg T) (err er
q = m.startThread(threadId)
}
m.mu.Unlock()
m.updateSize(msg, true)
err = q.TryAdd(msg)
if err != nil {
m.updateSize(msg, false)
return
}
m.updateSize(msg, true)
return
}