1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-08 05:57:03 +09:00

remove stream multiqueue

This commit is contained in:
Sergey Cherepanov 2023-06-05 21:34:23 +02:00
parent af9d71d16e
commit 0b4f08fbef
No known key found for this signature in database
GPG key ID: 87F8EDE8FBDF637C
3 changed files with 26 additions and 35 deletions

View file

@ -3,7 +3,7 @@ package streampool
import ( import (
"context" "context"
"github.com/anyproto/any-sync/app/logger" "github.com/anyproto/any-sync/app/logger"
"github.com/anyproto/any-sync/util/multiqueue" "github.com/cheggaaa/mb/v3"
"go.uber.org/zap" "go.uber.org/zap"
"storj.io/drpc" "storj.io/drpc"
"sync/atomic" "sync/atomic"
@ -17,17 +17,12 @@ type stream struct {
streamId uint32 streamId uint32
closed atomic.Bool closed atomic.Bool
l logger.CtxLogger l logger.CtxLogger
queue multiqueue.MultiQueue[drpc.Message] queue *mb.MB[drpc.Message]
tags []string tags []string
} }
func (sr *stream) write(msg drpc.Message) (err error) { func (sr *stream) write(msg drpc.Message) (err error) {
var queueId string return sr.queue.Add(sr.stream.Context(), msg)
if qId, ok := msg.(MessageQueueId); ok {
queueId = qId.MessageQueueId()
msg = qId.DrpcMessage()
}
return sr.queue.Add(sr.stream.Context(), queueId, msg)
} }
func (sr *stream) readLoop() error { func (sr *stream) readLoop() error {
@ -50,13 +45,21 @@ func (sr *stream) readLoop() error {
} }
} }
func (sr *stream) writeToStream(msg drpc.Message) { func (sr *stream) writeLoop() {
if err := sr.stream.MsgSend(msg, EncodingProto); err != nil { for {
sr.l.Warn("msg send error", zap.Error(err)) msg, err := sr.queue.WaitOne(sr.peerCtx)
sr.streamClose() if err != nil {
return if err != mb.ErrClosed {
sr.streamClose()
}
return
}
if err := sr.stream.MsgSend(msg, EncodingProto); err != nil {
sr.l.Warn("msg send error", zap.Error(err))
sr.streamClose()
return
}
} }
return
} }
func (sr *stream) streamClose() { func (sr *stream) streamClose() {

View file

@ -4,7 +4,7 @@ import (
"fmt" "fmt"
"github.com/anyproto/any-sync/net" "github.com/anyproto/any-sync/net"
"github.com/anyproto/any-sync/net/peer" "github.com/anyproto/any-sync/net/peer"
"github.com/anyproto/any-sync/util/multiqueue" "github.com/cheggaaa/mb/v3"
"go.uber.org/zap" "go.uber.org/zap"
"golang.org/x/exp/slices" "golang.org/x/exp/slices"
"golang.org/x/net/context" "golang.org/x/net/context"
@ -74,6 +74,9 @@ func (s *streamPool) ReadStream(drpcStream drpc.Stream, tags ...string) error {
if err != nil { if err != nil {
return err return err
} }
go func() {
st.writeLoop()
}()
return st.readLoop() return st.readLoop()
} }
@ -85,6 +88,9 @@ func (s *streamPool) AddStream(drpcStream drpc.Stream, tags ...string) error {
go func() { go func() {
_ = st.readLoop() _ = st.readLoop()
}() }()
go func() {
st.writeLoop()
}()
return nil return nil
} }
@ -122,7 +128,7 @@ func (s *streamPool) addStream(drpcStream drpc.Stream, tags ...string) (*stream,
l: log.With(zap.String("peerId", peerId), zap.Uint32("streamId", streamId)), l: log.With(zap.String("peerId", peerId), zap.Uint32("streamId", streamId)),
tags: tags, tags: tags,
} }
st.queue = multiqueue.New[drpc.Message](st.writeToStream, s.writeQueueSize) st.queue = mb.New[drpc.Message](s.writeQueueSize)
s.streams[streamId] = st s.streams[streamId] = st
s.streamIdsByPeer[peerId] = append(s.streamIdsByPeer[peerId], streamId) s.streamIdsByPeer[peerId] = append(s.streamIdsByPeer[peerId], streamId)
for _, tag := range tags { for _, tag := range tags {
@ -364,21 +370,3 @@ func removeStream(m map[string][]uint32, key string, streamId uint32) {
m[key] = streamIds m[key] = streamIds
} }
} }
// WithQueueId wraps the message and adds queueId
func WithQueueId(msg drpc.Message, queueId string) drpc.Message {
return &messageWithQueueId{queueId: queueId, Message: msg}
}
type messageWithQueueId struct {
drpc.Message
queueId string
}
func (m messageWithQueueId) MessageQueueId() string {
return m.queueId
}
func (m messageWithQueueId) DrpcMessage() drpc.Message {
return m.Message
}

View file

@ -47,7 +47,7 @@ func TestStreamPool_AddStream(t *testing.T) {
require.NoError(t, fx.AddStream(s2, "space2", "common")) require.NoError(t, fx.AddStream(s2, "space2", "common"))
require.NoError(t, fx.Broadcast(ctx, &testservice.StreamMessage{ReqData: "space1"}, "space1")) require.NoError(t, fx.Broadcast(ctx, &testservice.StreamMessage{ReqData: "space1"}, "space1"))
require.NoError(t, fx.Broadcast(ctx, WithQueueId(&testservice.StreamMessage{ReqData: "space2"}, "q2"), "space2")) require.NoError(t, fx.Broadcast(ctx, &testservice.StreamMessage{ReqData: "space2"}, "space2"))
require.NoError(t, fx.Broadcast(ctx, &testservice.StreamMessage{ReqData: "common"}, "common")) require.NoError(t, fx.Broadcast(ctx, &testservice.StreamMessage{ReqData: "common"}, "common"))
var serverResults []string var serverResults []string