1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-08 05:57:03 +09:00
any-sync/util/multiqueue/multiqueue_test.go
2024-08-14 00:28:11 +02:00

78 lines
1.8 KiB
Go

package multiqueue
import (
"context"
"fmt"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// sizeableString is a string-backed message type; the tests in this file use
// it as the type argument for New and HandleFunc.
type sizeableString string

// MsgSize reports a size of zero for every value.
func (sizeableString) MsgSize() uint64 { return 0 }
// mockUpdater is a stateless stand-in whose UpdateQueueSize does nothing; the
// tests in this file pass it as the second argument to New.
type mockUpdater struct{}

// UpdateQueueSize ignores all of its arguments and has no effect.
func (mockUpdater) UpdateQueueSize(size uint64, msgType int, add bool) {}
// TestMultiQueue_Add covers Add on an open queue (every added message reaches
// the handler) and on a closed queue (Add reports ErrClosed).
func TestMultiQueue_Add(t *testing.T) {
	t.Run("process", func(t *testing.T) {
		const (
			idCount     = 5               // distinct ids passed as Add's second argument
			msgsPerID   = 5               // messages added under each id
			total       = idCount * msgsPerID
			recvTimeout = time.Second / 4 // budget for each single receive
		)

		// Unbuffered on purpose: the handler blocks until the test goroutine
		// receives, so every handled message is observed exactly once here.
		msgsCh := make(chan sizeableString)
		var h HandleFunc[sizeableString] = func(msg sizeableString) {
			msgsCh <- msg
		}
		q := New[sizeableString](h, mockUpdater{}, 0, 10)
		defer func() {
			require.NoError(t, q.Close())
		}()

		for i := 0; i < idCount; i++ {
			for j := 0; j < msgsPerID; j++ {
				// Stop at the first failed Add; otherwise the receive loop
				// below would only surface it as an unrelated timeout.
				require.NoError(t, q.Add(context.Background(), fmt.Sprint(i), sizeableString(fmt.Sprint(i, j))))
			}
		}

		msgs := make([]sizeableString, 0, total)
		for i := 0; i < total; i++ {
			select {
			case <-time.After(recvTimeout):
				require.Fail(t, "timeout")
			case msg := <-msgsCh:
				msgs = append(msgs, msg)
			}
		}
		assert.Len(t, msgs, total)
	})
	t.Run("add to closed", func(t *testing.T) {
		q := New[sizeableString](func(msg sizeableString) {}, mockUpdater{}, 0, 10)
		require.NoError(t, q.Close())
		// ErrorIs also matches if Add ever wraps ErrClosed with extra context.
		assert.ErrorIs(t, q.Add(context.Background(), "1", "1"), ErrClosed)
	})
}
// TestMultiQueue_CloseThread adds two messages under id "1", calls
// CloseThread("1"), and then expects the handler to still be invoked twice,
// each within a quarter of a second.
func TestMultiQueue_CloseThread(t *testing.T) {
	// Unbuffered: the handler blocks until the receive loop below picks the
	// message up.
	msgsCh := make(chan sizeableString)
	var h HandleFunc[sizeableString] = func(msg sizeableString) {
		msgsCh <- msg
	}
	q := New[sizeableString](h, mockUpdater{}, 0, 10)
	defer func() {
		require.NoError(t, q.Close())
	}()

	require.NoError(t, q.Add(context.Background(), "1", "1"))
	require.NoError(t, q.Add(context.Background(), "1", "2"))
	require.NoError(t, q.CloseThread("1"))

	for i := 0; i < 2; i++ {
		select {
		case <-msgsCh:
		case <-time.After(time.Second / 4):
			require.Fail(t, "timeout")
		}
	}
}