From 15af9eec36f23e595cbbfd538948ea7638a6e55f Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Thu, 14 Jul 2022 16:59:01 +0300 Subject: [PATCH] drpc experiment --- cmd/client/client.go | 61 +++++++++++++- cmd/node/node.go | 1 + etc/config.yml | 1 + go.mod | 2 + go.sum | 4 + service/server/drpc.go | 99 ++++++++++++++++++++++ syncproto/proto/commands.proto | 2 +- syncproto/proto/service.proto | 2 +- syncproto/service_drpc.pb.go | 146 +++++++++++++++++++++++++++++++++ 9 files changed, 314 insertions(+), 4 deletions(-) create mode 100644 service/server/drpc.go create mode 100644 syncproto/service_drpc.pb.go diff --git a/cmd/client/client.go b/cmd/client/client.go index 71b0d4d2..0f9cce60 100644 --- a/cmd/client/client.go +++ b/cmd/client/client.go @@ -9,6 +9,9 @@ import ( "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" "log" + "net" + "storj.io/drpc" + "storj.io/drpc/drpcconn" "time" ) @@ -21,7 +24,11 @@ func main() { if err != nil { panic(err) } + benchGrpc(conf) + benchDrpc(conf) +} +func benchGrpc(conf *config.Config) { var opts []grpc.DialOption if conf.GrpcServer.TLS { creds, err := credentials.NewClientTLSFromFile(conf.GrpcServer.TLSCertFile, "127.0.0.1") @@ -39,12 +46,15 @@ func main() { } defer conn.Close() client := syncproto.NewAnytypeSyncClient(conn) + stream, err := client.Ping(context.TODO()) if err != nil { panic(err) } + st := time.Now() - n := 1000000 + n := 100000 + for i := 0; i < n; i++ { if err = stream.Send(&syncproto.PingRequest{ Seq: int64(i), @@ -58,5 +68,52 @@ func main() { } dur := time.Since(st) fmt.Printf("%d req for %v (%d per sec)\n", n, dur, int(float64(n)/dur.Seconds())) - +} + +func benchDrpc(conf *config.Config) { + rawconn, err := net.Dial("tcp", conf.GrpcServer.ListenAddrs[1]) + if err != nil { + panic(err) + } + conn := drpcconn.New(rawconn) + defer conn.Close() + client := syncproto.NewDRPCAnytypeSyncClient(conn) + + stream, err := client.Ping(context.TODO()) + if err != nil { + panic(err) + } + + st := time.Now() + n := 100000 + + for i := 0; i < n; i++ { + if err = stream.MsgSend(&syncproto.PingRequest{ + Seq: int64(i), + }, enc{}); err != nil { + panic(err) + } + msg := &syncproto.PingResponse{} + err := stream.MsgRecv(msg, enc{}) + if err != nil { + panic(err) + } + } + dur := time.Since(st) + fmt.Printf("%d req for %v (%d per sec)\n", n, dur, int(float64(n)/dur.Seconds())) +} + +type enc struct { +} + +func (e enc) Marshal(msg drpc.Message) ([]byte, error) { + return msg.(interface { + Marshal() ([]byte, error) + }).Marshal() +} + +func (e enc) Unmarshal(buf []byte, msg drpc.Message) error { + return msg.(interface { + Unmarshal(buf []byte) error + }).Unmarshal(buf) } diff --git a/cmd/node/node.go b/cmd/node/node.go index 161daa88..a6700e75 100644 --- a/cmd/node/node.go +++ b/cmd/node/node.go @@ -81,4 +81,5 @@ func main() { func Bootstrap(a *app.App) { a.Register(server.New()) + a.Register(server.NewDRPC()) } diff --git a/etc/config.yml b/etc/config.yml index 281c7d71..245199a3 100644 --- a/etc/config.yml +++ b/etc/config.yml @@ -4,6 +4,7 @@ anytype: grpcServer: listenAddrs: - "127.0.0.1:4430" + - "127.0.0.1:4431" tls: false tlsKeyFile: "etc/x509/key.pem" tlsCertFile: "etc/x509/cert.pem" diff --git a/go.mod b/go.mod index c0d0602f..c06a0cc1 100644 --- a/go.mod +++ b/go.mod @@ -44,6 +44,7 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/sirupsen/logrus v1.6.0 // indirect github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572 // indirect + github.com/zeebo/errs v1.2.2 // indirect go.uber.org/atomic v1.9.0 // indirect go.uber.org/multierr v1.8.0 // indirect go.uber.org/zap v1.21.0 // indirect @@ -56,6 +57,7 @@ require ( google.golang.org/grpc v1.48.0 // indirect google.golang.org/protobuf v1.27.1 // indirect gopkg.in/alecthomas/kingpin.v2 v2.2.6 // indirect + storj.io/drpc v0.0.32 // indirect ) replace github.com/textileio/go-threads => github.com/anytypeio/go-threads v1.1.0-rc1.0.20220223104843-a67245cee80e diff --git a/go.sum b/go.sum index 01e832d4..e9440306 100644 --- a/go.sum +++ b/go.sum @@ -1044,6 +1044,8 @@ github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1: github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= +github.com/zeebo/errs v1.2.2 h1:5NFypMTuSdoySVTqlNs1dEoU21QVamMQJxW/Fii5O7g= +github.com/zeebo/errs v1.2.2/go.mod h1:sgbWHsvVuTPHcqJJGQ1WhI5KbWlHYz+2+2C/LSEtCw4= go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738/go.mod h1:dnLIgRNXwCJa5e+c6mIZCrds/GIG4ncV9HhK5PX7jPg= go.mongodb.org/mongo-driver v1.4.0/go.mod h1:llVBH2pkj9HywK0Dtdt6lDikOjFLbceHVu/Rc0iMKLs= @@ -1398,3 +1400,5 @@ sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o= sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0/go.mod h1:hI742Nqp5OhwiqlzhgfbWU4mW4yO10fP+LoT9WOswdU= sourcegraph.com/sourcegraph/go-diff v0.5.0/go.mod h1:kuch7UrkMzY0X+p9CRK03kfuPQ2zzQcaEFbx8wA8rck= sourcegraph.com/sqs/pbtypes v0.0.0-20180604144634-d3ebe8f20ae4/go.mod h1:ketZ/q3QxT9HOBeFhu6RdvsftgpsbFHBF5Cas6cDKZ0= +storj.io/drpc v0.0.32 h1:5p5ZwsK/VOgapaCu+oxaPVwO6UwIs+iwdMiD50+R4PI= +storj.io/drpc v0.0.32/go.mod h1:6rcOyR/QQkSTX/9L5ZGtlZaE2PtXTTZl8d+ulSeeYEg= diff --git a/service/server/drpc.go b/service/server/drpc.go new file mode 100644 index 00000000..479392e6 --- /dev/null +++ b/service/server/drpc.go @@ -0,0 +1,99 @@ +package server + +import ( + "context" + "github.com/anytypeio/go-anytype-infrastructure-experiments/app" + "github.com/anytypeio/go-anytype-infrastructure-experiments/config" + "github.com/anytypeio/go-anytype-infrastructure-experiments/syncproto" + "io" + "net" + "storj.io/drpc" + "storj.io/drpc/drpcmux" + "storj.io/drpc/drpcserver" + "time" +) + +const CNameDRPC = "serverDrpc" + +func NewDRPC() *ServerDrpc { + return &ServerDrpc{} +} + +type ServerDrpc struct { + config config.GrpcServer + grpcServerDrpc *drpcserver.Server + cancel func() +} + +func (s *ServerDrpc) Init(ctx context.Context, a *app.App) (err error) { + s.config = a.MustComponent(config.CName).(*config.Config).GrpcServer + return nil +} + +func (s *ServerDrpc) Name() (name string) { + return CNameDRPC +} + +func (s *ServerDrpc) Run(ctx context.Context) (err error) { + m := drpcmux.New() + lis, err := net.Listen("tcp", s.config.ListenAddrs[1]) + if err != nil { + return err + } + err = syncproto.DRPCRegisterAnytypeSync(m, s) + if err != nil { + return err + } + ctx, s.cancel = context.WithCancel(ctx) + s.grpcServerDrpc = drpcserver.New(m) + var errCh = make(chan error) + go func() { + errCh <- s.grpcServerDrpc.Serve(ctx, lis) + }() + select { + case <-time.After(time.Second / 4): + case err = <-errCh: + } + log.Sugar().Infof("drpc server started at: %v", s.config.ListenAddrs[1]) + return +} + +func (s *ServerDrpc) Ping(stream syncproto.DRPCAnytypeSync_PingStream) error { + for { + var in = &syncproto.PingRequest{} + err := stream.MsgRecv(in, enc{}) + if err == io.EOF { + return nil + } + if err != nil { + return err + } + if err := stream.MsgSend(&syncproto.PingResponse{ + Seq: in.Seq, + }, enc{}); err != nil { + return err + } + } +} + +func (s *ServerDrpc) Close(ctx context.Context) (err error) { + if s.cancel != nil { + s.cancel() + } + return +} + +type enc struct { +} + +func (e enc) Marshal(msg drpc.Message) ([]byte, error) { + return msg.(interface { + Marshal() ([]byte, error) + }).Marshal() +} + +func (e enc) Unmarshal(buf []byte, msg drpc.Message) error { + return msg.(interface { + Unmarshal(buf []byte) error + }).Unmarshal(buf) +} diff --git a/syncproto/proto/commands.proto b/syncproto/proto/commands.proto index 2eb5bf86..73a5366c 100644 --- a/syncproto/proto/commands.proto +++ b/syncproto/proto/commands.proto @@ -1,6 +1,6 @@ syntax = "proto3"; package anytype; -option go_package = "syncproto"; +option go_package = "/syncproto"; message Ping { diff --git a/syncproto/proto/service.proto b/syncproto/proto/service.proto index 5e261219..fc0a35ff 100644 --- a/syncproto/proto/service.proto +++ b/syncproto/proto/service.proto @@ -1,6 +1,6 @@ syntax = "proto3"; package anytype; -option go_package = "syncproto"; +option go_package = "/syncproto"; import "syncproto/proto/commands.proto"; diff --git a/syncproto/service_drpc.pb.go b/syncproto/service_drpc.pb.go new file mode 100644 index 00000000..694868b4 --- /dev/null +++ b/syncproto/service_drpc.pb.go @@ -0,0 +1,146 @@ +// Code generated by protoc-gen-go-drpc. DO NOT EDIT. +// protoc-gen-go-drpc version: v0.0.32 +// source: syncproto/proto/service.proto + +package syncproto + +import ( + context "context" + errors "errors" + protojson "google.golang.org/protobuf/encoding/protojson" + proto "google.golang.org/protobuf/proto" + drpc "storj.io/drpc" + drpcerr "storj.io/drpc/drpcerr" +) + +type drpcEncoding_File_syncproto_proto_service_proto struct{} + +func (drpcEncoding_File_syncproto_proto_service_proto) Marshal(msg drpc.Message) ([]byte, error) { + return proto.Marshal(msg.(proto.Message)) +} + +func (drpcEncoding_File_syncproto_proto_service_proto) MarshalAppend(buf []byte, msg drpc.Message) ([]byte, error) { + return proto.MarshalOptions{}.MarshalAppend(buf, msg.(proto.Message)) +} + +func (drpcEncoding_File_syncproto_proto_service_proto) Unmarshal(buf []byte, msg drpc.Message) error { + return proto.Unmarshal(buf, msg.(proto.Message)) +} + +func (drpcEncoding_File_syncproto_proto_service_proto) JSONMarshal(msg drpc.Message) ([]byte, error) { + return protojson.Marshal(msg.(proto.Message)) +} + +func (drpcEncoding_File_syncproto_proto_service_proto) JSONUnmarshal(buf []byte, msg drpc.Message) error { + return protojson.Unmarshal(buf, msg.(proto.Message)) +} + +type DRPCAnytypeSyncClient interface { + DRPCConn() drpc.Conn + + Ping(ctx context.Context) (DRPCAnytypeSync_PingClient, error) +} + +type drpcAnytypeSyncClient struct { + cc drpc.Conn +} + +func NewDRPCAnytypeSyncClient(cc drpc.Conn) DRPCAnytypeSyncClient { + return &drpcAnytypeSyncClient{cc} +} + +func (c *drpcAnytypeSyncClient) DRPCConn() drpc.Conn { return c.cc } + +func (c *drpcAnytypeSyncClient) Ping(ctx context.Context) (DRPCAnytypeSync_PingClient, error) { + stream, err := c.cc.NewStream(ctx, "/anytype.AnytypeSync/Ping", drpcEncoding_File_syncproto_proto_service_proto{}) + if err != nil { + return nil, err + } + x := &drpcAnytypeSync_PingClient{stream} + return x, nil +} + +type DRPCAnytypeSync_PingClient interface { + drpc.Stream + Send(*PingRequest) error + Recv() (*PingResponse, error) +} + +type drpcAnytypeSync_PingClient struct { + drpc.Stream +} + +func (x *drpcAnytypeSync_PingClient) Send(m *PingRequest) error { + return x.MsgSend(m, drpcEncoding_File_syncproto_proto_service_proto{}) +} + +func (x *drpcAnytypeSync_PingClient) Recv() (*PingResponse, error) { + m := new(PingResponse) + if err := x.MsgRecv(m, drpcEncoding_File_syncproto_proto_service_proto{}); err != nil { + return nil, err + } + return m, nil +} + +func (x *drpcAnytypeSync_PingClient) RecvMsg(m *PingResponse) error { + return x.MsgRecv(m, drpcEncoding_File_syncproto_proto_service_proto{}) +} + +type DRPCAnytypeSyncServer interface { + Ping(DRPCAnytypeSync_PingStream) error +} + +type DRPCAnytypeSyncUnimplementedServer struct{} + +func (s *DRPCAnytypeSyncUnimplementedServer) Ping(DRPCAnytypeSync_PingStream) error { + return drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented) +} + +type DRPCAnytypeSyncDescription struct{} + +func (DRPCAnytypeSyncDescription) NumMethods() int { return 1 } + +func (DRPCAnytypeSyncDescription) Method(n int) (string, drpc.Encoding, drpc.Receiver, interface{}, bool) { + switch n { + case 0: + return "/anytype.AnytypeSync/Ping", drpcEncoding_File_syncproto_proto_service_proto{}, + func(srv interface{}, ctx context.Context, in1, in2 interface{}) (drpc.Message, error) { + return nil, srv.(DRPCAnytypeSyncServer). + Ping( + &drpcAnytypeSync_PingStream{in1.(drpc.Stream)}, + ) + }, DRPCAnytypeSyncServer.Ping, true + default: + return "", nil, nil, nil, false + } +} + +func DRPCRegisterAnytypeSync(mux drpc.Mux, impl DRPCAnytypeSyncServer) error { + return mux.Register(impl, DRPCAnytypeSyncDescription{}) +} + +type DRPCAnytypeSync_PingStream interface { + drpc.Stream + Send(*PingResponse) error + Recv() (*PingRequest, error) +} + +type drpcAnytypeSync_PingStream struct { + drpc.Stream +} + +func (x *drpcAnytypeSync_PingStream) Send(m *PingResponse) error { + return x.MsgSend(m, drpcEncoding_File_syncproto_proto_service_proto{}) +} + +func (x *drpcAnytypeSync_PingStream) Recv() (*PingRequest, error) { + m := new(PingRequest) + if err := x.MsgRecv(m, drpcEncoding_File_syncproto_proto_service_proto{}); err != nil { + return nil, err + } + return m, nil +} + +func (x *drpcAnytypeSync_PingStream) RecvMsg(m *PingRequest) error { + return x.MsgRecv(m, drpcEncoding_File_syncproto_proto_service_proto{}) +}