1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-08 05:57:03 +09:00

Add queue size to stream pool

This commit is contained in:
mcrakhman 2024-08-14 18:50:33 +02:00
parent 3393ccf3f4
commit 550ba490fd
No known key found for this signature in database
GPG key ID: DED12CFEF5B8396B
5 changed files with 23 additions and 22 deletions

View file

@ -166,7 +166,7 @@ func (r *RpcServer) SpacePull(ctx context.Context, request *spacesyncproto.Space
}
func (r *RpcServer) ObjectSyncStream(stream spacesyncproto.DRPCSpaceSync_ObjectSyncStreamStream) error {
return r.streamPool.ReadStream(&failingStream{stream, false})
return r.streamPool.ReadStream(&failingStream{stream, false}, 100)
}
func (r *RpcServer) ObjectSync(ctx context.Context, message *spacesyncproto.ObjectSyncMessage) (*spacesyncproto.ObjectSyncMessage, error) {

View file

@ -596,7 +596,7 @@ func (s *streamOpener) Name() (name string) {
return streamhandler.CName
}
func (s *streamOpener) OpenStream(ctx context.Context, p peer.Peer) (stream drpc.Stream, tags []string, err error) {
func (s *streamOpener) OpenStream(ctx context.Context, p peer.Peer) (stream drpc.Stream, tags []string, queueSize int, err error) {
conn, err := p.AcquireDrpcConn(ctx)
if err != nil {
return
@ -618,7 +618,8 @@ func (s *streamOpener) OpenStream(ctx context.Context, p peer.Peer) (stream drpc
}); err != nil {
return
}
return &failingStream{objectStream, false}, nil, nil
queueSize = 100
return &failingStream{objectStream, false}, nil, queueSize, nil
}
//

View file

@ -15,7 +15,7 @@ const CName = "common.streampool.streamhandler"
type StreamHandler interface {
app.Component
// OpenStream opens stream with given peer
OpenStream(ctx context.Context, p peer.Peer) (stream drpc.Stream, tags []string, err error)
OpenStream(ctx context.Context, p peer.Peer) (stream drpc.Stream, tags []string, queueSize int, err error)
// HandleMessage handles incoming message
HandleMessage(ctx context.Context, peerId string, msg drpc.Message) (err error)
// NewReadMessage creates new empty message for unmarshalling into it

View file

@ -49,9 +49,9 @@ type MessageQueueId interface {
type StreamPool interface {
app.ComponentRunnable
// AddStream adds new outgoing stream into the pool
AddStream(stream drpc.Stream, tags ...string) (err error)
AddStream(stream drpc.Stream, queueSize int, tags ...string) (err error)
// ReadStream adds new incoming stream and synchronously read it
ReadStream(stream drpc.Stream, tags ...string) (err error)
ReadStream(stream drpc.Stream, queueSize int, tags ...string) (err error)
// Send sends a message to given peers. A stream will be opened if it is not cached before. Works async.
Send(ctx context.Context, msg drpc.Message, target PeerGetter) (err error)
// SendById sends a message to given peerIds. Works only if stream exists
@ -147,8 +147,8 @@ type openingProcess struct {
err error
}
func (s *streamPool) ReadStream(drpcStream drpc.Stream, tags ...string) error {
st, err := s.addStream(drpcStream, tags...)
func (s *streamPool) ReadStream(drpcStream drpc.Stream, queueSize int, tags ...string) error {
st, err := s.addStream(drpcStream, queueSize, tags...)
if err != nil {
return err
}
@ -158,8 +158,8 @@ func (s *streamPool) ReadStream(drpcStream drpc.Stream, tags ...string) error {
return st.readLoop()
}
func (s *streamPool) AddStream(drpcStream drpc.Stream, tags ...string) error {
st, err := s.addStream(drpcStream, tags...)
func (s *streamPool) AddStream(drpcStream drpc.Stream, queueSize int, tags ...string) error {
st, err := s.addStream(drpcStream, queueSize, tags...)
if err != nil {
return err
}
@ -183,7 +183,7 @@ func (s *streamPool) Streams(tags ...string) (streams []drpc.Stream) {
return
}
func (s *streamPool) addStream(drpcStream drpc.Stream, tags ...string) (*stream, error) {
func (s *streamPool) addStream(drpcStream drpc.Stream, queueSize int, tags ...string) (*stream, error) {
ctx := drpcStream.Context()
peerId, err := peer.CtxPeerId(ctx)
if err != nil {
@ -193,7 +193,6 @@ func (s *streamPool) addStream(drpcStream drpc.Stream, tags ...string) (*stream,
defer s.mu.Unlock()
s.lastStreamId++
streamId := s.lastStreamId
queueSize := s.writeQueueSize
if queueSize <= 0 {
queueSize = 100
}
@ -207,7 +206,7 @@ func (s *streamPool) addStream(drpcStream drpc.Stream, tags ...string) (*stream,
tags: tags,
stats: newStreamStat(peerId),
}
st.queue = mb.New[drpc.Message](s.writeQueueSize)
st.queue = mb.New[drpc.Message](queueSize)
s.streams[streamId] = st
s.streamIdsByPeer[peerId] = append(s.streamIdsByPeer[peerId], streamId)
for _, tag := range tags {
@ -337,12 +336,12 @@ func (s *streamPool) openStream(ctx context.Context, p peer.Peer) *openingProces
}
ctx = peer.CtxWithProtoVersion(ctx, peerProto)
// open new stream and add to pool
st, tags, err := s.handler.OpenStream(ctx, p)
st, tags, queueSize, err := s.handler.OpenStream(ctx, p)
if err != nil {
op.err = err
return
}
if err = s.AddStream(st, tags...); err != nil {
if err = s.AddStream(st, queueSize, tags...); err != nil {
op.err = nil
return
}

View file

@ -47,9 +47,9 @@ func TestStreamPool_AddStream(t *testing.T) {
defer fx.Finish(t)
s1, _ := newClientStream(t, fx, "p1")
require.NoError(t, fx.AddStream(s1, "space1", "common"))
require.NoError(t, fx.AddStream(s1, 100, "space1", "common"))
s2, _ := newClientStream(t, fx, "p2")
require.NoError(t, fx.AddStream(s2, "space2", "common"))
require.NoError(t, fx.AddStream(s2, 100, "space2", "common"))
require.NoError(t, fx.Broadcast(ctx, &testservice.StreamMessage{ReqData: "space1"}, "space1"))
require.NoError(t, fx.Broadcast(ctx, &testservice.StreamMessage{ReqData: "space2"}, "space2"))
@ -78,7 +78,7 @@ func TestStreamPool_AddStream(t *testing.T) {
s1, p1 := newClientStream(t, fx, "p1")
defer s1.Close()
require.NoError(t, fx.AddStream(s1, "space1", "common"))
require.NoError(t, fx.AddStream(s1, 100, "space1", "common"))
require.NoError(t, fx.Send(ctx, &testservice.StreamMessage{ReqData: "test"}, func(ctx context.Context) (peers []peer.Peer, err error) {
return []peer.Peer{p1}, nil
@ -172,7 +172,7 @@ func TestStreamPool_SendById(t *testing.T) {
s1, _ := newClientStream(t, fx, "p1")
defer s1.Close()
require.NoError(t, fx.AddStream(s1, "space1", "common"))
require.NoError(t, fx.AddStream(s1, 100, "space1", "common"))
require.NoError(t, fx.SendById(ctx, &testservice.StreamMessage{ReqData: "test"}, "p1"))
var msg *testservice.StreamMessage
@ -190,11 +190,11 @@ func TestStreamPool_Tags(t *testing.T) {
s1, _ := newClientStream(t, fx, "p1")
defer s1.Close()
require.NoError(t, fx.AddStream(s1, "t1"))
require.NoError(t, fx.AddStream(s1, 100, "t1"))
s2, _ := newClientStream(t, fx, "p2")
defer s1.Close()
require.NoError(t, fx.AddStream(s2, "t2"))
require.NoError(t, fx.AddStream(s2, 100, "t2"))
err := fx.AddTagsCtx(streamCtx(ctx, 1, "p1"), "t3", "t3")
require.NoError(t, err)
@ -250,7 +250,7 @@ func (t *testHandler) Name() (name string) {
return streamhandler.CName
}
func (t *testHandler) OpenStream(ctx context.Context, p peer.Peer) (stream drpc.Stream, tags []string, err error) {
func (t *testHandler) OpenStream(ctx context.Context, p peer.Peer) (stream drpc.Stream, tags []string, queueSize int, err error) {
if t.streamOpenDelay > 0 {
time.Sleep(t.streamOpenDelay)
}
@ -258,6 +258,7 @@ func (t *testHandler) OpenStream(ctx context.Context, p peer.Peer) (stream drpc.
if err != nil {
return
}
queueSize = 100
stream, err = testservice.NewDRPCTestClient(conn).TestStream(p.Context())
return
}