mirror of
https://github.com/anyproto/any-sync.git
synced 2025-06-08 05:57:03 +09:00
78 lines
1.8 KiB
Go
78 lines
1.8 KiB
Go
package multiqueue
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
// sizeableString is a string-backed message type used as the element type of
// the queues built in these tests.
type sizeableString string

// MsgSize reports the size of the message. The tests do not exercise size
// accounting, so it always returns zero regardless of the string's contents.
// NOTE(review): presumably this method is what lets sizeableString satisfy the
// queue's element constraint — confirm against the declaration of New.
func (sizeableString) MsgSize() uint64 {
	return 0
}
|
|
|
|
// mockUpdater is a stateless stand-in for the queue-size updater that the
// tests pass as the second argument to New.
type mockUpdater struct{}

// UpdateQueueSize is a no-op: every argument is ignored and nothing is
// recorded.
func (mockUpdater) UpdateQueueSize(size uint64, msgType int, add bool) {}
|
|
|
|
func TestMultiQueue_Add(t *testing.T) {
|
|
t.Run("process", func(t *testing.T) {
|
|
var msgsCh = make(chan sizeableString)
|
|
var h HandleFunc[sizeableString] = func(msg sizeableString) {
|
|
msgsCh <- msg
|
|
}
|
|
q := New[sizeableString](h, mockUpdater{}, 0, 10)
|
|
defer func() {
|
|
require.NoError(t, q.Close())
|
|
}()
|
|
|
|
for i := 0; i < 5; i++ {
|
|
for j := 0; j < 5; j++ {
|
|
assert.NoError(t, q.Add(context.Background(), fmt.Sprint(i), sizeableString(fmt.Sprint(i, j))))
|
|
}
|
|
}
|
|
var msgs []sizeableString
|
|
for i := 0; i < 5*5; i++ {
|
|
select {
|
|
case <-time.After(time.Second / 4):
|
|
require.True(t, false, "timeout")
|
|
case msg := <-msgsCh:
|
|
msgs = append(msgs, msg)
|
|
}
|
|
}
|
|
assert.Len(t, msgs, 25)
|
|
})
|
|
t.Run("add to closed", func(t *testing.T) {
|
|
q := New[sizeableString](func(msg sizeableString) {}, mockUpdater{}, 0, 10)
|
|
require.NoError(t, q.Close())
|
|
assert.Equal(t, ErrClosed, q.Add(context.Background(), "1", "1"))
|
|
})
|
|
}
|
|
|
|
func TestMultiQueue_CloseThread(t *testing.T) {
|
|
var msgsCh = make(chan sizeableString)
|
|
var h HandleFunc[sizeableString] = func(msg sizeableString) {
|
|
msgsCh <- msg
|
|
}
|
|
q := New[sizeableString](h, mockUpdater{}, 0, 10)
|
|
defer func() {
|
|
require.NoError(t, q.Close())
|
|
}()
|
|
require.NoError(t, q.Add(context.Background(), "1", "1"))
|
|
require.NoError(t, q.Add(context.Background(), "1", "2"))
|
|
require.NoError(t, q.CloseThread("1"))
|
|
for i := 0; i < 2; i++ {
|
|
select {
|
|
case <-msgsCh:
|
|
case <-time.After(time.Second / 4):
|
|
require.False(t, true, "timeout")
|
|
}
|
|
}
|
|
}
|