1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-08 05:57:03 +09:00
any-sync/util/syncqueues/actionpool.go
2024-08-14 23:06:01 +02:00

119 lines
2.6 KiB
Go

package syncqueues
import (
"context"
"sync"
"time"
"go.uber.org/zap"
"github.com/anyproto/any-sync/util/periodicsync"
)
type ActionPool interface {
Run()
Add(peerId, objectId string, action func(ctx context.Context), remove func())
Close()
}
type actionPool struct {
mu sync.Mutex
peerGuard *Guard
queues map[string]*replaceableQueue
periodicLoop periodicsync.PeriodicSync
closePeriod time.Duration
gcPeriod time.Duration
ctx context.Context
cancel context.CancelFunc
openFunc func(peerId string) *replaceableQueue
isClosed bool
}
func NewActionPool(closePeriod, gcPeriod time.Duration, openFunc func(peerId string) *replaceableQueue) ActionPool {
ctx, cancel := context.WithCancel(context.Background())
return &actionPool{
ctx: ctx,
cancel: cancel,
closePeriod: closePeriod,
gcPeriod: gcPeriod,
openFunc: openFunc,
queues: make(map[string]*replaceableQueue),
peerGuard: NewGuard(),
}
}
func (rp *actionPool) tryTake(peerId, objectId string) bool {
rp.mu.Lock()
defer rp.mu.Unlock()
if rp.isClosed {
return false
}
return rp.peerGuard.TryTake(fullId(peerId, objectId))
}
func (rp *actionPool) release(peerId, objectId string) {
rp.peerGuard.Release(fullId(peerId, objectId))
}
func (rp *actionPool) Run() {
rp.periodicLoop = periodicsync.NewPeriodicSyncDuration(rp.gcPeriod, time.Minute, rp.gc, log)
rp.periodicLoop.Run()
}
func (rp *actionPool) gc(ctx context.Context) error {
rp.mu.Lock()
var queuesToClose []*replaceableQueue
tm := time.Now()
for id, queue := range rp.queues {
if queue.ShouldClose(tm, rp.closePeriod) {
delete(rp.queues, id)
log.Debug("closing queue", zap.String("peerId", id))
queuesToClose = append(queuesToClose, queue)
}
}
rp.mu.Unlock()
for _, queue := range queuesToClose {
_ = queue.Close()
}
return nil
}
func (rp *actionPool) Add(peerId, objectId string, action func(ctx context.Context), remove func()) {
rp.mu.Lock()
if rp.isClosed {
rp.mu.Unlock()
return
}
var (
queue *replaceableQueue
exists bool
)
queue, exists = rp.queues[peerId]
if !exists {
queue = rp.openFunc(peerId)
rp.queues[peerId] = queue
queue.Run()
}
rp.mu.Unlock()
var wrappedAction func()
wrappedAction = func() {
// this prevents cases when two simultaneous requests are sent at the same time
if !rp.tryTake(peerId, objectId) {
return
}
action(rp.ctx)
rp.release(peerId, objectId)
}
queue.Replace(objectId, wrappedAction, remove)
}
func (rp *actionPool) Close() {
rp.periodicLoop.Close()
rp.mu.Lock()
defer rp.mu.Unlock()
rp.isClosed = true
for _, queue := range rp.queues {
_ = queue.Close()
}
}