1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-08 05:57:03 +09:00
any-sync/util/syncqueues/actionpool_test.go
2024-08-28 15:29:13 +02:00

150 lines
4.1 KiB
Go

package syncqueues
import (
"context"
"sync"
"testing"
"time"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
)
func TestRequestPool(t *testing.T) {
t.Run("parallel, different peer, same object", func(t *testing.T) {
rp := NewActionPool(time.Minute, time.Minute, func(peerId string) *replaceableQueue {
return newReplaceableQueue(1, 1)
})
rp.Run()
// we use wait channel to make sure that blocking does not prevent action from being called
wait := make(chan struct{})
wg := &sync.WaitGroup{}
wg.Add(2)
rp.Add("peerId", "objectId", func(ctx context.Context) {
wg.Done()
<-wait
}, func() {})
rp.Add("peerId1", "objectId", func(ctx context.Context) {
wg.Done()
<-wait
}, func() {})
wg.Wait()
rp.Close()
})
t.Run("parallel, same peer, different object", func(t *testing.T) {
rp := NewActionPool(time.Minute, time.Minute, func(peerId string) *replaceableQueue {
return newReplaceableQueue(2, 2)
})
rp.Run()
// we use wait channel to make sure that blocking does not prevent action from being called
wait := make(chan struct{})
wg := &sync.WaitGroup{}
wg.Add(2)
rp.Add("peerId", "objectId", func(ctx context.Context) {
wg.Done()
<-wait
}, func() {})
rp.Add("peerId", "objectId1", func(ctx context.Context) {
wg.Done()
<-wait
}, func() {})
wg.Wait()
rp.Close()
})
t.Run("parallel, same peer, same object", func(t *testing.T) {
rp := NewActionPool(time.Minute, time.Minute, func(peerId string) *replaceableQueue {
return newReplaceableQueue(2, 2)
})
rp.Run()
// here we are checking that the second action is not called in parallel,
// when the first action is not finished
wait := make(chan struct{})
cnt := atomic.NewBool(false)
wg := &sync.WaitGroup{}
wg.Add(1)
rp.Add("peerId", "objectId", func(ctx context.Context) {
cnt.Store(true)
wg.Done()
<-wait
}, func() {})
time.Sleep(100 * time.Millisecond)
rp.Add("peerId", "objectId", func(ctx context.Context) {
require.Fail(t, "should not be called")
wg.Done()
<-wait
}, func() {})
wg.Wait()
time.Sleep(100 * time.Millisecond)
require.True(t, cnt.Load())
rp.Close()
})
t.Run("parallel, same peer, different object, replace", func(t *testing.T) {
rp := NewActionPool(time.Minute, time.Minute, func(peerId string) *replaceableQueue {
return newReplaceableQueue(1, 3)
})
rp.Run()
// we expect the second action to be replaced
wait := make(chan struct{})
cnt := atomic.NewBool(false)
wg := &sync.WaitGroup{}
wg.Add(1)
rp.Add("peerId", "objectId", func(ctx context.Context) {
<-wait
}, func() {})
rp.Add("peerId", "objectId1", func(ctx context.Context) {
require.Fail(t, "should not be called")
}, func() {})
rp.Add("peerId", "objectId1", func(ctx context.Context) {
cnt.Store(true)
wg.Done()
}, func() {})
close(wait)
wg.Wait()
time.Sleep(100 * time.Millisecond)
require.True(t, cnt.Load())
rp.Close()
})
t.Run("parallel, same peer, different object, try add failed", func(t *testing.T) {
rp := NewActionPool(time.Minute, time.Minute, func(peerId string) *replaceableQueue {
return newReplaceableQueue(1, 1)
})
rp.Run()
// we expect try add to fail and call remove action
wait := make(chan struct{})
wg := &sync.WaitGroup{}
wg.Add(2)
rp.Add("peerId", "objectId", func(ctx context.Context) {
<-wait
}, func() {})
time.Sleep(100 * time.Millisecond)
rp.Add("peerId", "objectId1", func(ctx context.Context) {
wg.Done()
}, func() {
})
rp.Add("peerId", "objectId2", func(ctx context.Context) {
}, func() {
wg.Done()
})
close(wait)
wg.Wait()
rp.Close()
})
t.Run("gc", func(t *testing.T) {
rp := NewActionPool(time.Millisecond*20, time.Millisecond*20, func(peerId string) *replaceableQueue {
return newReplaceableQueue(2, 2)
})
rp.Run()
wg := &sync.WaitGroup{}
wg.Add(2)
rp.Add("peerId1", "objectId1", func(ctx context.Context) {
wg.Done()
}, func() {})
rp.Add("peerId2", "objectId2", func(ctx context.Context) {
wg.Done()
}, func() {})
wg.Wait()
time.Sleep(200 * time.Millisecond)
require.Empty(t, rp.(*actionPool).queues)
rp.Close()
})
}