1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-08 05:57:03 +09:00

Merge branch 'main' into release-fixes

# Conflicts:
#	go.mod
#	go.sum
This commit is contained in:
mcrakhman 2023-08-16 19:14:04 +02:00
commit 1946f1527a
No known key found for this signature in database
GPG key ID: DED12CFEF5B8396B
16 changed files with 2775 additions and 22 deletions

View file

@ -25,6 +25,8 @@ proto:
protoc --gogofaster_out=:. net/secureservice/handshake/handshakeproto/protos/*.proto
protoc --gogofaster_out=$(PKGMAP):. --go-drpc_out=protolib=github.com/gogo/protobuf:. coordinator/coordinatorproto/protos/*.proto
protoc --gogofaster_out=:. --go-drpc_out=protolib=github.com/gogo/protobuf:. consensus/consensusproto/protos/*.proto
protoc --gogofaster_out=:. --go-drpc_out=protolib=github.com/gogo/protobuf:. identityrepo/identityrepoproto/protos/*.proto
deps:
go mod download

7
go.mod
View file

@ -24,6 +24,7 @@ require (
github.com/multiformats/go-multibase v0.2.0
github.com/multiformats/go-multihash v0.2.3
github.com/prometheus/client_golang v1.16.0
github.com/quic-go/quic-go v0.36.4
github.com/stretchr/testify v1.8.4
github.com/tyler-smith/go-bip39 v1.1.0
github.com/zeebo/blake3 v0.2.3
@ -48,7 +49,9 @@ require (
github.com/fogleman/gg v1.3.0 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 // indirect
github.com/golang/mock v1.6.0 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/pprof v0.0.0-20230811205829-9131a7e9cc17 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.2 // indirect
@ -74,12 +77,15 @@ require (
github.com/multiformats/go-multicodec v0.9.0 // indirect
github.com/multiformats/go-multistream v0.4.1 // indirect
github.com/multiformats/go-varint v0.0.7 // indirect
github.com/onsi/ginkgo/v2 v2.11.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/polydawn/refmt v0.89.0 // indirect
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.44.0 // indirect
github.com/prometheus/procfs v0.11.1 // indirect
github.com/quic-go/qtls-go1-19 v0.3.3 // indirect
github.com/quic-go/qtls-go1-20 v0.2.3 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/whyrusleeping/chunker v0.0.0-20181014151217-fe64bd25879f // indirect
github.com/zeebo/errs v1.3.0 // indirect
@ -87,6 +93,7 @@ require (
go.opentelemetry.io/otel/trace v1.14.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/image v0.6.0 // indirect
golang.org/x/mod v0.12.0 // indirect
golang.org/x/sync v0.3.0 // indirect
golang.org/x/sys v0.11.0 // indirect
golang.org/x/tools v0.12.0 // indirect

11
go.sum
View file

@ -45,6 +45,7 @@ github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbV
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI=
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls=
github.com/go-yaml/yaml v2.1.0+incompatible/go.mod h1:w2MrLa16VYP0jy6N7M5kHaCkaLENm+P+Tv+MfurjSw0=
github.com/gobwas/glob v0.2.3 h1:A4xDbljILXROh+kObIiy5kIaPYD8e96x1tgBhUI5J+Y=
github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJAkT8=
@ -55,6 +56,7 @@ github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 h1:DACJavvAHhabrF08vX0COfcOBJRhZ8lUbR+ZWIs0Y5g=
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k=
github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc=
github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
@ -174,8 +176,9 @@ github.com/multiformats/go-varint v0.0.5/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXS
github.com/multiformats/go-varint v0.0.7 h1:sWSGR+f/eu5ABZA2ZpYKBILXTTs9JWpdEM/nEGOHFS8=
github.com/multiformats/go-varint v0.0.7/go.mod h1:r8PUYw/fD/SjBCiKOoDlGF6QawOELpZAu9eioSos/OU=
github.com/nfnt/resize v0.0.0-20160724205520-891127d8d1b5 h1:BvoENQQU+fZ9uukda/RzCAL/191HHwJA5b13R6diVlY=
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
github.com/onsi/ginkgo/v2 v2.11.0 h1:WgqUCUt/lT6yXoQ8Wef0fsNn5cAuMK7+KT9UFRz2tcU=
github.com/onsi/ginkgo/v2 v2.11.0/go.mod h1:ZhrRA5XmEE3x3rhlzamx/JJvujdZoJ2uvgI7kR0iZvM=
github.com/onsi/gomega v1.27.8 h1:gegWiwZjBsf2DgiSbf5hpokZ98JVDMcWkUiigk6/KXc=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
@ -193,8 +196,11 @@ github.com/prometheus/procfs v0.11.1 h1:xRC8Iq1yyca5ypa9n1EZnWZkt7dwcoRPQwX/5gwa
github.com/prometheus/procfs v0.11.1/go.mod h1:eesXgaPo1q7lBpVMoMy0ZOFTth9hBn4W/y0/p/ScXhY=
github.com/quic-go/qpack v0.4.0 h1:Cr9BXA1sQS2SmDUWjSofMPNKmvF6IiIfDRmgU0w1ZCo=
github.com/quic-go/qtls-go1-19 v0.3.3 h1:wznEHvJwd+2X3PqftRha0SUKmGsnb6dfArMhy9PeJVE=
github.com/quic-go/qtls-go1-19 v0.3.3/go.mod h1:ySOI96ew8lnoKPtSqx2BlI5wCpUVPT05RMAlajtnyOI=
github.com/quic-go/qtls-go1-20 v0.2.3 h1:m575dovXn1y2ATOb1XrRFcrv0F+EQmlowTkoraNkDPI=
github.com/quic-go/qtls-go1-20 v0.2.3/go.mod h1:JKtK6mjbAVcUTN/9jZpvLbGxvdWIKS8uT7EiStoU1SM=
github.com/quic-go/quic-go v0.36.4 h1:CXn/ZLN5Vntlk53fjR+kUMC8Jt7flfQe+I5Ty5A+k0o=
github.com/quic-go/quic-go v0.36.4/go.mod h1:qxQumdeKw5GmWs1OsTZZnOxzSI+RJWuhf1O8FN35L2o=
github.com/quic-go/webtransport-go v0.5.3 h1:5XMlzemqB4qmOlgIus5zB45AcZ2kCgCy2EptUrfOPWU=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
@ -209,6 +215,7 @@ github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
@ -268,6 +275,7 @@ golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.12.0 h1:rmsUpXtvNzj340zd98LZ4KntptpfRHwpFOHG188oHXc=
golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
@ -318,6 +326,7 @@ golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,193 @@
// Code generated by protoc-gen-go-drpc. DO NOT EDIT.
// protoc-gen-go-drpc version: v0.0.33
// source: identityrepo/identityrepoproto/protos/identityrepo.proto
package identityrepoproto
import (
bytes "bytes"
context "context"
errors "errors"
jsonpb "github.com/gogo/protobuf/jsonpb"
proto "github.com/gogo/protobuf/proto"
drpc "storj.io/drpc"
drpcerr "storj.io/drpc/drpcerr"
)
type drpcEncoding_File_identityrepo_identityrepoproto_protos_identityrepo_proto struct{}
func (drpcEncoding_File_identityrepo_identityrepoproto_protos_identityrepo_proto) Marshal(msg drpc.Message) ([]byte, error) {
return proto.Marshal(msg.(proto.Message))
}
func (drpcEncoding_File_identityrepo_identityrepoproto_protos_identityrepo_proto) Unmarshal(buf []byte, msg drpc.Message) error {
return proto.Unmarshal(buf, msg.(proto.Message))
}
func (drpcEncoding_File_identityrepo_identityrepoproto_protos_identityrepo_proto) JSONMarshal(msg drpc.Message) ([]byte, error) {
var buf bytes.Buffer
err := new(jsonpb.Marshaler).Marshal(&buf, msg.(proto.Message))
if err != nil {
return nil, err
}
return buf.Bytes(), nil
}
func (drpcEncoding_File_identityrepo_identityrepoproto_protos_identityrepo_proto) JSONUnmarshal(buf []byte, msg drpc.Message) error {
return jsonpb.Unmarshal(bytes.NewReader(buf), msg.(proto.Message))
}
type DRPCIdentityRepoClient interface {
DRPCConn() drpc.Conn
DataPut(ctx context.Context, in *DataPutRequest) (*Ok, error)
DataDelete(ctx context.Context, in *DataDeleteRequest) (*Ok, error)
DataPull(ctx context.Context, in *DataPullRequest) (*DataPullResponse, error)
}
type drpcIdentityRepoClient struct {
cc drpc.Conn
}
func NewDRPCIdentityRepoClient(cc drpc.Conn) DRPCIdentityRepoClient {
return &drpcIdentityRepoClient{cc}
}
func (c *drpcIdentityRepoClient) DRPCConn() drpc.Conn { return c.cc }
func (c *drpcIdentityRepoClient) DataPut(ctx context.Context, in *DataPutRequest) (*Ok, error) {
out := new(Ok)
err := c.cc.Invoke(ctx, "/identityRepo.IdentityRepo/DataPut", drpcEncoding_File_identityrepo_identityrepoproto_protos_identityrepo_proto{}, in, out)
if err != nil {
return nil, err
}
return out, nil
}
func (c *drpcIdentityRepoClient) DataDelete(ctx context.Context, in *DataDeleteRequest) (*Ok, error) {
out := new(Ok)
err := c.cc.Invoke(ctx, "/identityRepo.IdentityRepo/DataDelete", drpcEncoding_File_identityrepo_identityrepoproto_protos_identityrepo_proto{}, in, out)
if err != nil {
return nil, err
}
return out, nil
}
func (c *drpcIdentityRepoClient) DataPull(ctx context.Context, in *DataPullRequest) (*DataPullResponse, error) {
out := new(DataPullResponse)
err := c.cc.Invoke(ctx, "/identityRepo.IdentityRepo/DataPull", drpcEncoding_File_identityrepo_identityrepoproto_protos_identityrepo_proto{}, in, out)
if err != nil {
return nil, err
}
return out, nil
}
type DRPCIdentityRepoServer interface {
DataPut(context.Context, *DataPutRequest) (*Ok, error)
DataDelete(context.Context, *DataDeleteRequest) (*Ok, error)
DataPull(context.Context, *DataPullRequest) (*DataPullResponse, error)
}
type DRPCIdentityRepoUnimplementedServer struct{}
func (s *DRPCIdentityRepoUnimplementedServer) DataPut(context.Context, *DataPutRequest) (*Ok, error) {
return nil, drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented)
}
func (s *DRPCIdentityRepoUnimplementedServer) DataDelete(context.Context, *DataDeleteRequest) (*Ok, error) {
return nil, drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented)
}
func (s *DRPCIdentityRepoUnimplementedServer) DataPull(context.Context, *DataPullRequest) (*DataPullResponse, error) {
return nil, drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented)
}
type DRPCIdentityRepoDescription struct{}
func (DRPCIdentityRepoDescription) NumMethods() int { return 3 }
func (DRPCIdentityRepoDescription) Method(n int) (string, drpc.Encoding, drpc.Receiver, interface{}, bool) {
switch n {
case 0:
return "/identityRepo.IdentityRepo/DataPut", drpcEncoding_File_identityrepo_identityrepoproto_protos_identityrepo_proto{},
func(srv interface{}, ctx context.Context, in1, in2 interface{}) (drpc.Message, error) {
return srv.(DRPCIdentityRepoServer).
DataPut(
ctx,
in1.(*DataPutRequest),
)
}, DRPCIdentityRepoServer.DataPut, true
case 1:
return "/identityRepo.IdentityRepo/DataDelete", drpcEncoding_File_identityrepo_identityrepoproto_protos_identityrepo_proto{},
func(srv interface{}, ctx context.Context, in1, in2 interface{}) (drpc.Message, error) {
return srv.(DRPCIdentityRepoServer).
DataDelete(
ctx,
in1.(*DataDeleteRequest),
)
}, DRPCIdentityRepoServer.DataDelete, true
case 2:
return "/identityRepo.IdentityRepo/DataPull", drpcEncoding_File_identityrepo_identityrepoproto_protos_identityrepo_proto{},
func(srv interface{}, ctx context.Context, in1, in2 interface{}) (drpc.Message, error) {
return srv.(DRPCIdentityRepoServer).
DataPull(
ctx,
in1.(*DataPullRequest),
)
}, DRPCIdentityRepoServer.DataPull, true
default:
return "", nil, nil, nil, false
}
}
func DRPCRegisterIdentityRepo(mux drpc.Mux, impl DRPCIdentityRepoServer) error {
return mux.Register(impl, DRPCIdentityRepoDescription{})
}
type DRPCIdentityRepo_DataPutStream interface {
drpc.Stream
SendAndClose(*Ok) error
}
type drpcIdentityRepo_DataPutStream struct {
drpc.Stream
}
func (x *drpcIdentityRepo_DataPutStream) SendAndClose(m *Ok) error {
if err := x.MsgSend(m, drpcEncoding_File_identityrepo_identityrepoproto_protos_identityrepo_proto{}); err != nil {
return err
}
return x.CloseSend()
}
type DRPCIdentityRepo_DataDeleteStream interface {
drpc.Stream
SendAndClose(*Ok) error
}
type drpcIdentityRepo_DataDeleteStream struct {
drpc.Stream
}
func (x *drpcIdentityRepo_DataDeleteStream) SendAndClose(m *Ok) error {
if err := x.MsgSend(m, drpcEncoding_File_identityrepo_identityrepoproto_protos_identityrepo_proto{}); err != nil {
return err
}
return x.CloseSend()
}
type DRPCIdentityRepo_DataPullStream interface {
drpc.Stream
SendAndClose(*DataPullResponse) error
}
type drpcIdentityRepo_DataPullStream struct {
drpc.Stream
}
func (x *drpcIdentityRepo_DataPullStream) SendAndClose(m *DataPullResponse) error {
if err := x.MsgSend(m, drpcEncoding_File_identityrepo_identityrepoproto_protos_identityrepo_proto{}); err != nil {
return err
}
return x.CloseSend()
}

View file

@ -0,0 +1,55 @@
syntax = "proto3";
package identityRepo;
option go_package = "identityrepo/identityrepoproto";
service IdentityRepo {
// DataPut puts client data to a repository
rpc DataPut(DataPutRequest) returns (Ok);
// DataDelete deletes client data from a repository
rpc DataDelete(DataDeleteRequest) returns (Ok);
// DataPull pulls client data from a repository
rpc DataPull(DataPullRequest) returns (DataPullResponse);
}
message Data {
// kind is a string representing the kind of data
string kind = 1;
// data is a byte payload
bytes data = 2;
// data signature
bytes signature = 3;
}
message DataWithIdentity {
string identity = 1;
repeated Data data = 2;
}
message DataPutRequest {
// string representation of identity, must be equal handshake result
string identity = 1;
// data to save
repeated Data data = 2;
}
message DataDeleteRequest {
// string representation of identity, must be equal handshake result
string identity = 1;
// kinds of data to delete, if empty all kinds will be deleted
repeated string kinds = 2;
}
message DataPullRequest {
// list of identities wanted to request
repeated string identities = 1;
// kinds of data wanted to request
repeated string kinds = 2;
}
message DataPullResponse {
repeated DataWithIdentity data = 1;
}
message Ok {}

View file

@ -3,15 +3,18 @@ package peerservice
import (
"context"
"errors"
"fmt"
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/app/logger"
"github.com/anyproto/any-sync/net/peer"
"github.com/anyproto/any-sync/net/pool"
"github.com/anyproto/any-sync/net/rpc/server"
"github.com/anyproto/any-sync/net/transport"
"github.com/anyproto/any-sync/net/transport/quic"
"github.com/anyproto/any-sync/net/transport/yamux"
"github.com/anyproto/any-sync/nodeconf"
"go.uber.org/zap"
"strings"
"sync"
)
@ -31,33 +34,49 @@ func New() PeerService {
type PeerService interface {
Dial(ctx context.Context, peerId string) (pr peer.Peer, err error)
SetPeerAddrs(peerId string, addrs []string)
PreferQuic(prefer bool)
transport.Accepter
app.Component
}
type peerService struct {
yamux transport.Transport
nodeConf nodeconf.NodeConf
peerAddrs map[string][]string
pool pool.Pool
server server.DRPCServer
mu sync.RWMutex
yamux transport.Transport
quic transport.Transport
nodeConf nodeconf.NodeConf
peerAddrs map[string][]string
pool pool.Pool
server server.DRPCServer
preferQuic bool
mu sync.RWMutex
}
func (p *peerService) Init(a *app.App) (err error) {
p.yamux = a.MustComponent(yamux.CName).(transport.Transport)
p.quic = a.MustComponent(quic.CName).(transport.Transport)
p.nodeConf = a.MustComponent(nodeconf.CName).(nodeconf.NodeConf)
p.pool = a.MustComponent(pool.CName).(pool.Pool)
p.server = a.MustComponent(server.CName).(server.DRPCServer)
p.peerAddrs = map[string][]string{}
p.yamux.SetAccepter(p)
p.quic.SetAccepter(p)
return nil
}
var (
yamuxPreferSchemes = []string{transport.Yamux, transport.Quic}
quicPreferSchemes = []string{transport.Quic, transport.Yamux}
)
func (p *peerService) Name() (name string) {
return CName
}
func (p *peerService) PreferQuic(prefer bool) {
p.mu.Lock()
p.preferQuic = prefer
p.mu.Unlock()
}
func (p *peerService) Dial(ctx context.Context, peerId string) (pr peer.Peer, err error) {
p.mu.RLock()
defer p.mu.RUnlock()
@ -69,11 +88,15 @@ func (p *peerService) Dial(ctx context.Context, peerId string) (pr peer.Peer, er
var mc transport.MultiConn
log.DebugCtx(ctx, "dial", zap.String("peerId", peerId), zap.Strings("addrs", addrs))
for _, addr := range addrs {
mc, err = p.yamux.Dial(ctx, addr)
if err != nil {
log.InfoCtx(ctx, "can't connect to host", zap.String("addr", addr), zap.Error(err))
} else {
var schemes = yamuxPreferSchemes
if p.preferQuic {
schemes = quicPreferSchemes
}
err = ErrAddrsNotFound
for _, sch := range schemes {
if mc, err = p.dialScheme(ctx, sch, addrs); err == nil {
break
}
}
@ -90,6 +113,31 @@ func (p *peerService) Dial(ctx context.Context, peerId string) (pr peer.Peer, er
return peer.NewPeer(mc, p.server)
}
func (p *peerService) dialScheme(ctx context.Context, sch string, addrs []string) (mc transport.MultiConn, err error) {
var tr transport.Transport
switch sch {
case transport.Quic:
tr = p.quic
case transport.Yamux:
tr = p.yamux
default:
return nil, fmt.Errorf("unexpected transport: %v", sch)
}
err = ErrAddrsNotFound
for _, addr := range addrs {
if scheme(addr) != sch {
continue
}
if mc, err = tr.Dial(ctx, stripScheme(addr)); err == nil {
return
} else {
log.InfoCtx(ctx, "can't connect to host", zap.String("addr", addr), zap.Error(err))
}
}
return
}
func (p *peerService) Accept(mc transport.MultiConn) (err error) {
pr, err := peer.NewPeer(mc, p.server)
if err != nil {
@ -117,3 +165,17 @@ func (p *peerService) getPeerAddrs(peerId string) ([]string, error) {
}
return addrs, nil
}
func scheme(addr string) string {
if idx := strings.Index(addr, "://"); idx != -1 {
return addr[:idx]
}
return transport.Yamux
}
func stripScheme(addr string) string {
if idx := strings.Index(addr, "://"); idx != -1 {
return addr[idx+3:]
}
return addr
}

View file

@ -0,0 +1,170 @@
package peerservice
import (
"context"
"fmt"
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/net/peer"
"github.com/anyproto/any-sync/net/pool"
"github.com/anyproto/any-sync/net/rpc/rpctest"
"github.com/anyproto/any-sync/net/transport/mock_transport"
"github.com/anyproto/any-sync/net/transport/quic"
"github.com/anyproto/any-sync/net/transport/yamux"
"github.com/anyproto/any-sync/nodeconf"
"github.com/anyproto/any-sync/nodeconf/mock_nodeconf"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"testing"
)
var ctx = context.Background()
func TestPeerService_Dial(t *testing.T) {
var addrs = []string{
"yamux://127.0.0.1:1111",
"quic://127.0.0.1:1112",
}
t.Run("prefer yamux", func(t *testing.T) {
fx := newFixture(t)
defer fx.finish(t)
fx.PreferQuic(false)
var peerId = "p1"
fx.nodeConf.EXPECT().PeerAddresses(peerId).Return(addrs, true)
fx.yamux.MockTransport.EXPECT().Dial(ctx, "127.0.0.1:1111").Return(fx.mockMC(peerId), nil)
p, err := fx.Dial(ctx, peerId)
require.NoError(t, err)
assert.NotNil(t, p)
})
t.Run("prefer quic", func(t *testing.T) {
fx := newFixture(t)
defer fx.finish(t)
fx.PreferQuic(true)
var peerId = "p1"
fx.nodeConf.EXPECT().PeerAddresses(peerId).Return(addrs, true)
fx.quic.MockTransport.EXPECT().Dial(ctx, "127.0.0.1:1112").Return(fx.mockMC(peerId), nil)
p, err := fx.Dial(ctx, peerId)
require.NoError(t, err)
assert.NotNil(t, p)
})
t.Run("first failed", func(t *testing.T) {
fx := newFixture(t)
defer fx.finish(t)
fx.PreferQuic(true)
var peerId = "p1"
fx.nodeConf.EXPECT().PeerAddresses(peerId).Return(addrs, true)
fx.quic.MockTransport.EXPECT().Dial(ctx, "127.0.0.1:1112").Return(nil, fmt.Errorf("test"))
fx.yamux.MockTransport.EXPECT().Dial(ctx, "127.0.0.1:1111").Return(fx.mockMC(peerId), nil)
p, err := fx.Dial(ctx, peerId)
require.NoError(t, err)
assert.NotNil(t, p)
})
t.Run("peerId mismatched", func(t *testing.T) {
fx := newFixture(t)
defer fx.finish(t)
fx.PreferQuic(false)
var peerId = "p1"
fx.nodeConf.EXPECT().PeerAddresses(peerId).Return(addrs, true)
fx.yamux.MockTransport.EXPECT().Dial(ctx, "127.0.0.1:1111").Return(fx.mockMC(peerId+"not valid"), nil)
p, err := fx.Dial(ctx, peerId)
assert.EqualError(t, err, ErrPeerIdMismatched.Error())
assert.Nil(t, p)
})
t.Run("custom addr", func(t *testing.T) {
fx := newFixture(t)
defer fx.finish(t)
fx.PreferQuic(false)
var peerId = "p1"
fx.SetPeerAddrs(peerId, addrs)
fx.nodeConf.EXPECT().PeerAddresses(peerId).Return(nil, false)
fx.yamux.MockTransport.EXPECT().Dial(ctx, "127.0.0.1:1111").Return(fx.mockMC(peerId), nil)
p, err := fx.Dial(ctx, peerId)
require.NoError(t, err)
assert.NotNil(t, p)
})
t.Run("addr without scheme", func(t *testing.T) {
fx := newFixture(t)
defer fx.finish(t)
fx.PreferQuic(false)
var peerId = "p1"
fx.nodeConf.EXPECT().PeerAddresses(peerId).Return([]string{"127.0.0.1:1111"}, true)
fx.yamux.MockTransport.EXPECT().Dial(ctx, "127.0.0.1:1111").Return(fx.mockMC(peerId), nil)
p, err := fx.Dial(ctx, peerId)
require.NoError(t, err)
assert.NotNil(t, p)
})
}
func TestPeerService_Accept(t *testing.T) {
fx := newFixture(t)
defer fx.finish(t)
mc := fx.mockMC("p1")
require.NoError(t, fx.Accept(mc))
}
type fixture struct {
PeerService
a *app.App
ctrl *gomock.Controller
quic mock_transport.TransportComponent
yamux mock_transport.TransportComponent
nodeConf *mock_nodeconf.MockService
}
func newFixture(t *testing.T) *fixture {
ctrl := gomock.NewController(t)
fx := &fixture{
PeerService: New(),
ctrl: ctrl,
a: new(app.App),
quic: mock_transport.NewTransportComponent(ctrl, quic.CName),
yamux: mock_transport.NewTransportComponent(ctrl, yamux.CName),
nodeConf: mock_nodeconf.NewMockService(ctrl),
}
fx.quic.EXPECT().SetAccepter(fx.PeerService)
fx.yamux.EXPECT().SetAccepter(fx.PeerService)
fx.nodeConf.EXPECT().Name().Return(nodeconf.CName).AnyTimes()
fx.nodeConf.EXPECT().Init(gomock.Any())
fx.nodeConf.EXPECT().Run(gomock.Any())
fx.nodeConf.EXPECT().Close(gomock.Any())
fx.a.Register(fx.PeerService).Register(fx.quic).Register(fx.yamux).Register(fx.nodeConf).Register(pool.New()).Register(rpctest.NewTestServer())
require.NoError(t, fx.a.Start(ctx))
return fx
}
func (fx *fixture) mockMC(peerId string) *mock_transport.MockMultiConn {
mc := mock_transport.NewMockMultiConn(fx.ctrl)
cctx := peer.CtxWithPeerId(ctx, peerId)
mc.EXPECT().Context().Return(cctx).AnyTimes()
mc.EXPECT().Accept().Return(nil, fmt.Errorf("test")).AnyTimes()
mc.EXPECT().Close().AnyTimes()
return mc
}
func (fx *fixture) finish(t *testing.T) {
require.NoError(t, fx.a.Close(ctx))
fx.ctrl.Finish()
}

View file

@ -39,9 +39,10 @@ func New() SecureService {
type SecureService interface {
SecureOutbound(ctx context.Context, conn net.Conn) (cctx context.Context, err error)
HandshakeOutbound(ctx context.Context, conn io.ReadWriteCloser, peerId string) (cctx context.Context, err error)
SecureInbound(ctx context.Context, conn net.Conn) (cctx context.Context, err error)
HandshakeInbound(ctx context.Context, conn io.ReadWriteCloser, remotePeerId string) (cctx context.Context, err error)
ServerTlsConfig() (*tls.Config, error)
TlsConfig() (*tls.Config, <-chan crypto.PubKey, error)
app.Component
}
@ -124,7 +125,10 @@ func (s *secureService) SecureOutbound(ctx context.Context, conn net.Conn) (cctx
if err != nil {
return nil, handshake.HandshakeError{Err: err}
}
peerId := sc.RemotePeer().String()
return s.HandshakeOutbound(ctx, sc, sc.RemotePeer().String())
}
func (s *secureService) HandshakeOutbound(ctx context.Context, conn io.ReadWriteCloser, peerId string) (cctx context.Context, err error) {
confTypes := s.nodeconf.NodeTypes(peerId)
var checker handshake.CredentialChecker
if len(confTypes) > 0 {
@ -132,23 +136,23 @@ func (s *secureService) SecureOutbound(ctx context.Context, conn net.Conn) (cctx
} else {
checker = s.noVerifyChecker
}
res, err := handshake.OutgoingHandshake(ctx, sc, sc.RemotePeer().String(), checker)
res, err := handshake.OutgoingHandshake(ctx, conn, peerId, checker)
if err != nil {
return nil, err
}
cctx = context.Background()
cctx = peer.CtxWithPeerId(cctx, sc.RemotePeer().String())
cctx = peer.CtxWithPeerId(cctx, peerId)
cctx = peer.CtxWithIdentity(cctx, res.Identity)
cctx = peer.CtxWithClientVersion(cctx, res.ClientVersion)
return cctx, nil
}
func (s *secureService) ServerTlsConfig() (*tls.Config, error) {
func (s *secureService) TlsConfig() (*tls.Config, <-chan crypto.PubKey, error) {
p2pIdn, err := libp2ptls.NewIdentity(s.key)
if err != nil {
return nil, err
return nil, nil, err
}
conf, _ := p2pIdn.ConfigForPeer("")
conf, keyCh := p2pIdn.ConfigForPeer("")
conf.NextProtos = []string{"anysync"}
return conf, nil
return conf, keyCh, nil
}

View file

@ -0,0 +1,26 @@
package mock_transport
import (
"github.com/anyproto/any-sync/app"
"go.uber.org/mock/gomock"
)
func NewTransportComponent(ctrl *gomock.Controller, name string) TransportComponent {
return TransportComponent{
CName: name,
MockTransport: NewMockTransport(ctrl),
}
}
type TransportComponent struct {
CName string
*MockTransport
}
func (t TransportComponent) Init(a *app.App) (err error) {
return nil
}
func (t TransportComponent) Name() (name string) {
return t.CName
}

View file

@ -0,0 +1,13 @@
package quic
type configGetter interface {
GetQuic() Config
}
type Config struct {
ListenAddrs []string `yaml:"listenAddrs"`
WriteTimeoutSec int `yaml:"writeTimeoutSec"`
DialTimeoutSec int `yaml:"dialTimeoutSec"`
MaxStreams int64 `yaml:"maxStreams"`
KeepAlivePeriodSec int `yaml:"keepAlivePeriodSec"`
}

View file

@ -0,0 +1,86 @@
package quic
import (
"context"
"errors"
"github.com/anyproto/any-sync/net/peer"
"github.com/anyproto/any-sync/net/transport"
"github.com/quic-go/quic-go"
"net"
)
func newConn(cctx context.Context, qconn quic.Connection) transport.MultiConn {
cctx = peer.CtxWithPeerAddr(cctx, transport.Quic+"://"+qconn.RemoteAddr().String())
return &quicMultiConn{
cctx: cctx,
Connection: qconn,
}
}
type quicMultiConn struct {
cctx context.Context
quic.Connection
}
func (q *quicMultiConn) Context() context.Context {
return q.cctx
}
func (q *quicMultiConn) Accept() (conn net.Conn, err error) {
stream, err := q.Connection.AcceptStream(context.Background())
if err != nil {
if errors.Is(err, quic.ErrServerClosed) {
err = transport.ErrConnClosed
} else if aerr, ok := err.(*quic.ApplicationError); ok && aerr.ErrorCode == 2 {
err = transport.ErrConnClosed
}
return nil, err
}
return quicNetConn{
Stream: stream,
localAddr: q.LocalAddr(),
remoteAddr: q.RemoteAddr(),
}, nil
}
func (q *quicMultiConn) Open(ctx context.Context) (conn net.Conn, err error) {
stream, err := q.OpenStreamSync(ctx)
if err != nil {
return nil, err
}
return quicNetConn{
Stream: stream,
localAddr: q.LocalAddr(),
remoteAddr: q.RemoteAddr(),
}, nil
}
func (q *quicMultiConn) Addr() string {
return transport.Quic + "://" + q.RemoteAddr().String()
}
func (q *quicMultiConn) IsClosed() bool {
select {
case <-q.Connection.Context().Done():
return true
default:
return false
}
}
func (q *quicMultiConn) Close() error {
return q.Connection.CloseWithError(2, "")
}
type quicNetConn struct {
quic.Stream
localAddr, remoteAddr net.Addr
}
func (q quicNetConn) LocalAddr() net.Addr {
return q.localAddr
}
func (q quicNetConn) RemoteAddr() net.Addr {
return q.remoteAddr
}

204
net/transport/quic/quic.go Normal file
View file

@ -0,0 +1,204 @@
package quic
import (
"context"
"crypto/tls"
"fmt"
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/app/logger"
"github.com/anyproto/any-sync/net/secureservice"
"github.com/anyproto/any-sync/net/transport"
libp2crypto "github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
libp2ptls "github.com/libp2p/go-libp2p/p2p/security/tls"
"github.com/quic-go/quic-go"
"go.uber.org/zap"
"net"
"sync"
"time"
)
const CName = "net.transport.quic"
var log = logger.NewNamed(CName)
func New() Quic {
return new(quicTransport)
}
type Quic interface {
ListenAddrs(ctx context.Context, addrs ...string) (err error)
transport.Transport
app.ComponentRunnable
}
type quicTransport struct {
secure secureservice.SecureService
accepter transport.Accepter
conf Config
quicConf *quic.Config
listeners []*quic.Listener
listCtx context.Context
listCtxCancel context.CancelFunc
mu sync.Mutex
}
func (q *quicTransport) Init(a *app.App) (err error) {
q.secure = a.MustComponent(secureservice.CName).(secureservice.SecureService)
q.conf = a.MustComponent("config").(configGetter).GetQuic()
if q.conf.MaxStreams <= 0 {
q.conf.MaxStreams = 128
}
if q.conf.KeepAlivePeriodSec <= 0 {
q.conf.KeepAlivePeriodSec = 25
}
q.quicConf = &quic.Config{
HandshakeIdleTimeout: time.Duration(q.conf.DialTimeoutSec) * time.Second,
MaxIncomingStreams: q.conf.MaxStreams,
KeepAlivePeriod: time.Duration(q.conf.KeepAlivePeriodSec) * time.Second,
}
return
}
func (q *quicTransport) Name() (name string) {
return CName
}
func (q *quicTransport) SetAccepter(accepter transport.Accepter) {
q.accepter = accepter
}
func (q *quicTransport) Run(ctx context.Context) (err error) {
if q.accepter == nil {
return fmt.Errorf("can't run service without accepter")
}
q.listCtx, q.listCtxCancel = context.WithCancel(context.Background())
return q.ListenAddrs(ctx, q.conf.ListenAddrs...)
}
func (q *quicTransport) ListenAddrs(ctx context.Context, addrs ...string) (err error) {
q.mu.Lock()
defer q.mu.Unlock()
var tlsConf tls.Config
tlsConf.GetConfigForClient = func(_ *tls.ClientHelloInfo) (*tls.Config, error) {
conf, _, tlsErr := q.secure.TlsConfig()
return conf, tlsErr
}
tlsConf.NextProtos = []string{"anysync"}
if err != nil {
return
}
for _, listAddr := range addrs {
list, err := quic.ListenAddr(listAddr, &tlsConf, q.quicConf)
if err != nil {
return err
}
q.listeners = append(q.listeners, list)
go q.acceptLoop(q.listCtx, list)
}
return
}
func (q *quicTransport) Dial(ctx context.Context, addr string) (mc transport.MultiConn, err error) {
tlsConf, keyCh, err := q.secure.TlsConfig()
if err != nil {
return nil, err
}
qConn, err := quic.DialAddr(ctx, addr, tlsConf, q.quicConf)
if err != nil {
return nil, err
}
var remotePubKey libp2crypto.PubKey
select {
case remotePubKey = <-keyCh:
default:
}
if remotePubKey == nil {
_ = qConn.CloseWithError(1, "")
return nil, fmt.Errorf("libp2p tls handshake bug: no key")
}
remotePeerId, err := peer.IDFromPublicKey(remotePubKey)
if err != nil {
_ = qConn.CloseWithError(1, "")
return nil, err
}
stream, err := qConn.OpenStreamSync(ctx)
if err != nil {
_ = qConn.CloseWithError(1, err.Error())
return nil, err
}
defer func() {
_ = stream.Close()
}()
cctx, err := q.secure.HandshakeOutbound(ctx, stream, remotePeerId.String())
if err != nil {
return nil, err
}
return newConn(cctx, qConn), nil
}
func (q *quicTransport) acceptLoop(ctx context.Context, list *quic.Listener) {
l := log.With(zap.String("localAddr", list.Addr().String()))
l.Info("quic listener started")
defer func() {
l.Debug("quic listener stopped")
}()
for {
conn, err := list.Accept(ctx)
if err != nil {
if err != net.ErrClosed {
l.Error("listener closed with error", zap.Error(err))
}
return
}
go func() {
if err := q.accept(conn); err != nil {
l.Info("accept error", zap.Error(err))
}
}()
}
}
func (q *quicTransport) accept(conn quic.Connection) (err error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(q.conf.DialTimeoutSec))
defer cancel()
remotePubKey, err := libp2ptls.PubKeyFromCertChain(conn.ConnectionState().TLS.PeerCertificates)
if err != nil {
return err
}
remotePeerId, err := peer.IDFromPublicKey(remotePubKey)
if err != nil {
return err
}
// wait new stream for any handshake
stream, err := conn.AcceptStream(ctx)
if err != nil {
return err
}
defer func() {
_ = stream.Close()
}()
cctx, err := q.secure.HandshakeInbound(ctx, stream, remotePeerId.String())
if err != nil {
return
}
mc := newConn(cctx, conn)
return q.accepter.Accept(mc)
}
func (q *quicTransport) Close(ctx context.Context) (err error) {
if q.listCtx != nil {
q.listCtxCancel()
}
for _, lis := range q.listeners {
_ = lis.Close()
}
return
}

View file

@ -0,0 +1,255 @@
package quic
import (
"bytes"
"context"
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/net/secureservice"
"github.com/anyproto/any-sync/net/transport"
"github.com/anyproto/any-sync/nodeconf"
"github.com/anyproto/any-sync/nodeconf/mock_nodeconf"
"github.com/anyproto/any-sync/testutil/accounttest"
"github.com/anyproto/any-sync/testutil/testnodeconf"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"io"
"net"
"sync"
"testing"
"time"
)
var ctx = context.Background()
func TestQuicTransport_Dial(t *testing.T) {
fxS := newFixture(t)
defer fxS.finish(t)
fxC := newFixture(t)
defer fxC.finish(t)
mcC, err := fxC.Dial(ctx, fxS.addr)
require.NoError(t, err)
var mcS transport.MultiConn
select {
case mcS = <-fxS.accepter.mcs:
case <-time.After(time.Second * 5):
require.True(t, false, "timeout")
}
var (
sData string
acceptErr error
copyErr error
done = make(chan struct{})
)
go func() {
defer close(done)
conn, serr := mcS.Accept()
if serr != nil {
acceptErr = serr
return
}
buf := bytes.NewBuffer(nil)
_, copyErr = io.Copy(buf, conn)
sData = buf.String()
return
}()
conn, err := mcC.Open(ctx)
require.NoError(t, err)
data := "some data"
_, err = conn.Write([]byte(data))
require.NoError(t, err)
require.NoError(t, conn.Close())
<-done
assert.NoError(t, acceptErr)
assert.Equal(t, data, sData)
assert.NoError(t, copyErr)
}
// no deadline - 69100 rps
// common write deadline - 66700 rps
// subconn write deadline - 67100 rps
func TestWriteBenchReuse(t *testing.T) {
t.Skip()
var (
numSubConn = 10
numWrites = 10000
)
fxS := newFixture(t)
defer fxS.finish(t)
fxC := newFixture(t)
defer fxC.finish(t)
mcC, err := fxC.Dial(ctx, fxS.addr)
require.NoError(t, err)
mcS := <-fxS.accepter.mcs
go func() {
for i := 0; i < numSubConn; i++ {
conn, err := mcS.Accept()
require.NoError(t, err)
go func(sc net.Conn) {
var b = make([]byte, 1024)
for {
n, _ := sc.Read(b)
if n > 0 {
sc.Write(b[:n])
} else {
break
}
}
}(conn)
}
}()
var wg sync.WaitGroup
wg.Add(numSubConn)
st := time.Now()
for i := 0; i < numSubConn; i++ {
conn, err := mcC.Open(ctx)
require.NoError(t, err)
go func(sc net.Conn) {
defer sc.Close()
defer wg.Done()
for j := 0; j < numWrites; j++ {
var b = []byte("some data some data some data some data some data some data some data some data some data")
sc.Write(b)
sc.Read(b)
}
}(conn)
}
wg.Wait()
dur := time.Since(st)
t.Logf("%.2f req per sec", float64(numWrites*numSubConn)/dur.Seconds())
}
func TestWriteBenchNew(t *testing.T) {
t.Skip()
var (
numSubConn = 10
numWrites = 10000
)
fxS := newFixture(t)
defer fxS.finish(t)
fxC := newFixture(t)
defer fxC.finish(t)
mcC, err := fxC.Dial(ctx, fxS.addr)
require.NoError(t, err)
mcS := <-fxS.accepter.mcs
go func() {
for i := 0; i < numSubConn; i++ {
require.NoError(t, err)
go func() {
var b = make([]byte, 1024)
for {
conn, _ := mcS.Accept()
n, _ := conn.Read(b)
if n > 0 {
conn.Write(b[:n])
} else {
_ = conn.Close()
break
}
conn.Close()
}
}()
}
}()
var wg sync.WaitGroup
wg.Add(numSubConn)
st := time.Now()
for i := 0; i < numSubConn; i++ {
go func() {
defer wg.Done()
for j := 0; j < numWrites; j++ {
sc, err := mcC.Open(ctx)
require.NoError(t, err)
var b = []byte("some data some data some data some data some data some data some data some data some data")
sc.Write(b)
sc.Read(b)
sc.Close()
}
}()
}
wg.Wait()
dur := time.Since(st)
t.Logf("%.2f req per sec", float64(numWrites*numSubConn)/dur.Seconds())
}
type fixture struct {
*quicTransport
a *app.App
ctrl *gomock.Controller
mockNodeConf *mock_nodeconf.MockService
acc *accounttest.AccountTestService
accepter *testAccepter
addr string
}
func newFixture(t *testing.T) *fixture {
fx := &fixture{
quicTransport: New().(*quicTransport),
ctrl: gomock.NewController(t),
acc: &accounttest.AccountTestService{},
accepter: &testAccepter{mcs: make(chan transport.MultiConn, 100)},
a: new(app.App),
}
fx.mockNodeConf = mock_nodeconf.NewMockService(fx.ctrl)
fx.mockNodeConf.EXPECT().Init(gomock.Any())
fx.mockNodeConf.EXPECT().Name().Return(nodeconf.CName).AnyTimes()
fx.mockNodeConf.EXPECT().Run(ctx)
fx.mockNodeConf.EXPECT().Close(ctx)
fx.mockNodeConf.EXPECT().NodeTypes(gomock.Any()).Return([]nodeconf.NodeType{nodeconf.NodeTypeTree}).AnyTimes()
fx.a.Register(fx.acc).Register(newTestConf()).Register(fx.mockNodeConf).Register(secureservice.New()).Register(fx.quicTransport).Register(fx.accepter)
require.NoError(t, fx.a.Start(ctx))
fx.addr = fx.listeners[0].Addr().String()
return fx
}
func (fx *fixture) finish(t *testing.T) {
require.NoError(t, fx.a.Close(ctx))
fx.ctrl.Finish()
}
func newTestConf() *testConf {
return &testConf{testnodeconf.GenNodeConfig(1)}
}
type testConf struct {
*testnodeconf.Config
}
func (c *testConf) GetQuic() Config {
return Config{
ListenAddrs: []string{"127.0.0.1:0"},
WriteTimeoutSec: 10,
DialTimeoutSec: 10,
}
}
type testAccepter struct {
err error
mcs chan transport.MultiConn
}
func (t *testAccepter) Accept(mc transport.MultiConn) (err error) {
t.mcs <- mc
return t.err
}
func (t *testAccepter) Init(a *app.App) (err error) {
a.MustComponent(CName).(transport.Transport).SetAccepter(t)
return nil
}
func (t *testAccepter) Name() (name string) { return "testAccepter" }

View file

@ -11,6 +11,11 @@ var (
ErrConnClosed = errors.New("connection closed")
)
const (
Yamux = "yamux"
Quic = "quic"
)
// Transport is a common interface for a network transport
type Transport interface {
// SetAccepter sets accepter that will be called for new connections

View file

@ -12,7 +12,7 @@ import (
)
func NewMultiConn(cctx context.Context, luConn *connutil.LastUsageConn, addr string, sess *yamux.Session) transport.MultiConn {
cctx = peer.CtxWithPeerAddr(cctx, sess.RemoteAddr().String())
cctx = peer.CtxWithPeerAddr(cctx, transport.Yamux+"://"+sess.RemoteAddr().String())
return &yamuxConn{
ctx: cctx,
luConn: luConn,
@ -44,7 +44,7 @@ func (y *yamuxConn) Context() context.Context {
}
func (y *yamuxConn) Addr() string {
return y.addr
return transport.Yamux + "://" + y.addr
}
func (y *yamuxConn) Accept() (conn net.Conn, err error) {