// Mirror of https://github.com/anyproto/any-sync.git
// Synced 2025-06-08 05:57:03 +09:00
// 119 lines, 2.6 KiB, Go
package syncqueues
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"time"
|
|
|
|
"go.uber.org/zap"
|
|
|
|
"github.com/anyproto/any-sync/util/periodicsync"
|
|
)
|
|
|
|
// ActionPool schedules per-object actions grouped by peer.
type ActionPool interface {
	// Run starts the pool's background work.
	Run()

	// Add registers action for the given objectId on behalf of peerId.
	// The action receives a context supplied by the pool; remove is an
	// accompanying callback supplied by the caller.
	Add(peerId, objectId string, action func(ctx context.Context), remove func())

	// Close shuts the pool down.
	Close()
}
|
|
|
|
type actionPool struct {
|
|
mu sync.Mutex
|
|
peerGuard *Guard
|
|
queues map[string]*replaceableQueue
|
|
periodicLoop periodicsync.PeriodicSync
|
|
closePeriod time.Duration
|
|
gcPeriod time.Duration
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
openFunc func(peerId string) *replaceableQueue
|
|
isClosed bool
|
|
}
|
|
|
|
func NewActionPool(closePeriod, gcPeriod time.Duration, openFunc func(peerId string) *replaceableQueue) ActionPool {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
return &actionPool{
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
closePeriod: closePeriod,
|
|
gcPeriod: gcPeriod,
|
|
openFunc: openFunc,
|
|
queues: make(map[string]*replaceableQueue),
|
|
peerGuard: NewGuard(),
|
|
}
|
|
}
|
|
|
|
func (rp *actionPool) tryTake(peerId, objectId string) bool {
|
|
rp.mu.Lock()
|
|
defer rp.mu.Unlock()
|
|
if rp.isClosed {
|
|
return false
|
|
}
|
|
|
|
return rp.peerGuard.TryTake(fullId(peerId, objectId))
|
|
}
|
|
|
|
func (rp *actionPool) release(peerId, objectId string) {
|
|
rp.peerGuard.Release(fullId(peerId, objectId))
|
|
}
|
|
|
|
func (rp *actionPool) Run() {
|
|
rp.periodicLoop = periodicsync.NewPeriodicSyncDuration(rp.gcPeriod, time.Minute, rp.gc, log)
|
|
rp.periodicLoop.Run()
|
|
}
|
|
|
|
func (rp *actionPool) gc(ctx context.Context) error {
|
|
rp.mu.Lock()
|
|
var queuesToClose []*replaceableQueue
|
|
tm := time.Now()
|
|
for id, queue := range rp.queues {
|
|
if queue.ShouldClose(tm, rp.closePeriod) {
|
|
delete(rp.queues, id)
|
|
log.Debug("closing queue", zap.String("peerId", id))
|
|
queuesToClose = append(queuesToClose, queue)
|
|
}
|
|
}
|
|
rp.mu.Unlock()
|
|
for _, queue := range queuesToClose {
|
|
_ = queue.Close()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (rp *actionPool) Add(peerId, objectId string, action func(ctx context.Context), remove func()) {
|
|
rp.mu.Lock()
|
|
if rp.isClosed {
|
|
rp.mu.Unlock()
|
|
return
|
|
}
|
|
var (
|
|
queue *replaceableQueue
|
|
exists bool
|
|
)
|
|
queue, exists = rp.queues[peerId]
|
|
if !exists {
|
|
queue = rp.openFunc(peerId)
|
|
rp.queues[peerId] = queue
|
|
queue.Run()
|
|
}
|
|
rp.mu.Unlock()
|
|
var wrappedAction func()
|
|
wrappedAction = func() {
|
|
// this prevents cases when two simultaneous requests are sent at the same time
|
|
if !rp.tryTake(peerId, objectId) {
|
|
return
|
|
}
|
|
action(rp.ctx)
|
|
rp.release(peerId, objectId)
|
|
}
|
|
queue.Replace(objectId, wrappedAction, remove)
|
|
}
|
|
|
|
func (rp *actionPool) Close() {
|
|
rp.periodicLoop.Close()
|
|
rp.mu.Lock()
|
|
defer rp.mu.Unlock()
|
|
rp.isClosed = true
|
|
for _, queue := range rp.queues {
|
|
_ = queue.Close()
|
|
}
|
|
}
|