1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-09 17:45:03 +09:00

Update limits

This commit is contained in:
mcrakhman 2024-07-03 21:47:38 +02:00
parent c229ba1da1
commit e7598966ca
No known key found for this signature in database
GPG key ID: DED12CFEF5B8396B
4 changed files with 15 additions and 7 deletions

View file

@ -275,7 +275,7 @@ func (r *rawChangeLoader) loadAppendEntry(id string) (entry rawCacheEntry, err e
} }
size := len(rawChange.RawChange) size := len(rawChange.RawChange)
r.buf = rawChange.RawChange r.buf = rawChange.RawChange
change, err := r.changeBuilder.Unmarshall(rawChange, false) change, err := r.changeBuilder.Unmarshall(rawChange, false)
if err != nil { if err != nil {
return return

View file

@ -3,22 +3,29 @@ package sync
import "sync" import "sync"
type guard struct { type guard struct {
mu sync.Mutex mu sync.Mutex
taken map[string]struct{} taken map[string]struct{}
limit int
takenCount int
} }
func newGuard() *guard { func newGuard(limit int) *guard {
return &guard{ return &guard{
taken: make(map[string]struct{}), taken: make(map[string]struct{}),
limit: limit,
} }
} }
func (g *guard) TryTake(id string) bool { func (g *guard) TryTake(id string) bool {
g.mu.Lock() g.mu.Lock()
defer g.mu.Unlock() defer g.mu.Unlock()
if g.limit != 0 && g.takenCount >= g.limit {
return false
}
if _, exists := g.taken[id]; exists { if _, exists := g.taken[id]; exists {
return false return false
} }
g.takenCount++
g.taken[id] = struct{}{} g.taken[id] = struct{}{}
return true return true
} }
@ -26,5 +33,6 @@ func (g *guard) TryTake(id string) bool {
func (g *guard) Release(id string) { func (g *guard) Release(id string) {
g.mu.Lock() g.mu.Lock()
defer g.mu.Unlock() defer g.mu.Unlock()
g.takenCount--
delete(g.taken, id) delete(g.taken, id)
} }

View file

@ -40,7 +40,7 @@ func NewRequestManager(handler syncdeps.SyncHandler, metric syncdeps.QueueSizeUp
return &requestManager{ return &requestManager{
requestPool: NewRequestPool(), requestPool: NewRequestPool(),
handler: handler, handler: handler,
incomingGuard: newGuard(), incomingGuard: newGuard(1000),
metric: metric, metric: metric,
} }
} }

View file

@ -27,7 +27,7 @@ func NewRequestPool() RequestPool {
ctx: ctx, ctx: ctx,
cancel: cancel, cancel: cancel,
pools: make(map[string]*tryAddQueue), pools: make(map[string]*tryAddQueue),
peerGuard: newGuard(), peerGuard: newGuard(0),
} }
} }
@ -57,7 +57,7 @@ func (rp *requestPool) QueueRequestAction(peerId, objectId string, action func(c
) )
pool, exists = rp.pools[peerId] pool, exists = rp.pools[peerId]
if !exists { if !exists {
pool = newTryAddQueue(100, 100) pool = newTryAddQueue(10, 100)
rp.pools[peerId] = pool rp.pools[peerId] = pool
pool.Run() pool.Run()
} }