mirror of
https://github.com/anyproto/any-sync.git
synced 2025-06-08 14:07:02 +09:00
Update load iterator batch size
This commit is contained in:
parent
8caf60f4fb
commit
415bf3313d
4 changed files with 15 additions and 2 deletions
|
@ -60,7 +60,7 @@ func (l *loadIterator) NextBatch(maxSize int) (batch IteratorBatch, err error) {
|
||||||
batch.Heads = append(batch.Heads, c.Id)
|
batch.Heads = append(batch.Heads, c.Id)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
if curSize+rawEntry.size > maxSize && len(batch.Batch) != 0 {
|
if curSize+rawEntry.size >= maxSize && len(batch.Batch) != 0 {
|
||||||
l.isExhausted = false
|
l.isExhausted = false
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,7 +6,7 @@ import (
|
||||||
"github.com/anyproto/any-sync/commonspace/sync/objectsync/objectmessages"
|
"github.com/anyproto/any-sync/commonspace/sync/objectsync/objectmessages"
|
||||||
)
|
)
|
||||||
|
|
||||||
const batchSize = 1024 * 1024 * 10
|
const batchSize = 1024 * 1024
|
||||||
|
|
||||||
type RequestFactory interface {
|
type RequestFactory interface {
|
||||||
CreateHeadUpdate(t objecttree.ObjectTree, ignoredPeer string, added []*treechangeproto.RawTreeChangeWithId) (headUpdate *objectmessages.HeadUpdate)
|
CreateHeadUpdate(t objecttree.ObjectTree, ignoredPeer string, added []*treechangeproto.RawTreeChangeWithId) (headUpdate *objectmessages.HeadUpdate)
|
||||||
|
|
|
@ -4,6 +4,8 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
|
||||||
|
"go.uber.org/zap"
|
||||||
|
|
||||||
"github.com/anyproto/any-sync/commonspace/sync/syncdeps"
|
"github.com/anyproto/any-sync/commonspace/sync/syncdeps"
|
||||||
"github.com/anyproto/any-sync/metric"
|
"github.com/anyproto/any-sync/metric"
|
||||||
)
|
)
|
||||||
|
@ -82,6 +84,8 @@ func (m *syncMetric) UpdateQueueSize(size uint64, msgType int, add bool) {
|
||||||
if curSize > intSize {
|
if curSize > intSize {
|
||||||
atSize.Add(-intSize)
|
atSize.Add(-intSize)
|
||||||
m.totalSize.Add(-intSize)
|
m.totalSize.Add(-intSize)
|
||||||
|
} else {
|
||||||
|
log.Error("syncMetric.UpdateQueueSize: totalSize is less than message size", zap.Int64("size", intSize), zap.Int64("totalSize", curSize))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,6 +3,7 @@ package streampool
|
||||||
import (
|
import (
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
|
||||||
|
"go.uber.org/zap"
|
||||||
"storj.io/drpc"
|
"storj.io/drpc"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -38,6 +39,14 @@ func (st *streamStat) AddMessage(msg drpc.Message) {
|
||||||
|
|
||||||
func (st *streamStat) RemoveMessage(msg drpc.Message) {
|
func (st *streamStat) RemoveMessage(msg drpc.Message) {
|
||||||
if sizeable, ok := msg.(SizeableMessage); ok {
|
if sizeable, ok := msg.(SizeableMessage); ok {
|
||||||
|
size := sizeable.Size()
|
||||||
|
// TODO: find the real problem :-)
|
||||||
|
if st.totalSize.Load() > int64(size) {
|
||||||
|
st.totalSize.Add(-int64(size))
|
||||||
|
} else {
|
||||||
|
log.Error("streamStat.RemoveMessage: totalSize is less than message size", zap.Int("size", size), zap.Int64("totalSize", st.totalSize.Load()))
|
||||||
|
st.totalSize.Store(0)
|
||||||
|
}
|
||||||
st.totalSize.Add(-int64(sizeable.Size()))
|
st.totalSize.Add(-int64(sizeable.Size()))
|
||||||
st.msgCount.Add(-1)
|
st.msgCount.Add(-1)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue