diff --git a/Makefile b/Makefile index d2229111..5484c4c1 100644 --- a/Makefile +++ b/Makefile @@ -15,6 +15,7 @@ proto: $(eval PKGMAP := $$(P_TREE_CHANGES),$$(P_ACL_RECORDS)) protoc --gogofaster_out=$(PKGMAP):. --go-drpc_out=protolib=github.com/gogo/protobuf:. commonspace/spacesyncproto/protos/*.proto protoc --gogofaster_out=$(PKGMAP):. --go-drpc_out=protolib=github.com/gogo/protobuf:. commonfile/fileproto/protos/*.proto + protoc --gogofaster_out=$(PKGMAP):. --go-drpc_out=protolib=github.com/gogo/protobuf:. net/streampool/testservice/protos/*.proto deps: go mod download diff --git a/go.mod b/go.mod index c0bb5cab..ea57e167 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/anytypeio/any-sync go 1.19 require ( - github.com/anytypeio/go-chash v0.0.0-20220629194632-4ad1154fe232 + github.com/anytypeio/go-chash v0.0.2 github.com/awalterschulze/gographviz v2.0.3+incompatible github.com/cespare/xxhash v1.1.0 github.com/cheggaaa/mb/v3 v3.0.0 @@ -20,17 +20,18 @@ require ( github.com/ipfs/go-ipld-format v0.4.0 github.com/ipfs/go-merkledag v0.8.1 github.com/ipfs/go-unixfs v0.4.1 - github.com/libp2p/go-libp2p v0.23.2 + github.com/libp2p/go-libp2p v0.24.1 github.com/minio/sha256-simd v1.0.0 github.com/multiformats/go-multibase v0.1.1 github.com/multiformats/go-multihash v0.2.1 - github.com/prometheus/client_golang v1.13.0 + github.com/prometheus/client_golang v1.14.0 github.com/stretchr/testify v1.8.1 github.com/zeebo/blake3 v0.2.3 github.com/zeebo/errs v1.3.0 go.uber.org/atomic v1.10.0 go.uber.org/zap v1.24.0 - golang.org/x/exp v0.0.0-20220916125017-b168a2c6b86b + golang.org/x/exp v0.0.0-20230105202349-8879d0199aa3 + golang.org/x/net v0.3.0 gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22 gopkg.in/yaml.v3 v3.0.1 storj.io/drpc v0.0.32 @@ -39,8 +40,9 @@ require ( require ( github.com/alecthomas/units v0.0.0-20210927113745-59d0afb8317a // indirect github.com/beorn7/perks v1.0.1 // indirect - github.com/cespare/xxhash/v2 v2.1.2 // indirect + github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 // indirect github.com/fogleman/gg v1.3.0 // indirect github.com/go-logr/logr v1.2.3 // indirect @@ -64,24 +66,26 @@ require ( github.com/ipfs/go-verifcid v0.0.2 // indirect github.com/ipld/go-codec-dagpb v1.5.0 // indirect github.com/ipld/go-ipld-prime v0.19.0 // indirect + github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect github.com/jbenet/goprocess v0.1.4 // indirect github.com/klauspost/cpuid/v2 v2.2.2 // indirect github.com/libp2p/go-buffer-pool v0.1.0 // indirect github.com/libp2p/go-openssl v0.1.0 // indirect github.com/mattn/go-isatty v0.0.16 // indirect github.com/mattn/go-pointer v0.0.1 // indirect - github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect + github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/mr-tron/base58 v1.2.0 // indirect github.com/multiformats/go-base32 v0.1.0 // indirect github.com/multiformats/go-base36 v0.2.0 // indirect - github.com/multiformats/go-multiaddr v0.7.0 // indirect - github.com/multiformats/go-multicodec v0.6.0 // indirect + github.com/multiformats/go-multiaddr v0.8.0 // indirect + github.com/multiformats/go-multicodec v0.7.0 // indirect + github.com/multiformats/go-multistream v0.3.3 // indirect github.com/multiformats/go-varint v0.0.7 // indirect github.com/opentracing/opentracing-go v1.2.0 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/polydawn/refmt v0.89.0 // indirect - github.com/prometheus/client_model v0.2.0 // indirect + github.com/prometheus/client_model v0.3.0 // indirect github.com/prometheus/common v0.37.0 // indirect github.com/prometheus/procfs v0.8.0 // indirect github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572 // indirect @@ -93,7 +97,7 @@ require ( go.uber.org/multierr v1.9.0 // indirect golang.org/x/crypto v0.4.0 // indirect golang.org/x/image v0.0.0-20200119044424-58c23975cae1 // indirect - golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 // indirect + golang.org/x/sync v0.1.0 // indirect golang.org/x/sys v0.3.0 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect google.golang.org/protobuf v1.28.1 // indirect diff --git a/go.sum b/go.sum index cfd087e3..f258a6b2 100644 --- a/go.sum +++ b/go.sum @@ -564,6 +564,7 @@ golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qx golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.3.0 h1:VWL6FNY2bEEmsGVKabSlHu5Irp34xmMRoqb/9lF9lxk= +golang.org/x/net v0.3.0/go.mod h1:MBQ8lrhLObU/6UmLb4fmbmk5OcyYmqtbGd/9yIeKjEE= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= diff --git a/net/streampool/encoding.go b/net/streampool/encoding.go new file mode 100644 index 00000000..d724bf90 --- /dev/null +++ b/net/streampool/encoding.go @@ -0,0 +1,34 @@ +package streampool + +import ( + "errors" + "github.com/gogo/protobuf/proto" + "storj.io/drpc" +) + +var ( + // EncodingProto drpc.Encoding implementation for gogo protobuf + EncodingProto drpc.Encoding = protoEncoding{} +) + +var ( + errNotAProtoMsg = errors.New("encoding: not a proto message") +) + +type protoEncoding struct{} + +func (p protoEncoding) Marshal(msg drpc.Message) ([]byte, error) { + pmsg, ok := msg.(proto.Message) + if !ok { + return nil, errNotAProtoMsg + } + return proto.Marshal(pmsg) +} + +func (p protoEncoding) Unmarshal(buf []byte, msg drpc.Message) error { + pmsg, ok := msg.(proto.Message) + if !ok { + return errNotAProtoMsg + } + return proto.Unmarshal(buf, pmsg) +} diff --git a/net/streampool/encoding_test.go b/net/streampool/encoding_test.go new file mode 100644 index 00000000..a61d0219 --- /dev/null +++ b/net/streampool/encoding_test.go @@ -0,0 +1,24 @@ +package streampool + +import ( + "github.com/anytypeio/any-sync/net/streampool/testservice" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "testing" +) + +func TestProtoEncoding(t *testing.T) { + t.Run("not a proto err", func(t *testing.T) { + _, err := EncodingProto.Marshal("string") + assert.Error(t, err) + err = EncodingProto.Unmarshal(nil, "sss") + assert.Error(t, err) + }) + t.Run("encode", func(t *testing.T) { + data, err := EncodingProto.Marshal(&testservice.StreamMessage{ReqData: "1"}) + require.NoError(t, err) + msg := &testservice.StreamMessage{} + require.NoError(t, EncodingProto.Unmarshal(data, msg)) + assert.Equal(t, "1", msg.ReqData) + }) +} diff --git a/net/streampool/sendpool.go b/net/streampool/sendpool.go new file mode 100644 index 00000000..899da5c5 --- /dev/null +++ b/net/streampool/sendpool.go @@ -0,0 +1,44 @@ +package streampool + +import ( + "context" + "github.com/cheggaaa/mb/v3" + "go.uber.org/zap" +) + +// newStreamSender creates new sendPool +// workers - how many processes will execute tasks +// maxSize - limit for queue size +func newStreamSender(workers, maxSize int) *sendPool { + ss := &sendPool{ + batch: mb.New[func()](maxSize), + } + for i := 0; i < workers; i++ { + go ss.sendLoop() + } + return ss +} + +// sendPool needed for parallel execution of the incoming send tasks +type sendPool struct { + batch *mb.MB[func()] +} + +func (ss *sendPool) Add(ctx context.Context, f ...func()) (err error) { + return ss.batch.Add(ctx, f...) +} + +func (ss *sendPool) sendLoop() { + for { + f, err := ss.batch.WaitOne(context.Background()) + if err != nil { + log.Debug("close send loop", zap.Error(err)) + return + } + f() + } +} + +func (ss *sendPool) Close() (err error) { + return ss.batch.Close() +} diff --git a/net/streampool/stream.go b/net/streampool/stream.go new file mode 100644 index 00000000..4b129949 --- /dev/null +++ b/net/streampool/stream.go @@ -0,0 +1,45 @@ +package streampool + +import ( + "go.uber.org/zap" + "storj.io/drpc" + "sync/atomic" +) + +type stream struct { + peerId string + stream drpc.Stream + pool *streamPool + streamId uint32 + closed atomic.Bool + l *zap.Logger + tags []string +} + +func (sr *stream) write(msg drpc.Message) (err error) { + if err = sr.stream.MsgSend(msg, EncodingProto); err != nil { + sr.l.Info("stream write error", zap.Error(err)) + sr.streamClose() + } + return err +} + +func (sr *stream) readLoop() { + defer func() { + sr.streamClose() + }() + for { + msg := sr.pool.handler.NewReadMessage() + if err := sr.stream.MsgRecv(msg, EncodingProto); err != nil { + sr.l.Info("msg receive error", zap.Error(err)) + return + } + } +} + +func (sr *stream) streamClose() { + if !sr.closed.Swap(true) { + _ = sr.stream.Close() + sr.pool.removeStream(sr.streamId) + } +} diff --git a/net/streampool/streampool.go b/net/streampool/streampool.go new file mode 100644 index 00000000..572c29c3 --- /dev/null +++ b/net/streampool/streampool.go @@ -0,0 +1,202 @@ +package streampool + +import ( + "github.com/anytypeio/any-sync/net/peer" + "go.uber.org/zap" + "golang.org/x/exp/slices" + "golang.org/x/net/context" + "storj.io/drpc" + "sync" +) + +// StreamHandler handles incoming messages from streams +type StreamHandler interface { + // OpenStream opens stream with given peer + OpenStream(ctx context.Context, p peer.Peer) (stream drpc.Stream, tags []string, err error) + // HandleMessage handles incoming message + HandleMessage(ctx context.Context, peerId string, msg drpc.Message) (err error) + // NewReadMessage creates new empty message for unmarshalling into it + NewReadMessage() drpc.Message +} + +// StreamPool keeps and read streams +type StreamPool interface { + // AddStream adds new incoming stream into the pool + AddStream(peerId string, stream drpc.Stream, tags ...string) + // Send sends a message to given peers. A stream will be opened if it is not cached before. Works async. + Send(ctx context.Context, msg drpc.Message, peers ...peer.Peer) (err error) + // Broadcast sends a message to all peers with given tags. Works async. + Broadcast(ctx context.Context, msg drpc.Message, tags ...string) (err error) + // Close closes all streams + Close() error +} + +type streamPool struct { + handler StreamHandler + streamIdsByPeer map[string][]uint32 + streamIdsByTag map[string][]uint32 + streams map[uint32]*stream + opening map[string]chan struct{} + exec *sendPool + mu sync.RWMutex + lastStreamId uint32 +} + +func (s *streamPool) AddStream(peerId string, drpcStream drpc.Stream, tags ...string) { + s.mu.Lock() + defer s.mu.Unlock() + s.lastStreamId++ + streamId := s.lastStreamId + st := &stream{ + peerId: peerId, + stream: drpcStream, + pool: s, + streamId: streamId, + l: log.With(zap.String("peerId", peerId), zap.Uint32("streamId", streamId)), + tags: tags, + } + s.streams[streamId] = st + s.streamIdsByPeer[peerId] = append(s.streamIdsByPeer[peerId], streamId) + for _, tag := range tags { + s.streamIdsByTag[tag] = append(s.streamIdsByTag[tag], streamId) + } + go st.readLoop() +} + +func (s *streamPool) Send(ctx context.Context, msg drpc.Message, peers ...peer.Peer) (err error) { + var funcs []func() + for _, p := range peers { + funcs = append(funcs, func() { + if e := s.sendOne(ctx, p, msg); e != nil { + log.Info("send peer error", zap.Error(e)) + } + }) + } + return s.exec.Add(ctx, funcs...) +} + +func (s *streamPool) sendOne(ctx context.Context, p peer.Peer, msg drpc.Message) (err error) { + // get all streams relates to peer + streams, err := s.getStreams(ctx, p) + if err != nil { + return + } + for _, st := range streams { + if err = st.write(msg); err != nil { + log.Info("stream write error", zap.Error(err)) + // continue with next stream + continue + } else { + // stop sending on success + break + } + } + return +} + +func (s *streamPool) getStreams(ctx context.Context, p peer.Peer) (streams []*stream, err error) { + s.mu.Lock() + // check cached streams + streamIds := s.streamIdsByPeer[p.Id()] + for _, streamId := range streamIds { + streams = append(streams, s.streams[streamId]) + } + var openingCh chan struct{} + // no cached streams found + if len(streams) == 0 { + // start opening process + openingCh = s.openStream(ctx, p) + } + s.mu.Unlock() + + // not empty openingCh means we should wait for the stream opening and try again + if openingCh != nil { + select { + case <-openingCh: + return s.getStreams(ctx, p) + case <-ctx.Done(): + return nil, ctx.Err() + } + } + return streams, nil +} + +func (s *streamPool) openStream(ctx context.Context, p peer.Peer) chan struct{} { + if ch, ok := s.opening[p.Id()]; ok { + // already have an opening process for this stream - return channel + return ch + } + ch := make(chan struct{}) + s.opening[p.Id()] = ch + go func() { + // start stream opening in separate goroutine to avoid lock whole pool + defer func() { + s.mu.Lock() + defer s.mu.Unlock() + close(ch) + delete(s.opening, p.Id()) + }() + // open new stream and add to pool + st, tags, err := s.handler.OpenStream(ctx, p) + if err != nil { + log.Warn("stream open error", zap.Error(err)) + return + } + s.AddStream(p.Id(), st, tags...) + }() + return ch +} + +func (s *streamPool) Broadcast(ctx context.Context, msg drpc.Message, tags ...string) (err error) { + s.mu.Lock() + var streams []*stream + for _, tag := range tags { + for _, streamId := range s.streamIdsByTag[tag] { + streams = append(streams, s.streams[streamId]) + } + } + s.mu.Unlock() + var funcs []func() + for _, st := range streams { + funcs = append(funcs, func() { + if e := st.write(msg); e != nil { + log.Debug("broadcast write error", zap.Error(e)) + } + }) + } + return s.exec.Add(ctx, funcs...) +} + +func (s *streamPool) removeStream(streamId uint32) { + s.mu.Lock() + defer s.mu.Unlock() + st := s.streams[streamId] + if st == nil { + log.Fatal("removeStream: stream does not exist", zap.Uint32("streamId", streamId)) + } + + var removeStream = func(m map[string][]uint32, key string) { + streamIds := m[key] + idx := slices.Index(streamIds, streamId) + if idx == -1 { + log.Fatal("removeStream: streamId does not exist", zap.Uint32("streamId", streamId)) + } + streamIds = slices.Delete(streamIds, idx, idx+1) + if len(streamIds) == 0 { + delete(m, key) + } else { + m[key] = streamIds + } + } + + removeStream(s.streamIdsByPeer, st.peerId) + for _, tag := range st.tags { + removeStream(s.streamIdsByTag, tag) + } + + delete(s.streams, streamId) +} + +func (s *streamPool) Close() (err error) { + return s.exec.Close() +} diff --git a/net/streampool/streampool_test.go b/net/streampool/streampool_test.go new file mode 100644 index 00000000..7722173e --- /dev/null +++ b/net/streampool/streampool_test.go @@ -0,0 +1,199 @@ +package streampool + +import ( + "fmt" + "github.com/anytypeio/any-sync/net/peer" + "github.com/anytypeio/any-sync/net/rpc/rpctest" + "github.com/anytypeio/any-sync/net/streampool/testservice" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "golang.org/x/net/context" + "sort" + "storj.io/drpc" + "sync" + "sync/atomic" + "testing" + "time" +) + +var ctx = context.Background() + +func TestStreamPool_AddStream(t *testing.T) { + newClientStream := func(fx *fixture, peerId string) (st testservice.DRPCTest_TestStreamClient, p peer.Peer) { + p, err := fx.tp.Dial(ctx, peerId) + require.NoError(t, err) + s, err := testservice.NewDRPCTestClient(p).TestStream(ctx) + require.NoError(t, err) + return s, p + } + + t.Run("broadcast incoming", func(t *testing.T) { + fx := newFixture(t) + defer fx.Finish(t) + + s1, _ := newClientStream(fx, "p1") + fx.AddStream("p1", s1, "space1", "common") + s2, _ := newClientStream(fx, "p2") + fx.AddStream("p2", s2, "space2", "common") + + require.NoError(t, fx.Broadcast(ctx, &testservice.StreamMessage{ReqData: "space1"}, "space1")) + require.NoError(t, fx.Broadcast(ctx, &testservice.StreamMessage{ReqData: "space2"}, "space2")) + require.NoError(t, fx.Broadcast(ctx, &testservice.StreamMessage{ReqData: "common"}, "common")) + + var serverResults []string + for i := 0; i < 4; i++ { + select { + case msg := <-fx.tsh.receiveCh: + serverResults = append(serverResults, msg.ReqData) + case <-time.After(time.Second): + require.NoError(t, fmt.Errorf("timeout")) + } + } + + sort.Strings(serverResults) + assert.Equal(t, []string{"common", "common", "space1", "space2"}, serverResults) + + assert.NoError(t, s1.Close()) + assert.NoError(t, s2.Close()) + }) + + t.Run("send incoming", func(t *testing.T) { + fx := newFixture(t) + defer fx.Finish(t) + + s1, p1 := newClientStream(fx, "p1") + defer s1.Close() + fx.AddStream("p1", s1, "space1", "common") + + require.NoError(t, fx.Send(ctx, &testservice.StreamMessage{ReqData: "test"}, p1)) + var msg *testservice.StreamMessage + select { + case msg = <-fx.tsh.receiveCh: + case <-time.After(time.Second): + require.NoError(t, fmt.Errorf("timeout")) + } + assert.Equal(t, "test", msg.ReqData) + }) +} + +func TestStreamPool_Send(t *testing.T) { + t.Run("open stream", func(t *testing.T) { + fx := newFixture(t) + defer fx.Finish(t) + + p, err := fx.tp.Dial(ctx, "p1") + require.NoError(t, err) + + require.NoError(t, fx.Send(ctx, &testservice.StreamMessage{ReqData: "should open stream"}, p)) + + var msg *testservice.StreamMessage + select { + case msg = <-fx.tsh.receiveCh: + case <-time.After(time.Second): + require.NoError(t, fmt.Errorf("timeout")) + } + assert.Equal(t, "should open stream", msg.ReqData) + }) + t.Run("parallel open stream", func(t *testing.T) { + fx := newFixture(t) + defer fx.Finish(t) + + p, err := fx.tp.Dial(ctx, "p1") + require.NoError(t, err) + + fx.th.streamOpenDelay = time.Second / 3 + + var numMsgs = 5 + + for i := 0; i < numMsgs; i++ { + go require.NoError(t, fx.Send(ctx, &testservice.StreamMessage{ReqData: "should open stream"}, p)) + } + + var msgs []*testservice.StreamMessage + for i := 0; i < numMsgs; i++ { + select { + case msg := <-fx.tsh.receiveCh: + msgs = append(msgs, msg) + case <-time.After(time.Second): + require.NoError(t, fmt.Errorf("timeout")) + } + } + assert.Len(t, msgs, numMsgs) + // make sure that we have only one stream + assert.Equal(t, int32(1), fx.tsh.streamsCount.Load()) + }) +} + +func newFixture(t *testing.T) *fixture { + fx := &fixture{} + ts := rpctest.NewTestServer() + fx.tsh = &testServerHandler{receiveCh: make(chan *testservice.StreamMessage, 100)} + require.NoError(t, testservice.DRPCRegisterTest(ts, fx.tsh)) + fx.tp = rpctest.NewTestPool().WithServer(ts) + fx.th = &testHandler{} + fx.StreamPool = New().NewStreamPool(fx.th) + return fx +} + +type fixture struct { + StreamPool + tp *rpctest.TestPool + th *testHandler + tsh *testServerHandler +} + +func (fx *fixture) Finish(t *testing.T) { + require.NoError(t, fx.Close()) + require.NoError(t, fx.tp.Close(ctx)) +} + +type testHandler struct { + streamOpenDelay time.Duration + incomingMessages []drpc.Message + mu sync.Mutex +} + +func (t *testHandler) OpenStream(ctx context.Context, p peer.Peer) (stream drpc.Stream, tags []string, err error) { + if t.streamOpenDelay > 0 { + time.Sleep(t.streamOpenDelay) + } + stream, err = testservice.NewDRPCTestClient(p).TestStream(ctx) + return +} + +func (t *testHandler) HandleMessage(ctx context.Context, peerId string, msg drpc.Message) (err error) { + t.mu.Lock() + defer t.mu.Unlock() + t.incomingMessages = append(t.incomingMessages, msg) + return nil +} + +func (t *testHandler) DRPCEncoding() drpc.Encoding { + return EncodingProto +} + +func (t *testHandler) NewReadMessage() drpc.Message { + return new(testservice.StreamMessage) +} + +type testServerHandler struct { + receiveCh chan *testservice.StreamMessage + streamsCount atomic.Int32 + mu sync.Mutex +} + +func (t *testServerHandler) TestStream(st testservice.DRPCTest_TestStreamStream) error { + t.streamsCount.Add(1) + defer t.streamsCount.Add(-1) + for { + msg, err := st.Recv() + if err != nil { + return err + } + t.receiveCh <- msg + if err = st.Send(msg); err != nil { + return err + } + } + return nil +} diff --git a/net/streampool/streampoolservice.go b/net/streampool/streampoolservice.go new file mode 100644 index 00000000..198e5687 --- /dev/null +++ b/net/streampool/streampoolservice.go @@ -0,0 +1,42 @@ +package streampool + +import ( + "github.com/anytypeio/any-sync/app" + "github.com/anytypeio/any-sync/app/logger" +) + +const CName = "common.net.streampool" + +var log = logger.NewNamed(CName) + +func New() Service { + return new(service) +} + +type Service interface { + NewStreamPool(h StreamHandler) StreamPool + app.Component +} + +type service struct { +} + +func (s *service) NewStreamPool(h StreamHandler) StreamPool { + return &streamPool{ + handler: h, + streamIdsByPeer: map[string][]uint32{}, + streamIdsByTag: map[string][]uint32{}, + streams: map[uint32]*stream{}, + opening: map[string]chan struct{}{}, + exec: newStreamSender(10, 100), + lastStreamId: 0, + } +} + +func (s *service) Init(a *app.App) (err error) { + return nil +} + +func (s *service) Name() (name string) { + return CName +} diff --git a/net/streampool/testservice/protos/testservice.proto b/net/streampool/testservice/protos/testservice.proto new file mode 100644 index 00000000..36d046ff --- /dev/null +++ b/net/streampool/testservice/protos/testservice.proto @@ -0,0 +1,13 @@ +syntax = "proto3"; +package testService; + +option go_package = "net/streampool/testservice"; + +service Test { + rpc TestStream(stream StreamMessage) returns (stream StreamMessage); +} + + +message StreamMessage { + string reqData = 1; +} diff --git a/net/streampool/testservice/testservice.pb.go b/net/streampool/testservice/testservice.pb.go new file mode 100644 index 00000000..498700f3 --- /dev/null +++ b/net/streampool/testservice/testservice.pb.go @@ -0,0 +1,317 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: net/streampool/testservice/protos/testservice.proto + +package testservice + +import ( + fmt "fmt" + proto "github.com/gogo/protobuf/proto" + io "io" + math "math" + math_bits "math/bits" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package + +type StreamMessage struct { + ReqData string `protobuf:"bytes,1,opt,name=reqData,proto3" json:"reqData,omitempty"` +} + +func (m *StreamMessage) Reset() { *m = StreamMessage{} } +func (m *StreamMessage) String() string { return proto.CompactTextString(m) } +func (*StreamMessage) ProtoMessage() {} +func (*StreamMessage) Descriptor() ([]byte, []int) { + return fileDescriptor_1c28d5a3a78be18f, []int{0} +} +func (m *StreamMessage) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *StreamMessage) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_StreamMessage.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *StreamMessage) XXX_Merge(src proto.Message) { + xxx_messageInfo_StreamMessage.Merge(m, src) +} +func (m *StreamMessage) XXX_Size() int { + return m.Size() +} +func (m *StreamMessage) XXX_DiscardUnknown() { + xxx_messageInfo_StreamMessage.DiscardUnknown(m) +} + +var xxx_messageInfo_StreamMessage proto.InternalMessageInfo + +func (m *StreamMessage) GetReqData() string { + if m != nil { + return m.ReqData + } + return "" +} + +func init() { + proto.RegisterType((*StreamMessage)(nil), "testService.StreamMessage") +} + +func init() { + proto.RegisterFile("net/streampool/testservice/protos/testservice.proto", fileDescriptor_1c28d5a3a78be18f) +} + +var fileDescriptor_1c28d5a3a78be18f = []byte{ + // 173 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x32, 0xce, 0x4b, 0x2d, 0xd1, + 0x2f, 0x2e, 0x29, 0x4a, 0x4d, 0xcc, 0x2d, 0xc8, 0xcf, 0xcf, 0xd1, 0x2f, 0x49, 0x2d, 0x2e, 0x29, + 0x4e, 0x2d, 0x2a, 0xcb, 0x4c, 0x4e, 0xd5, 0x2f, 0x28, 0xca, 0x2f, 0xc9, 0x2f, 0x46, 0x16, 0xd2, + 0x03, 0x0b, 0x09, 0x71, 0x83, 0x84, 0x82, 0x21, 0x42, 0x4a, 0x9a, 0x5c, 0xbc, 0xc1, 0x60, 0xfd, + 0xbe, 0xa9, 0xc5, 0xc5, 0x89, 0xe9, 0xa9, 0x42, 0x12, 0x5c, 0xec, 0x45, 0xa9, 0x85, 0x2e, 0x89, + 0x25, 0x89, 0x12, 0x8c, 0x0a, 0x8c, 0x1a, 0x9c, 0x41, 0x30, 0xae, 0x51, 0x00, 0x17, 0x4b, 0x48, + 0x6a, 0x71, 0x89, 0x90, 0x07, 0x17, 0x17, 0x88, 0x86, 0x68, 0x13, 0x92, 0xd2, 0x43, 0x32, 0x4e, + 0x0f, 0xc5, 0x2c, 0x29, 0x3c, 0x72, 0x1a, 0x8c, 0x06, 0x8c, 0x4e, 0x26, 0x27, 0x1e, 0xc9, 0x31, + 0x5e, 0x78, 0x24, 0xc7, 0xf8, 0xe0, 0x91, 0x1c, 0xe3, 0x84, 0xc7, 0x72, 0x0c, 0x17, 0x1e, 0xcb, + 0x31, 0xdc, 0x78, 0x2c, 0xc7, 0x10, 0x25, 0x85, 0xdb, 0x63, 0x49, 0x6c, 0x60, 0x6f, 0x18, 0x03, + 0x02, 0x00, 0x00, 0xff, 0xff, 0xfd, 0x59, 0x8d, 0x93, 0xfd, 0x00, 0x00, 0x00, +} + +func (m *StreamMessage) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *StreamMessage) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *StreamMessage) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.ReqData) > 0 { + i -= len(m.ReqData) + copy(dAtA[i:], m.ReqData) + i = encodeVarintTestservice(dAtA, i, uint64(len(m.ReqData))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func encodeVarintTestservice(dAtA []byte, offset int, v uint64) int { + offset -= sovTestservice(v) + base := offset + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return base +} +func (m *StreamMessage) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.ReqData) + if l > 0 { + n += 1 + l + sovTestservice(uint64(l)) + } + return n +} + +func sovTestservice(x uint64) (n int) { + return (math_bits.Len64(x|1) + 6) / 7 +} +func sozTestservice(x uint64) (n int) { + return sovTestservice(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (m *StreamMessage) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTestservice + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: StreamMessage: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: StreamMessage: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field ReqData", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTestservice + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthTestservice + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthTestservice + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.ReqData = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipTestservice(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthTestservice + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipTestservice(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + depth := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowTestservice + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowTestservice + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + case 1: + iNdEx += 8 + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowTestservice + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if length < 0 { + return 0, ErrInvalidLengthTestservice + } + iNdEx += length + case 3: + depth++ + case 4: + if depth == 0 { + return 0, ErrUnexpectedEndOfGroupTestservice + } + depth-- + case 5: + iNdEx += 4 + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + if iNdEx < 0 { + return 0, ErrInvalidLengthTestservice + } + if depth == 0 { + return iNdEx, nil + } + } + return 0, io.ErrUnexpectedEOF +} + +var ( + ErrInvalidLengthTestservice = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowTestservice = fmt.Errorf("proto: integer overflow") + ErrUnexpectedEndOfGroupTestservice = fmt.Errorf("proto: unexpected end of group") +) diff --git a/net/streampool/testservice/testservice_drpc.pb.go b/net/streampool/testservice/testservice_drpc.pb.go new file mode 100644 index 00000000..f50fdbe7 --- /dev/null +++ b/net/streampool/testservice/testservice_drpc.pb.go @@ -0,0 +1,148 @@ +// Code generated by protoc-gen-go-drpc. DO NOT EDIT. +// protoc-gen-go-drpc version: v0.0.32 +// source: net/streampool/testservice/protos/testservice.proto + +package testservice + +import ( + bytes "bytes" + context "context" + errors "errors" + jsonpb "github.com/gogo/protobuf/jsonpb" + proto "github.com/gogo/protobuf/proto" + drpc "storj.io/drpc" + drpcerr "storj.io/drpc/drpcerr" +) + +type drpcEncoding_File_net_streampool_testservice_protos_testservice_proto struct{} + +func (drpcEncoding_File_net_streampool_testservice_protos_testservice_proto) Marshal(msg drpc.Message) ([]byte, error) { + return proto.Marshal(msg.(proto.Message)) +} + +func (drpcEncoding_File_net_streampool_testservice_protos_testservice_proto) Unmarshal(buf []byte, msg drpc.Message) error { + return proto.Unmarshal(buf, msg.(proto.Message)) +} + +func (drpcEncoding_File_net_streampool_testservice_protos_testservice_proto) JSONMarshal(msg drpc.Message) ([]byte, error) { + var buf bytes.Buffer + err := new(jsonpb.Marshaler).Marshal(&buf, msg.(proto.Message)) + if err != nil { + return nil, err + } + return buf.Bytes(), nil +} + +func (drpcEncoding_File_net_streampool_testservice_protos_testservice_proto) JSONUnmarshal(buf []byte, msg drpc.Message) error { + return jsonpb.Unmarshal(bytes.NewReader(buf), msg.(proto.Message)) +} + +type DRPCTestClient interface { + DRPCConn() drpc.Conn + + TestStream(ctx context.Context) (DRPCTest_TestStreamClient, error) +} + +type drpcTestClient struct { + cc drpc.Conn +} + +func NewDRPCTestClient(cc drpc.Conn) DRPCTestClient { + return &drpcTestClient{cc} +} + +func (c *drpcTestClient) DRPCConn() drpc.Conn { return c.cc } + +func (c *drpcTestClient) TestStream(ctx context.Context) (DRPCTest_TestStreamClient, error) { + stream, err := c.cc.NewStream(ctx, "/testService.Test/TestStream", drpcEncoding_File_net_streampool_testservice_protos_testservice_proto{}) + if err != nil { + return nil, err + } + x := &drpcTest_TestStreamClient{stream} + return x, nil +} + +type DRPCTest_TestStreamClient interface { + drpc.Stream + Send(*StreamMessage) error + Recv() (*StreamMessage, error) +} + +type drpcTest_TestStreamClient struct { + drpc.Stream +} + +func (x *drpcTest_TestStreamClient) Send(m *StreamMessage) error { + return x.MsgSend(m, drpcEncoding_File_net_streampool_testservice_protos_testservice_proto{}) +} + +func (x *drpcTest_TestStreamClient) Recv() (*StreamMessage, error) { + m := new(StreamMessage) + if err := x.MsgRecv(m, drpcEncoding_File_net_streampool_testservice_protos_testservice_proto{}); err != nil { + return nil, err + } + return m, nil +} + +func (x *drpcTest_TestStreamClient) RecvMsg(m *StreamMessage) error { + return x.MsgRecv(m, drpcEncoding_File_net_streampool_testservice_protos_testservice_proto{}) +} + +type DRPCTestServer interface { + TestStream(DRPCTest_TestStreamStream) error +} + +type DRPCTestUnimplementedServer struct{} + +func (s *DRPCTestUnimplementedServer) TestStream(DRPCTest_TestStreamStream) error { + return drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented) +} + +type DRPCTestDescription struct{} + +func (DRPCTestDescription) NumMethods() int { return 1 } + +func (DRPCTestDescription) Method(n int) (string, drpc.Encoding, drpc.Receiver, interface{}, bool) { + switch n { + case 0: + return "/testService.Test/TestStream", drpcEncoding_File_net_streampool_testservice_protos_testservice_proto{}, + func(srv interface{}, ctx context.Context, in1, in2 interface{}) (drpc.Message, error) { + return nil, srv.(DRPCTestServer). + TestStream( + &drpcTest_TestStreamStream{in1.(drpc.Stream)}, + ) + }, DRPCTestServer.TestStream, true + default: + return "", nil, nil, nil, false + } +} + +func DRPCRegisterTest(mux drpc.Mux, impl DRPCTestServer) error { + return mux.Register(impl, DRPCTestDescription{}) +} + +type DRPCTest_TestStreamStream interface { + drpc.Stream + Send(*StreamMessage) error + Recv() (*StreamMessage, error) +} + +type drpcTest_TestStreamStream struct { + drpc.Stream +} + +func (x *drpcTest_TestStreamStream) Send(m *StreamMessage) error { + return x.MsgSend(m, drpcEncoding_File_net_streampool_testservice_protos_testservice_proto{}) +} + +func (x *drpcTest_TestStreamStream) Recv() (*StreamMessage, error) { + m := new(StreamMessage) + if err := x.MsgRecv(m, drpcEncoding_File_net_streampool_testservice_protos_testservice_proto{}); err != nil { + return nil, err + } + return m, nil +} + +func (x *drpcTest_TestStreamStream) RecvMsg(m *StreamMessage) error { + return x.MsgRecv(m, drpcEncoding_File_net_streampool_testservice_protos_testservice_proto{}) +}