mirror of
https://github.com/anyproto/anytype-heart.git
synced 2025-06-07 21:37:04 +09:00
90 lines
1.6 KiB
Go
90 lines
1.6 KiB
Go
//go:build integration
|
|
|
|
package tests
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/anyproto/anytype-heart/pb"
|
|
"github.com/anyproto/anytype-heart/pb/service"
|
|
)
|
|
|
|
type eventReceiver struct {
|
|
lock *sync.Mutex
|
|
events []*pb.EventMessage
|
|
token string
|
|
}
|
|
|
|
func startEventReceiver(ctx context.Context, c service.ClientCommandsClient, tok string) (*eventReceiver, error) {
|
|
stream, err := c.ListenSessionEvents(ctx, &pb.StreamRequest{Token: tok})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
er := &eventReceiver{
|
|
lock: &sync.Mutex{},
|
|
token: tok,
|
|
}
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
default:
|
|
ev, err := stream.Recv()
|
|
if errors.Is(err, io.EOF) {
|
|
return
|
|
}
|
|
if err != nil {
|
|
fmt.Println("receive error:", err)
|
|
continue
|
|
}
|
|
er.lock.Lock()
|
|
for _, m := range ev.Messages {
|
|
er.events = append(er.events, m)
|
|
}
|
|
er.lock.Unlock()
|
|
}
|
|
}
|
|
}()
|
|
return er, nil
|
|
}
|
|
|
|
type eventReceiverProvider interface {
|
|
EventReceiver() *eventReceiver
|
|
}
|
|
|
|
func waitEvent[msgType pb.IsEventMessageValue](t *testing.T, provider eventReceiverProvider, fn func(x msgType)) {
|
|
er := provider.EventReceiver()
|
|
|
|
ticker := time.NewTicker(10 * time.Millisecond)
|
|
timeout := time.NewTimer(2 * time.Second)
|
|
for {
|
|
er.lock.Lock()
|
|
for i := len(er.events) - 1; i >= 0; i-- {
|
|
m := er.events[i]
|
|
if m == nil {
|
|
continue
|
|
}
|
|
if v, ok := m.Value.(msgType); ok {
|
|
fn(v)
|
|
er.events[i] = nil
|
|
er.lock.Unlock()
|
|
return
|
|
}
|
|
}
|
|
er.lock.Unlock()
|
|
|
|
select {
|
|
case <-ticker.C:
|
|
case <-timeout.C:
|
|
t.Fatal("wait event timeout")
|
|
}
|
|
}
|
|
}
|