From 415bf3313dc72220f59ad2033c7fff7bd6d80ba8 Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Fri, 16 Aug 2024 12:26:26 +0200 Subject: [PATCH] Update load iterator batch size --- commonspace/object/tree/objecttree/loaditerator.go | 2 +- commonspace/object/tree/synctree/requestfactory.go | 2 +- commonspace/sync/syncmetric.go | 4 ++++ net/streampool/streamstat.go | 9 +++++++++ 4 files changed, 15 insertions(+), 2 deletions(-) diff --git a/commonspace/object/tree/objecttree/loaditerator.go b/commonspace/object/tree/objecttree/loaditerator.go index 53508210..94f57d30 100644 --- a/commonspace/object/tree/objecttree/loaditerator.go +++ b/commonspace/object/tree/objecttree/loaditerator.go @@ -60,7 +60,7 @@ func (l *loadIterator) NextBatch(maxSize int) (batch IteratorBatch, err error) { batch.Heads = append(batch.Heads, c.Id) return true } - if curSize+rawEntry.size > maxSize && len(batch.Batch) != 0 { + if curSize+rawEntry.size >= maxSize && len(batch.Batch) != 0 { l.isExhausted = false return false } diff --git a/commonspace/object/tree/synctree/requestfactory.go b/commonspace/object/tree/synctree/requestfactory.go index 57cb7c91..8939d44e 100644 --- a/commonspace/object/tree/synctree/requestfactory.go +++ b/commonspace/object/tree/synctree/requestfactory.go @@ -6,7 +6,7 @@ import ( "github.com/anyproto/any-sync/commonspace/sync/objectsync/objectmessages" ) -const batchSize = 1024 * 1024 * 10 +const batchSize = 1024 * 1024 type RequestFactory interface { CreateHeadUpdate(t objecttree.ObjectTree, ignoredPeer string, added []*treechangeproto.RawTreeChangeWithId) (headUpdate *objectmessages.HeadUpdate) diff --git a/commonspace/sync/syncmetric.go b/commonspace/sync/syncmetric.go index feed96a8..3bf0996e 100644 --- a/commonspace/sync/syncmetric.go +++ b/commonspace/sync/syncmetric.go @@ -4,6 +4,8 @@ import ( "sync" "sync/atomic" + "go.uber.org/zap" + "github.com/anyproto/any-sync/commonspace/sync/syncdeps" "github.com/anyproto/any-sync/metric" ) @@ -82,6 +84,8 @@ func (m *syncMetric) UpdateQueueSize(size uint64, msgType int, add bool) { if curSize > intSize { atSize.Add(-intSize) m.totalSize.Add(-intSize) + } else { + log.Error("syncMetric.UpdateQueueSize: totalSize is less than message size", zap.Int64("size", intSize), zap.Int64("totalSize", curSize)) } } } diff --git a/net/streampool/streamstat.go b/net/streampool/streamstat.go index 6d746fcf..499446d7 100644 --- a/net/streampool/streamstat.go +++ b/net/streampool/streamstat.go @@ -3,6 +3,7 @@ package streampool import ( "sync/atomic" + "go.uber.org/zap" "storj.io/drpc" ) @@ -38,6 +39,14 @@ func (st *streamStat) AddMessage(msg drpc.Message) { func (st *streamStat) RemoveMessage(msg drpc.Message) { if sizeable, ok := msg.(SizeableMessage); ok { + size := sizeable.Size() + // TODO: find the real problem :-) + if st.totalSize.Load() > int64(size) { + st.totalSize.Add(-int64(size)) + } else { + log.Error("streamStat.RemoveMessage: totalSize is less than message size", zap.Int("size", size), zap.Int64("totalSize", st.totalSize.Load())) + st.totalSize.Store(0) + } st.totalSize.Add(-int64(sizeable.Size())) st.msgCount.Add(-1) }