1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-08 05:57:03 +09:00

Add excluded limit

This commit is contained in:
mcrakhman 2024-08-08 12:01:44 +02:00
parent 17ea292584
commit 138df7a234
No known key found for this signature in database
GPG key ID: DED12CFEF5B8396B
6 changed files with 92 additions and 20 deletions

View file

@ -6,10 +6,11 @@ import (
"fmt"
"time"
"go.uber.org/zap"
"github.com/anyproto/any-sync/app/logger"
"github.com/anyproto/any-sync/commonspace/object/tree/treestorage"
"github.com/anyproto/any-sync/util/slice"
"go.uber.org/zap"
)
var (
@ -181,6 +182,7 @@ func (tb *treeBuilder) loadChange(id string) (ch *Change, err error) {
if err != nil {
return nil, err
}
// TODO: see if we can delete this
if !tb.keepInMemoryData {
ch.Data = nil
}

View file

@ -1,21 +1,25 @@
package sync
import (
"fmt"
"sync"
"golang.org/x/exp/slices"
)
type Limit struct {
peerStep []int
totalStep []int
counter int
total int
tokens map[string]int
mx sync.Mutex
peerStep []int
totalStep []int
excludeIds []string
excludedLimit int
excludedTotal int
counter int
total int
tokens map[string]int
mx sync.Mutex
}
func NewLimit(peerStep, totalStep []int) *Limit {
func NewLimit(peerStep, totalStep []int, excludeIds []string, excludedLimit int) *Limit {
if len(peerStep) == 0 || len(totalStep) == 0 || len(peerStep) != len(totalStep)+1 {
panic("incorrect limit configuration")
}
@ -34,15 +38,25 @@ func NewLimit(peerStep, totalStep []int) *Limit {
// totalStep = [3, 6], where everything more than 6 in total will get 1 token for each id
totalStep = append(totalStep, totalStep[len(totalStep)-1])
return &Limit{
peerStep: peerStep,
totalStep: totalStep,
tokens: make(map[string]int),
excludeIds: excludeIds,
excludedLimit: excludedLimit,
peerStep: peerStep,
totalStep: totalStep,
tokens: make(map[string]int),
}
}
func (l *Limit) Take(id string) bool {
l.mx.Lock()
defer l.mx.Unlock()
if l.isExcluded(id) {
if l.tokens[id] >= l.excludedLimit {
return false
}
l.tokens[id]++
l.excludedTotal++
return true
}
if l.tokens[id] >= l.peerStep[l.counter] {
return false
}
@ -62,6 +76,10 @@ func (l *Limit) Release(id string) {
} else {
return
}
if l.isExcluded(id) {
l.excludedTotal--
return
}
l.total--
if l.total < l.totalStep[l.counter] {
if l.counter == len(l.totalStep)-1 {
@ -72,3 +90,21 @@ func (l *Limit) Release(id string) {
}
}
}
func (l *Limit) isExcluded(id string) bool {
for _, excludeId := range l.excludeIds {
if id == excludeId {
return true
}
}
return false
}
func (l *Limit) Stats(id string) string {
l.mx.Lock()
defer l.mx.Unlock()
if l.isExcluded(id) {
return fmt.Sprintf("excluded peer: %d/%d, total: %d/%d/%d", l.tokens[id], l.excludedLimit, l.excludedTotal, l.total, l.totalStep[l.counter])
}
return fmt.Sprintf("peer: %d/%d, total: %d/%d/%d", l.tokens[id], l.peerStep[l.counter], l.excludedTotal, l.total, l.totalStep[l.counter])
}

View file

@ -7,12 +7,16 @@ import (
"github.com/stretchr/testify/require"
)
func TestLimit(t *testing.T) {
func TestLimitExcluded(t *testing.T) {
for _, tc := range []struct {
peerStep []int
peerStep []int
excludeIds []string
excludedLimit int
}{
{
peerStep: []int{5, 4, 3, 2, 1},
peerStep: []int{5, 4, 3, 2, 1},
excludeIds: []string{"excluded1", "excluded2"},
excludedLimit: 10,
},
} {
totalStep := make([]int, len(tc.peerStep)-1)
@ -21,7 +25,9 @@ func TestLimit(t *testing.T) {
totalStep[i] = totalStep[i-1] + tc.peerStep[i] + 1
}
totalStep[len(totalStep)-1] = totalStep[len(totalStep)-2] + tc.peerStep[len(tc.peerStep)-1]
l := NewLimit(tc.peerStep, totalStep)
l := NewLimit(tc.peerStep, totalStep, tc.excludeIds, tc.excludedLimit)
// Test regular peers
for j := 0; j < len(tc.peerStep); j++ {
for i := 0; i < tc.peerStep[j]; i++ {
require.True(t, l.Take(fmt.Sprint(j)))
@ -30,6 +36,16 @@ func TestLimit(t *testing.T) {
}
require.Equal(t, len(tc.peerStep)-1, l.counter)
require.Equal(t, totalStep[len(totalStep)-1], l.total)
// Test excluded peers
for _, id := range tc.excludeIds {
for i := 0; i < tc.excludedLimit; i++ {
require.True(t, l.Take(id))
}
require.False(t, l.Take(id))
}
// Release regular peers
for j := 0; j < len(tc.peerStep); j++ {
for i := 0; i < tc.peerStep[j]; i++ {
l.Release(fmt.Sprint(j))
@ -37,5 +53,13 @@ func TestLimit(t *testing.T) {
}
require.Equal(t, 0, l.counter)
require.Equal(t, 0, l.total)
// Release excluded peers
for _, id := range tc.excludeIds {
for i := 0; i < tc.excludedLimit; i++ {
l.Release(id)
}
}
require.Equal(t, 0, l.excludedTotal)
}
}

View file

@ -5,11 +5,12 @@ import (
"errors"
"fmt"
"github.com/cheggaaa/mb/v3"
"github.com/anyproto/protobuf/proto"
"github.com/cheggaaa/mb/v3"
"storj.io/drpc"
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/app/logger"
"github.com/anyproto/any-sync/commonspace/object/tree/synctree"
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anyproto/any-sync/commonspace/object/tree/treestorage"
@ -27,6 +28,8 @@ import (
var ErrUnexpectedHeadUpdateType = errors.New("unexpected head update type")
var log = logger.NewNamed(syncdeps.CName)
type objectSync struct {
spaceId string
pool pool.Service

View file

@ -37,10 +37,10 @@ type requestManager struct {
metric syncdeps.QueueSizeUpdater
}
func NewRequestManager(handler syncdeps.SyncHandler, metric syncdeps.QueueSizeUpdater) RequestManager {
func NewRequestManager(handler syncdeps.SyncHandler, metric syncdeps.QueueSizeUpdater, responsibleNodeIds []string) RequestManager {
return &requestManager{
requestPool: NewRequestPool(),
limit: NewLimit([]int{20, 15, 10, 5}, []int{200, 400, 600}),
limit: NewLimit([]int{15, 10, 5}, []int{200, 400}, responsibleNodeIds, 20),
handler: handler,
incomingGuard: newGuard(0),
metric: metric,
@ -77,7 +77,7 @@ func (r *requestManager) QueueRequest(rq syncdeps.Request) error {
return r.requestPool.QueueRequestAction(rq.PeerId(), rq.ObjectId(), func(ctx context.Context) {
err := r.handler.ApplyRequest(ctx, rq, r)
if err != nil {
log.Error("failed to apply request", zap.Error(err))
log.Error("failed to apply request", zap.Error(err), zap.String("limit stats", r.limit.Stats(rq.PeerId())))
}
}, func() {
r.metric.UpdateQueueSize(size, syncdeps.MsgTypeOutgoingRequest, false)

View file

@ -16,6 +16,7 @@ import (
"github.com/anyproto/any-sync/commonspace/sync/syncdeps"
"github.com/anyproto/any-sync/metric"
"github.com/anyproto/any-sync/net/streampool"
"github.com/anyproto/any-sync/nodeconf"
"github.com/anyproto/any-sync/util/multiqueue"
)
@ -41,6 +42,7 @@ type syncService struct {
manager RequestManager
streamPool streampool.StreamPool
peerManager peermanager.PeerManager
nodeConf nodeconf.NodeConf
handler syncdeps.SyncHandler
spaceId string
metric *syncMetric
@ -75,8 +77,13 @@ func (s *syncService) Init(a *app.App) (err error) {
s.peerManager = a.MustComponent(peermanager.CName).(peermanager.PeerManager)
s.streamPool = a.MustComponent(streampool.CName).(streampool.StreamPool)
s.commonMetric, _ = a.Component(metric.CName).(metric.Metric)
s.nodeConf = a.MustComponent(nodeconf.CName).(nodeconf.Service)
var nodeIds []string
for _, node := range s.nodeConf.Configuration().Nodes {
nodeIds = append(nodeIds, node.PeerId)
}
s.streamPool.SetSyncDelegate(s)
s.manager = NewRequestManager(s.handler, s.metric)
s.manager = NewRequestManager(s.handler, s.metric, nodeIds)
s.ctx, s.cancel = context.WithCancel(context.Background())
return nil
}