1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-08 05:57:03 +09:00
any-sync/net/streampool/sendpool.go
2023-06-07 21:48:13 +02:00

59 lines
1.1 KiB
Go

package streampool
import (
"context"
"github.com/cheggaaa/mb/v3"
"go.uber.org/zap"
)
// NewExecPool builds an ExecPool; no worker goroutines are started until Run is called.
// workers - number of goroutines Run will start to execute tasks
// maxSize - size limit passed to the underlying task queue
func NewExecPool(workers, maxSize int) *ExecPool {
	pool := &ExecPool{
		workers: workers,
		batch:   mb.New[func()](maxSize),
	}
	// The pool owns its own root context; Close cancels it.
	pool.ctx, pool.cancel = context.WithCancel(context.Background())
	return pool
}
// ExecPool needed for parallel execution of the incoming send tasks.
// Tasks are plain func() values queued via Add/TryAdd and executed by the
// goroutines started in Run.
type ExecPool struct {
// ctx is handed to the queue wait in sendLoop; it is cancelled by Close.
ctx context.Context
// cancel cancels ctx; invoked from Close.
cancel context.CancelFunc
// workers is the number of sendLoop goroutines started by Run.
workers int
// batch is the queue of pending tasks, created with the maxSize given to NewExecPool.
batch *mb.MB[func()]
}
// Add hands the given tasks to the underlying queue's Add together with ctx
// and returns whatever error the queue reports.
// NOTE(review): the mb queue presumably waits for free space when full until
// ctx is done — confirm against the mb documentation.
func (ss *ExecPool) Add(ctx context.Context, f ...func()) (err error) {
	err = ss.batch.Add(ctx, f...)
	return err
}
// TryAdd hands the given tasks to the underlying queue's TryAdd and returns
// whatever error the queue reports.
// NOTE(review): presumably the non-waiting counterpart of Add (fails instead
// of blocking when the queue is full) — confirm against the mb documentation.
func (ss *ExecPool) TryAdd(f ...func()) (err error) {
	err = ss.batch.TryAdd(f...)
	return err
}
// Run starts the worker goroutines: one sendLoop per configured worker.
// It returns immediately and does not wait for the goroutines to finish.
func (ss *ExecPool) Run() {
	for remaining := ss.workers; remaining > 0; remaining-- {
		go ss.sendLoop()
	}
}
// sendLoop is the body of a single worker: it repeatedly takes one task from
// the queue and runs it, and exits once waiting on the queue returns an error.
func (ss *ExecPool) sendLoop() {
	for {
		task, waitErr := ss.batch.WaitOne(ss.ctx)
		if waitErr == nil {
			task()
			continue
		}
		// Any wait error ends this worker; it is only logged at debug level.
		log.Debug("close send loop", zap.Error(waitErr))
		return
	}
}
// Close cancels the pool's context and then closes the task queue,
// returning the error reported by the queue's Close.
func (ss *ExecPool) Close() (err error) {
	// Cancel first so workers waiting with ss.ctx are released.
	ss.cancel()
	err = ss.batch.Close()
	return err
}