1
0
Fork 0
mirror of https://github.com/anyproto/anytype-heart.git synced 2025-06-07 21:37:04 +09:00
anytype-heart/tests/events_test.go
2023-05-24 17:10:52 +02:00

90 lines
1.6 KiB
Go

//go:build integration
package tests
import (
"context"
"errors"
"fmt"
"io"
"sync"
"testing"
"time"
"github.com/anyproto/anytype-heart/pb"
"github.com/anyproto/anytype-heart/pb/service"
)
// eventReceiver accumulates the event messages delivered on one session
// event stream so that tests can later look for specific events.
type eventReceiver struct {
// lock guards events; both the receiving goroutine started by
// startEventReceiver and waitEvent take it before touching the slice.
lock *sync.Mutex
// events holds received messages in arrival order. waitEvent replaces an
// entry with nil once it has been consumed, so nil entries must be skipped.
events []*pb.EventMessage
// token is the session token that was passed to startEventReceiver.
token string
}
// startEventReceiver subscribes to the session event stream for token tok and
// starts a background goroutine that appends every received message to the
// returned receiver's events slice (under the receiver's lock).
//
// The goroutine stops when ctx is cancelled, when the stream ends with io.EOF,
// or when Recv reports any other error; in the last case the error is printed
// before the goroutine exits.
//
// An error is returned only when the subscription itself cannot be opened.
func startEventReceiver(ctx context.Context, c service.ClientCommandsClient, tok string) (*eventReceiver, error) {
	stream, err := c.ListenSessionEvents(ctx, &pb.StreamRequest{Token: tok})
	if err != nil {
		return nil, err
	}
	er := &eventReceiver{
		lock:  &sync.Mutex{},
		token: tok,
	}
	go func() {
		for {
			// Check for cancellation before blocking in Recv again.
			select {
			case <-ctx.Done():
				return
			default:
			}

			ev, err := stream.Recv()
			if errors.Is(err, io.EOF) {
				return
			}
			if err != nil {
				// NOTE(review): the client looks gRPC-generated; on a gRPC
				// client stream any non-nil Recv error is terminal, so
				// retrying would only spin and print the same error forever.
				fmt.Println("receive error:", err)
				return
			}

			er.lock.Lock()
			er.events = append(er.events, ev.Messages...)
			er.lock.Unlock()
		}
	}()
	return er, nil
}
// eventReceiverProvider is implemented by anything that can hand out the
// eventReceiver that waitEvent should poll for events.
type eventReceiverProvider interface {
// EventReceiver returns the receiver whose collected events are inspected.
EventReceiver() *eventReceiver
}
// waitEvent waits until the provider's receiver holds an event whose value is
// of type msgType, marks that event as consumed, and passes the value to fn.
//
// Events are scanned from newest to oldest, so the most recent matching event
// wins. The receiver is polled every 10ms; if nothing matches within 2 seconds
// the test is failed with t.Fatal.
//
// fn is invoked after the receiver's lock has been released, so a failing
// assertion inside fn (which exits the goroutine via t.FailNow) cannot leave
// the receiver locked.
func waitEvent[msgType pb.IsEventMessageValue](t *testing.T, provider eventReceiverProvider, fn func(x msgType)) {
	t.Helper()

	er := provider.EventReceiver()

	// Stop both timers on every exit path so they do not outlive the call.
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()
	timeout := time.NewTimer(2 * time.Second)
	defer timeout.Stop()

	for {
		var (
			matched msgType
			found   bool
		)

		er.lock.Lock()
		for i := len(er.events) - 1; i >= 0; i-- {
			m := er.events[i]
			if m == nil {
				// Already consumed by an earlier waitEvent call.
				continue
			}
			if v, ok := m.Value.(msgType); ok {
				// Consume the event so it is not matched twice.
				er.events[i] = nil
				matched, found = v, true
				break
			}
		}
		er.lock.Unlock()

		if found {
			fn(matched)
			return
		}

		select {
		case <-ticker.C:
		case <-timeout.C:
			t.Fatal("wait event timeout")
		}
	}
}