1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-08 05:57:03 +09:00
any-sync/consensus/consensusclient/stream.go
2023-07-03 16:19:24 +02:00

70 lines
1.4 KiB
Go

package consensusclient
import (
"context"
"github.com/anyproto/any-sync/consensus/consensusproto"
"github.com/cheggaaa/mb/v3"
"sync"
)
// runStream wraps rpcStream in a stream and starts a background goroutine
// (readStream) that pumps received events into the stream's queue.
//
// The queue is created by passing 100 to mb.New as its size argument.
func runStream(rpcStream consensusproto.DRPCConsensus_LogWatchClient) *stream {
	queue := mb.New[*consensusproto.LogWatchEvent](100)
	st := &stream{rpcStream: rpcStream, mb: queue}
	go st.readStream()
	return st
}
// stream couples a LogWatch RPC stream with a queue of the events read
// from it, and remembers the error that ended reading.
type stream struct {
	// rpcStream is the underlying RPC stream used to send watch requests
	// and receive log events.
	rpcStream consensusproto.DRPCConsensus_LogWatchClient
	// mb queues events between readStream (producer) and WaitLogs (consumer).
	mb *mb.MB[*consensusproto.LogWatchEvent]
	// mu guards err, which holds the error returned by a failed Recv.
	mu  sync.Mutex
	err error
}
// WatchIds sends a LogWatchRequest whose WatchIds field is set to logIds
// and returns the error reported by the send, if any.
func (s *stream) WatchIds(logIds []string) (err error) {
	req := &consensusproto.LogWatchRequest{WatchIds: logIds}
	return s.rpcStream.Send(req)
}
// UnwatchIds sends a LogWatchRequest whose UnwatchIds field is set to logIds
// and returns the error reported by the send, if any.
func (s *stream) UnwatchIds(logIds []string) (err error) {
	req := &consensusproto.LogWatchRequest{UnwatchIds: logIds}
	return s.rpcStream.Send(req)
}
// WaitLogs waits on the event queue and returns the events it yields.
//
// The error returned by the queue's Wait is discarded, so a failed wait is
// visible to the caller only through the returned slice; Err can be used to
// check whether reading from the RPC stream has failed.
func (s *stream) WaitLogs() []*consensusproto.LogWatchEvent {
	batch, _ := s.mb.Wait(context.TODO())
	return batch
}
// Err returns the error recorded by readStream when receiving from the RPC
// stream failed, or nil if no such error has been recorded.
func (s *stream) Err() error {
	s.mu.Lock()
	recorded := s.err
	s.mu.Unlock()
	return recorded
}
// readStream receives events from the RPC stream and appends each one to the
// queue until either step fails. The stream is closed when the loop exits.
//
// A receive failure is stored so that Err can report it. A failure to add to
// the queue ends the loop without recording an error.
func (s *stream) readStream() {
	defer s.Close()
	for {
		msg, recvErr := s.rpcStream.Recv()
		if recvErr != nil {
			s.mu.Lock()
			s.err = recvErr
			s.mu.Unlock()
			return
		}
		addErr := s.mb.Add(s.rpcStream.Context(), msg)
		if addErr != nil {
			return
		}
	}
}
// Close closes the event queue and then the RPC stream.
//
// If closing the queue reports an error (presumably because it was already
// closed), the RPC stream is left untouched and nil is returned. Otherwise
// the result of closing the RPC stream is returned.
func (s *stream) Close() error {
	if queueErr := s.mb.Close(); queueErr != nil {
		return nil
	}
	return s.rpcStream.Close()
}