mirror of
https://github.com/anyproto/any-sync.git
synced 2025-06-08 05:57:03 +09:00
82 lines
1.8 KiB
Go
82 lines
1.8 KiB
Go
package syncqueues
|
|
|
|
import (
|
|
"context"
|
|
"time"
|
|
|
|
"golang.org/x/exp/slices"
|
|
|
|
accountService "github.com/anyproto/any-sync/accountservice"
|
|
"github.com/anyproto/any-sync/app"
|
|
"github.com/anyproto/any-sync/app/logger"
|
|
"github.com/anyproto/any-sync/nodeconf"
|
|
)
|
|
|
|
const CName = "common.util.syncqueues"
|
|
|
|
var log = logger.NewNamed(CName)
|
|
|
|
type SyncQueues interface {
|
|
app.ComponentRunnable
|
|
ActionPool(spaceId string) ActionPool
|
|
Limit(spaceId string) *Limit
|
|
}
|
|
|
|
func New() SyncQueues {
|
|
return &syncQueues{}
|
|
}
|
|
|
|
type syncQueues struct {
|
|
limit *Limit
|
|
pool ActionPool
|
|
nodeConf nodeconf.Service
|
|
accountService accountService.Service
|
|
}
|
|
|
|
func (g *syncQueues) Init(a *app.App) (err error) {
|
|
g.nodeConf = a.MustComponent(nodeconf.CName).(nodeconf.Service)
|
|
g.accountService = a.MustComponent(accountService.CName).(accountService.Service)
|
|
var (
|
|
nodeIds []string
|
|
iAmResponsible bool
|
|
myId = g.accountService.Account().PeerId
|
|
)
|
|
for _, node := range g.nodeConf.Configuration().Nodes {
|
|
nodeIds = append(nodeIds, node.PeerId)
|
|
if node.PeerId == myId {
|
|
iAmResponsible = true
|
|
}
|
|
}
|
|
g.pool = NewActionPool(time.Second*30, time.Minute, func(peerId string) *replaceableQueue {
|
|
// increase limits between responsible nodes
|
|
if slices.Contains(nodeIds, peerId) && iAmResponsible {
|
|
return newReplaceableQueue(30, 400)
|
|
} else {
|
|
return newReplaceableQueue(10, 100)
|
|
}
|
|
})
|
|
g.limit = NewLimit([]int{20, 15, 10, 5}, []int{200, 400, 800}, nodeIds, 100)
|
|
return
|
|
}
|
|
|
|
func (g *syncQueues) Run(ctx context.Context) (err error) {
|
|
g.pool.Run()
|
|
return
|
|
}
|
|
|
|
func (g *syncQueues) Close(ctx context.Context) (err error) {
|
|
g.pool.Close()
|
|
return
|
|
}
|
|
|
|
func (g *syncQueues) ActionPool(spaceId string) ActionPool {
|
|
return g.pool
|
|
}
|
|
|
|
func (g *syncQueues) Limit(spaceId string) *Limit {
|
|
return g.limit
|
|
}
|
|
|
|
func (g *syncQueues) Name() string {
|
|
return CName
|
|
}
|