1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-08 05:57:03 +09:00

Add new limiter configuration

This commit is contained in:
mcrakhman 2024-08-07 22:57:42 +02:00
parent 5ac5ced34f
commit 17ea292584
No known key found for this signature in database
GPG key ID: DED12CFEF5B8396B
3 changed files with 75 additions and 43 deletions

View file

@ -2,28 +2,55 @@ package sync
import (
"sync"
"golang.org/x/exp/slices"
)
type Limit struct {
max int
tokens map[string]int
mx sync.Mutex
peerStep []int
totalStep []int
counter int
total int
tokens map[string]int
mx sync.Mutex
}
func NewLimit(max int) *Limit {
func NewLimit(peerStep, totalStep []int) *Limit {
if len(peerStep) == 0 || len(totalStep) == 0 || len(peerStep) != len(totalStep)+1 {
panic("incorrect limit configuration")
}
slices.SortFunc(peerStep, func(a, b int) int {
if a < b {
return 1
} else if a > b {
return -1
} else {
return 0
}
})
slices.Sort(totalStep)
// so here we would have something like
// peerStep = [3, 2, 1]
// totalStep = [3, 6], where everything more than 6 in total will get 1 token for each id
totalStep = append(totalStep, totalStep[len(totalStep)-1])
return &Limit{
max: max,
tokens: make(map[string]int),
peerStep: peerStep,
totalStep: totalStep,
tokens: make(map[string]int),
}
}
func (l *Limit) Take(id string) bool {
l.mx.Lock()
defer l.mx.Unlock()
if l.tokens[id] >= l.max {
if l.tokens[id] >= l.peerStep[l.counter] {
return false
}
l.tokens[id]++
l.total++
if l.total >= l.totalStep[l.counter] && l.counter < len(l.totalStep)-1 {
l.counter++
}
return true
}
@ -32,5 +59,16 @@ func (l *Limit) Release(id string) {
defer l.mx.Unlock()
if l.tokens[id] > 0 {
l.tokens[id]--
} else {
return
}
l.total--
if l.total < l.totalStep[l.counter] {
if l.counter == len(l.totalStep)-1 {
l.counter--
}
if l.counter > 0 {
l.counter--
}
}
}

View file

@ -1,47 +1,41 @@
package sync
import (
"sync"
"fmt"
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestLimit(t *testing.T) {
l := NewLimit(3)
ids := []string{"id1", "id2", "id3"}
var wg sync.WaitGroup
wg.Add(len(ids) * 5)
// Function to simulate taking and releasing tokens
testFunc := func(i int, id string) {
defer wg.Done()
start := time.Now()
l.Take(id)
taken := time.Since(start)
// Ensure no more than the limit of tokens are taken simultaneously
if taken > time.Second {
t.Errorf("Goroutine %d for %s waited too long to take a token", i, id)
for _, tc := range []struct {
peerStep []int
}{
{
peerStep: []int{5, 4, 3, 2, 1},
},
} {
totalStep := make([]int, len(tc.peerStep)-1)
totalStep[0] = tc.peerStep[0]
for i := 1; i < len(tc.peerStep)-2; i++ {
totalStep[i] = totalStep[i-1] + tc.peerStep[i] + 1
}
time.Sleep(500 * time.Millisecond)
l.Release(id)
}
for i := 0; i < 5; i++ {
for _, id := range ids {
go testFunc(i, id)
totalStep[len(totalStep)-1] = totalStep[len(totalStep)-2] + tc.peerStep[len(tc.peerStep)-1]
l := NewLimit(tc.peerStep, totalStep)
for j := 0; j < len(tc.peerStep); j++ {
for i := 0; i < tc.peerStep[j]; i++ {
require.True(t, l.Take(fmt.Sprint(j)))
}
require.False(t, l.Take(fmt.Sprint(j)))
}
}
wg.Wait()
// Ensure all tokens are released
for _, id := range ids {
if l.tokens[id] != 0 {
t.Errorf("Tokens for %s should be 0, got %d", id, l.tokens[id])
require.Equal(t, len(tc.peerStep)-1, l.counter)
require.Equal(t, totalStep[len(totalStep)-1], l.total)
for j := 0; j < len(tc.peerStep); j++ {
for i := 0; i < tc.peerStep[j]; i++ {
l.Release(fmt.Sprint(j))
}
}
require.Equal(t, 0, l.counter)
require.Equal(t, 0, l.total)
}
}

View file

@ -40,7 +40,7 @@ type requestManager struct {
func NewRequestManager(handler syncdeps.SyncHandler, metric syncdeps.QueueSizeUpdater) RequestManager {
return &requestManager{
requestPool: NewRequestPool(),
limit: NewLimit(10),
limit: NewLimit([]int{20, 15, 10, 5}, []int{200, 400, 600}),
handler: handler,
incomingGuard: newGuard(0),
metric: metric,