1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-10 18:10:54 +09:00

Add another limit

This commit is contained in:
mcrakhman 2024-07-04 22:41:28 +02:00
parent 94c4a38fbc
commit e1ebc37b04
No known key found for this signature in database
GPG key ID: DED12CFEF5B8396B
2 changed files with 48 additions and 3 deletions

41
commonspace/sync/limit.go Normal file
View file

@ -0,0 +1,41 @@
package sync
import (
"sync"
)
type Limit struct {
max int
tokens map[string]int
cond *sync.Cond
mutex sync.Mutex
}
func NewLimit(max int) *Limit {
return &Limit{
max: max,
tokens: make(map[string]int),
cond: sync.NewCond(&sync.Mutex{}),
}
}
func (l *Limit) Take(id string) {
l.mutex.Lock()
defer l.mutex.Unlock()
for l.tokens[id] >= l.max {
l.cond.Wait()
}
l.tokens[id]++
}
func (l *Limit) Release(id string) {
l.mutex.Lock()
defer l.mutex.Unlock()
if l.tokens[id] > 0 {
l.tokens[id]--
l.cond.Signal()
}
}

View file

@ -32,6 +32,7 @@ type StreamResponse struct {
type requestManager struct {
requestPool RequestPool
incomingGuard *guard
limit *Limit
handler syncdeps.SyncHandler
metric syncdeps.QueueSizeUpdater
}
@ -39,8 +40,9 @@ type requestManager struct {
func NewRequestManager(handler syncdeps.SyncHandler, metric syncdeps.QueueSizeUpdater) RequestManager {
return &requestManager{
requestPool: NewRequestPool(),
limit: NewLimit(10),
handler: handler,
incomingGuard: newGuard(1000),
incomingGuard: newGuard(0),
metric: metric,
}
}
@ -96,12 +98,14 @@ func (r *requestManager) HandleDeprecatedObjectSync(ctx context.Context, req *sp
func (r *requestManager) HandleStreamRequest(ctx context.Context, rq syncdeps.Request, stream drpc.Stream) error {
size := rq.MsgSize()
r.metric.UpdateQueueSize(size, syncdeps.MsgTypeIncomingRequest, true)
defer r.metric.UpdateQueueSize(size, syncdeps.MsgTypeIncomingRequest, false)
if !r.incomingGuard.TryTake(fullId(rq.PeerId(), rq.ObjectId())) {
return nil
}
defer r.incomingGuard.Release(fullId(rq.PeerId(), rq.ObjectId()))
r.metric.UpdateQueueSize(size, syncdeps.MsgTypeIncomingRequest, true)
defer r.metric.UpdateQueueSize(size, syncdeps.MsgTypeIncomingRequest, false)
r.limit.Take(rq.PeerId())
defer r.limit.Release(rq.PeerId())
newRq, err := r.handler.HandleStreamRequest(ctx, rq, r.metric, func(resp proto.Message) error {
return stream.MsgSend(resp, streampool.EncodingProto)
})