From bc2049ccbcf3da2e112c699ca83477d3becff5b1 Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Fri, 26 May 2023 19:18:51 +0200 Subject: [PATCH] wip: yamux transport + pool rewrite --- go.mod | 1 + go.sum | 69 +++++++ .../conn.go => connutil/timeout.go} | 12 +- net/connutil/usage.go | 30 ++++ net/dialer/dialer.go | 4 +- net/pool/pool.go | 72 ++++---- net/pool/poolservice.go | 37 ++-- net/secureservice/secureservice.go | 18 +- net/secureservice/secureservice_test.go | 7 +- net/transport/transport.go | 34 ++++ net/transport/yamux/config.go | 12 ++ net/transport/yamux/conn.go | 27 +++ net/transport/yamux/util.go | 18 ++ net/transport/yamux/util_windows.go | 41 +++++ net/transport/yamux/yamux.go | 168 ++++++++++++++++++ net/transport/yamux/yamux_test.go | 134 ++++++++++++++ 16 files changed, 610 insertions(+), 74 deletions(-) rename net/{timeoutconn/conn.go => connutil/timeout.go} (82%) create mode 100644 net/connutil/usage.go create mode 100644 net/transport/transport.go create mode 100644 net/transport/yamux/config.go create mode 100644 net/transport/yamux/conn.go create mode 100644 net/transport/yamux/util.go create mode 100644 net/transport/yamux/util_windows.go create mode 100644 net/transport/yamux/yamux.go create mode 100644 net/transport/yamux/yamux_test.go diff --git a/go.mod b/go.mod index dff08a5c..0f6bec3b 100644 --- a/go.mod +++ b/go.mod @@ -57,6 +57,7 @@ require ( github.com/golang/protobuf v1.5.3 // indirect github.com/google/pprof v0.0.0-20230510103437-eeec1cb781c3 // indirect github.com/hashicorp/golang-lru v0.5.4 // indirect + github.com/hashicorp/yamux v0.1.1 // indirect github.com/huin/goupnp v1.2.0 // indirect github.com/ipfs/bbloom v0.0.4 // indirect github.com/ipfs/go-bitfield v1.1.0 // indirect diff --git a/go.sum b/go.sum index 873e3903..83407153 100644 --- a/go.sum +++ b/go.sum @@ -1,8 +1,11 @@ filippo.io/edwards25519 v1.0.0 h1:0wAIcmJUqRdI8IJ/3eGi5/HwXZWPujYXXlkrQogz0Ek= filippo.io/edwards25519 v1.0.0/go.mod h1:N1IkdkCkiLB6tki+MYJoSx2JTY9NUlxZE7eHn5EwJns= +github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96 h1:cTp8I5+VIoKjsnZuH8vjyaysT/ses3EvZeaV/1UkF2M= +github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= +github.com/alecthomas/kingpin/v2 v2.3.2 h1:H0aULhgmSzN8xQ3nX1uxtdlTHYoPLu5AhHxWrKI6ocU= github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 h1:s6gZFSlWYmbqAuRjVTiNNhvNRfY2Wxp9nhfyel4rklc= github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137/go.mod h1:OMCwj8VM1Kc9e19TLln2VL61YJF0x1XFtfdL4JdbSyE= github.com/anyproto/go-chash v0.1.0 h1:I9meTPjXFRfXZHRJzjOHC/XF7Q5vzysKkiT/grsogXY= @@ -22,6 +25,9 @@ github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cheggaaa/mb/v3 v3.0.1 h1:BuEOipGTqybXYi5KXVCpqhR1LWN2lrurq6UrH+VBhXc= github.com/cheggaaa/mb/v3 v3.0.1/go.mod h1:zCt2QeYukhd/g0bIdNqF+b/kKz1hnLFNDkP49qN5kqI= +github.com/chzyer/readline v1.5.1 h1:upd/6fQk4src78LMRzh5vItIt361/o4uq553V8B5sGI= +github.com/containerd/cgroups v1.1.0 h1:v8rEWFl6EoqHB+swVNjVoCJE8o3jX7e8nqBGPLaDFBM= +github.com/coreos/go-systemd/v22 v22.5.0 h1:RrqgGjYQKalulkV8NGVIfkXQf6YYmOyiJKk8iXXhfZs= github.com/corona10/goimagehash v1.0.2 h1:pUfB0LnsJASMPGEZLj7tGY251vF+qLGqOgEP4rUs6kA= github.com/crackcomm/go-gitignore v0.0.0-20170627025303-887ab5e44cc3 h1:HVTnpeuvF6Owjd5mniCL8DEXo7uYXdQEmOP4FJbV5tg= github.com/crackcomm/go-gitignore v0.0.0-20170627025303-887ab5e44cc3/go.mod h1:p1d6YEZWvFzEh4KLyvBcVSnrfNDDvK2zfK/4x2v/4pE= @@ -34,12 +40,20 @@ github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c/go.mod h1:6Uh github.com/decred/dcrd/crypto/blake256 v1.0.1 h1:7PltbUIQB7u/FfZ39+DGa/ShuMyJ5ilcvdfma9wOH6Y= github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 h1:8UrgZ3GkP4i/CLijOJx79Yu+etlyjdBU4sfcs2WYQMs= github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0/go.mod h1:v57UDF4pDQJcEfFUCRop3lJL149eHGSe9Jvczhzjo/0= +github.com/dgraph-io/badger v1.6.2 h1:mNw0qs90GVgGGWylh0umH5iag1j6n/PeJtNvL6KY/x8= +github.com/dgraph-io/ristretto v0.0.2 h1:a5WaUrDa0qm0YrAAS1tUykT5El3kt62KNZZeMxQn3po= +github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= +github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= +github.com/elastic/gosigar v0.14.2 h1:Dg80n8cr90OZ7x+bAax/QjoW/XqTI11RmA79ZwIm9/4= github.com/flynn/noise v1.0.0 h1:DlTHqmzmvcEiKj+4RYo/imoswx/4r6iBlCMfVtrMXpQ= github.com/fogleman/gg v1.3.0 h1:/7zJX8F6AaYQc57WQCyN9cAIz+4bCJGO9B+dyW29am8= github.com/fogleman/gg v1.3.0/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= github.com/francoispqt/gojay v1.2.13 h1:d2m3sFjloqoIUQU3TsHBgj6qg/BVGlTBeHDUmyJnXKk= github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k= github.com/frankban/quicktest v1.14.4 h1:g2rn0vABPOOXmZUj+vbmUp0lPoXEMuhTpIluN0XL9UY= +github.com/fsnotify/fsnotify v1.5.4 h1:jRbGcIw6P2Meqdwuo0H1p6JVLbL5DHKAKlYndzMwVZI= +github.com/go-kit/log v0.2.1 h1:MRVx0/zhvdseW+Gza6N9rVzU/IVzaeE1SFI4raAhmBU= +github.com/go-logfmt/logfmt v0.5.1 h1:otpy5pqBCBZ1ng9RQ0dPu4PN7ba75Y/aA+UpowDyNVA= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ= @@ -51,6 +65,7 @@ github.com/gobwas/glob v0.2.3 h1:A4xDbljILXROh+kObIiy5kIaPYD8e96x1tgBhUI5J+Y= github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJAkT8= github.com/goccy/go-graphviz v0.1.1 h1:MGrsnzBxTyt7KG8FhHsFPDTGvF7UaQMmSa6A610DqPg= github.com/goccy/go-graphviz v0.1.1/go.mod h1:lpnwvVDjskayq84ZxG8tGCPeZX/WxP88W+OJajh+gFk= +github.com/godbus/dbus/v5 v5.1.0 h1:4KLkAxT3aOY8Li4FRJe/KvhoNFFxo0m6fNuFUO8QJUk= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= @@ -62,6 +77,7 @@ github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= @@ -70,22 +86,29 @@ github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/gopacket v1.1.19 h1:ves8RnFZPGiFnTS0uPQStjwru6uO6h+nlr9j6fL7kF8= github.com/google/pprof v0.0.0-20230510103437-eeec1cb781c3 h1:2XF1Vzq06X+inNqgJ9tRnGuw+ZVCB3FazXODD6JE1R8= github.com/google/pprof v0.0.0-20230510103437-eeec1cb781c3/go.mod h1:79YE0hCXdHag9sBkw2o+N/YnZtTkXi0UT9Nnixa5eYk= +github.com/google/renameio v0.1.0 h1:GOZbcHa3HfsPKPlmyPyN2KEohoMXOhdMbHrvbpl2QaA= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= +github.com/gxed/hashland/keccakpg v0.0.1 h1:wrk3uMNaMxbXiHibbPO4S0ymqJMm41WiudyFSs7UnsU= github.com/gxed/hashland/keccakpg v0.0.1/go.mod h1:kRzw3HkwxFU1mpmPP8v1WyQzwdGfmKFJ6tItnhQ67kU= +github.com/gxed/hashland/murmur3 v0.0.1 h1:SheiaIt0sda5K+8FLz952/1iWS9zrnKsEJaOJu4ZbSc= github.com/gxed/hashland/murmur3 v0.0.1/go.mod h1:KjXop02n4/ckmZSnY2+HKcLud/tcmvhST0bie/0lS48= github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc= github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= +github.com/hashicorp/golang-lru/v2 v2.0.2 h1:Dwmkdr5Nc/oBiXgJS3CDHNhJtIHkuZ3DZF5twqnfBdU= +github.com/hashicorp/yamux v0.1.1 h1:yrQxtgseBDrq9Y652vSRDvsKCJKOUD+GzTS4Y0Y8pvE= +github.com/hashicorp/yamux v0.1.1/go.mod h1:CtWFDAQgb7dxtzFs4tWbplKIe2jSi3+5vKbgIO0SLnQ= github.com/huandu/go-assert v1.1.5 h1:fjemmA7sSfYHJD7CUqs9qTwwfdNAx7/j2/ZlHXzNB3c= github.com/huandu/go-assert v1.1.5/go.mod h1:yOLvuqZwmcHIC5rIzrBhT7D3Q9c3GFnd0JrPVhn/06U= github.com/huandu/skiplist v1.2.0 h1:gox56QD77HzSC0w+Ws3MH3iie755GBJU1OER3h5VsYw= github.com/huandu/skiplist v1.2.0/go.mod h1:7v3iFjLcSAzO4fN5B8dvebvo/qsfumiLiDXMrPiHF9w= github.com/huin/goupnp v1.2.0 h1:uOKW26NG1hsSSbXIZ1IR7XP9Gjd1U8pnLaCMgntmkmY= github.com/huin/goupnp v1.2.0/go.mod h1:gnGPsThkYa7bFi/KWmEysQRf48l2dvR5bxr2OFckNX8= +github.com/ianlancetaylor/demangle v0.0.0-20220517205856-0058ec4f073c h1:rwmN+hgiyp8QyBqzdEX43lTjKAxaqCrYHaU5op5P9J8= github.com/ipfs/bbloom v0.0.4 h1:Gi+8EGJ2y5qiD5FbsbpX/TMNcJw8gSqr7eyjHa4Fhvs= github.com/ipfs/bbloom v0.0.4/go.mod h1:cS9YprKXpoZ9lT0n/Mw/a6/aFV6DTjTLYHeA+gyqMG0= github.com/ipfs/go-bitfield v1.1.0 h1:fh7FIo8bSwaJEh6DdTWbCeZ1eqOaOkKFI74SCnsWbGA= @@ -110,6 +133,8 @@ github.com/ipfs/go-datastore v0.6.0 h1:JKyz+Gvz1QEZw0LsX1IBn+JFCJQH4SJVFtM4uWU0M github.com/ipfs/go-datastore v0.6.0/go.mod h1:rt5M3nNbSO/8q1t4LNkLyUwRs8HupMeN/8O4Vn9YAT8= github.com/ipfs/go-detect-race v0.0.1 h1:qX/xay2W3E4Q1U7d9lNs1sU9nvguX0a7319XbyQ6cOk= github.com/ipfs/go-detect-race v0.0.1/go.mod h1:8BNT7shDZPo99Q74BpGMK+4D8Mn4j46UU0LZ723meps= +github.com/ipfs/go-ds-badger v0.3.0 h1:xREL3V0EH9S219kFFueOYJJTcjgNSZ2HY1iSvN7U1Ro= +github.com/ipfs/go-ds-leveldb v0.5.0 h1:s++MEBbD3ZKc9/8/njrn4flZLnCuY9I79v94gBUNumo= github.com/ipfs/go-ipfs-blockstore v1.3.0 h1:m2EXaWgwTzAfsmt5UdJ7Is6l4gJcaM/A12XwJyvYvMM= github.com/ipfs/go-ipfs-blockstore v1.3.0/go.mod h1:KgtZyc9fq+P2xJUiCAzbRdhhqJHvsw8u2Dlqy2MyRTE= github.com/ipfs/go-ipfs-blocksutil v0.0.1 h1:Eh/H4pc1hsvhzsQoMEP3Bke/aW5P5rVM1IWFJMcGIPQ= @@ -160,16 +185,23 @@ github.com/ipld/go-ipld-prime v0.9.1-0.20210324083106-dc342a9917db/go.mod h1:KvB github.com/ipld/go-ipld-prime v0.20.0 h1:Ud3VwE9ClxpO2LkCYP7vWPc0Fo+dYdYzgxUJZ3uRG4g= github.com/ipld/go-ipld-prime v0.20.0/go.mod h1:PzqZ/ZR981eKbgdr3y2DJYeD/8bgMawdGVlJDE8kK+M= github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus= +github.com/jbenet/go-cienv v0.1.0 h1:Vc/s0QbQtoxX8MwwSLWWh+xNNZvM3Lw7NsTcHrvvhMc= github.com/jbenet/go-cienv v0.1.0/go.mod h1:TqNnHUmJgXau0nCzC7kXWeotg3J9W34CUv5Djy1+FlA= github.com/jbenet/go-temp-err-catcher v0.1.0 h1:zpb3ZH6wIE8Shj2sKS+khgRvf7T7RABoLk/+KKHggpk= github.com/jbenet/go-temp-err-catcher v0.1.0/go.mod h1:0kJRvmDZXNMIiJirNPEYfhpPwbGVtZVWC34vc5WLsDk= github.com/jbenet/goprocess v0.1.4 h1:DRGOFReOMqqDNXwW70QkacFW0YN9QnwLV0Vqk+3oU0o= github.com/jbenet/goprocess v0.1.4/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4= +github.com/jessevdk/go-flags v1.4.0 h1:4IU2WS7AumrZ/40jfhf4QVDMsQwqA7VEHozFRrGARJA= +github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/jtolds/gls v4.2.1+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= +github.com/julienschmidt/httprouter v1.3.0 h1:U0609e9tgbseu3rBINet9P48AI/D3oJs4dN7jwJOQ1U= github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= +github.com/kisielk/errcheck v1.5.0 h1:e8esj/e4R+SAOwFwN+n3zr0nYeCyeweozKfO23MvHzY= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= +github.com/kisielk/gotool v1.0.0 h1:AV2c/EiW3KqPNT9ZKl07ehoAGi4C5/01Cfbblndcapg= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.16.5 h1:IFV2oUNUzZaz+XyusxpLzpzS8Pt5rh0Z16For/djlyI= github.com/klauspost/compress v1.16.5/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= @@ -182,6 +214,7 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pty v1.1.1 h1:VkoXIwSboBpnk99O/KFauAEILuNHv5DVFKZMBN/gUgw= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -189,29 +222,42 @@ github.com/libp2p/go-buffer-pool v0.0.2/go.mod h1:MvaB6xw5vOrDl8rYZGLFdKAuk/hRoR github.com/libp2p/go-buffer-pool v0.1.0 h1:oK4mSFcQz7cTQIfqbe4MIj9gLW+mnanjyFtc6cdF0Y8= github.com/libp2p/go-buffer-pool v0.1.0/go.mod h1:N+vh8gMqimBzdKkSMVuydVDq+UV5QTWy5HSiZacSbPg= github.com/libp2p/go-cidranger v1.1.0 h1:ewPN8EZ0dd1LSnrtuwd4709PXVcITVeuwbag38yPW7c= +github.com/libp2p/go-flow-metrics v0.1.0 h1:0iPhMI8PskQwzh57jB9WxIuIOQ0r+15PChFGkx3Q3WM= github.com/libp2p/go-libp2p v0.27.3 h1:tkV/zm3KCZ4R5er9Xcs2pt0YNB4JH0iBfGAtHJdLHRs= github.com/libp2p/go-libp2p v0.27.3/go.mod h1:FAvvfQa/YOShUYdiSS03IR9OXzkcJXwcNA2FUCh9ImE= github.com/libp2p/go-libp2p-asn-util v0.3.0 h1:gMDcMyYiZKkocGXDQ5nsUQyquC9+H+iLEQHwOCZ7s8s= github.com/libp2p/go-libp2p-record v0.2.0 h1:oiNUOCWno2BFuxt3my4i1frNrt7PerzB3queqa1NkQ0= github.com/libp2p/go-libp2p-testing v0.12.0 h1:EPvBb4kKMWO29qP4mZGyhVzUyR25dvfUIK5WDu6iPUA= +github.com/libp2p/go-mplex v0.7.0 h1:BDhFZdlk5tbr0oyFq/xv/NPGfjbnrsDam1EvutpBDbY= github.com/libp2p/go-msgio v0.3.0 h1:mf3Z8B1xcFN314sWX+2vOTShIE0Mmn2TXn3YCUQGNj0= github.com/libp2p/go-nat v0.1.0 h1:MfVsH6DLcpa04Xr+p8hmVRG4juse0s3J8HyNWYHffXg= github.com/libp2p/go-netroute v0.2.1 h1:V8kVrpD8GK0Riv15/7VN6RbUQ3URNZVosw7H2v9tksU= +github.com/libp2p/go-openssl v0.1.0 h1:LBkKEcUv6vtZIQLVTegAil8jbNpJErQ9AnT+bWV+Ooo= +github.com/libp2p/go-reuseport v0.2.0 h1:18PRvIMlpY6ZK85nIAicSBuXXvrYoSw3dsBAR7zc560= github.com/libp2p/go-yamux/v4 v4.0.0 h1:+Y80dV2Yx/kv7Y7JKu0LECyVdMXm1VUoko+VQ9rBfZQ= +github.com/libp2p/zeroconf/v2 v2.2.0 h1:Cup06Jv6u81HLhIj1KasuNM/RHHrJ8T7wOTS4+Tv53Q= +github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd h1:br0buuQ854V8u83wA0rVZ8ttrq5CpaPZdvrK0LP2lOk= +github.com/mattn/go-colorable v0.1.1 h1:G1f5SKeVxmagw/IyvzvtZE4Gybcc4Tr1tf7I8z0XgOg= github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcnceauSikq3lYCQ= github.com/mattn/go-isatty v0.0.5/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94= github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA= github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mattn/go-pointer v0.0.1 h1:n+XhsuGeVO6MEAp7xyEukFINEa+Quek5psIR/ylA6o0= github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= github.com/miekg/dns v1.1.54 h1:5jon9mWcb0sFJGpnI99tOMhCPyJ+RPVz5b63MQG0VWI= github.com/miekg/dns v1.1.54/go.mod h1:uInx36IzPl7FYnDcMeVWxj9byh7DutNykX4G9Sj60FY= +github.com/mikioh/tcpinfo v0.0.0-20190314235526-30a79bb1804b h1:z78hV3sbSMAUoyUMM0I83AUIT6Hu17AWfgjzIbtrYFc= +github.com/mikioh/tcpopt v0.0.0-20190314235656-172688c1accc h1:PTfri+PuQmWDqERdnNMiD9ZejrlswWrCpBEZgWOiTrc= +github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 h1:lYpkrQH5ajf0OXOcUbGjvZxxijuBwbbmlSxLiuofa+g= github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1/go.mod h1:pD8RvIylQ358TN4wwqatJ8rNavkEINozVn9DtGI3dfQ= github.com/minio/sha256-simd v0.0.0-20190131020904-2d45a736cd16/go.mod h1:2FMWW+8GMoPweT6+pI63m9YE3Lmw4J71hV56Chs1E/U= github.com/minio/sha256-simd v0.1.1-0.20190913151208-6de447530771/go.mod h1:B5e1o+1/KgNmWrSQK08Y6Z1Vb5pwIktudl0J58iy0KM= github.com/minio/sha256-simd v1.0.0 h1:v1ta+49hkWZyvaKwrQB8elexRqm6Y0aMLjCNsrYxo6g= github.com/minio/sha256-simd v1.0.0/go.mod h1:OuYzVNI5vcoYIAmbIvHPl3N3jUzVedXbKy5RFepssQM= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/mr-tron/base58 v1.1.0/go.mod h1:xcD2VGqlgYjBdcBLw+TuYLr8afG+Hj8g2eTVqeSzSU8= github.com/mr-tron/base58 v1.1.2/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc= github.com/mr-tron/base58 v1.1.3/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc= @@ -246,12 +292,17 @@ github.com/multiformats/go-varint v0.0.5/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXS github.com/multiformats/go-varint v0.0.6/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXSrVKRY101jdMZYE= github.com/multiformats/go-varint v0.0.7 h1:sWSGR+f/eu5ABZA2ZpYKBILXTTs9JWpdEM/nEGOHFS8= github.com/multiformats/go-varint v0.0.7/go.mod h1:r8PUYw/fD/SjBCiKOoDlGF6QawOELpZAu9eioSos/OU= +github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f h1:KUppIJq7/+SVif2QVs3tOP0zanoHgBEVAwHxUSIzRqU= github.com/nfnt/resize v0.0.0-20160724205520-891127d8d1b5 h1:BvoENQQU+fZ9uukda/RzCAL/191HHwJA5b13R6diVlY= +github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= github.com/onsi/ginkgo/v2 v2.9.5 h1:+6Hr4uxzP4XIUyAkg61dWBw8lb/gc4/X5luuxN/EC+Q= github.com/onsi/ginkgo/v2 v2.9.5/go.mod h1:tvAoo1QUJwNEU2ITftXTpR7R1RbCzoZUOs3RonqW57k= +github.com/onsi/gomega v1.27.6 h1:ENqfyGeS5AX/rlXDd/ETokDz93u0YufY1Pgxuy/PvWE= +github.com/opencontainers/runtime-spec v1.0.2 h1:UfAcuLBJB9Coz72x1hgl8O5RVzTdNiaglX6v2DM6FI0= github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs= github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc= +github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 h1:onHthvaw9LFnH4t2DcNVpwGmV9E1BkGknEliJkfwQj0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -270,12 +321,14 @@ github.com/prometheus/common v0.44.0/go.mod h1:ofAIvZbQ1e/nugmZGz4/qCb9Ap1VoSTIO github.com/prometheus/procfs v0.10.0 h1:UkG7GPYkO4UZyLnyXjaWYcgOSONqwdBqFUT95ugmt6I= github.com/prometheus/procfs v0.10.0/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM= github.com/quic-go/qpack v0.4.0 h1:Cr9BXA1sQS2SmDUWjSofMPNKmvF6IiIfDRmgU0w1ZCo= +github.com/quic-go/qtls-go1-18 v0.2.0 h1:5ViXqBZ90wpUcZS0ge79rf029yx0dYB0McyPJwqqj7U= github.com/quic-go/qtls-go1-19 v0.3.2 h1:tFxjCFcTQzK+oMxG6Zcvp4Dq8dx4yD3dDiIiyc86Z5U= github.com/quic-go/qtls-go1-20 v0.2.2 h1:WLOPx6OY/hxtTxKV1Zrq20FtXtDEkeY00CGQm8GEa3E= github.com/quic-go/quic-go v0.34.0 h1:OvOJ9LFjTySgwOTYUZmNoq0FzVicP8YujpV0kB7m2lU= github.com/quic-go/quic-go v0.34.0/go.mod h1:+4CVgVppm0FNjpG3UcX8Joi/frKOH7/ciD5yGcwOO1g= github.com/quic-go/webtransport-go v0.5.3 h1:5XMlzemqB4qmOlgIus5zB45AcZ2kCgCy2EptUrfOPWU= github.com/quic-go/webtransport-go v0.5.3/go.mod h1:OhmmgJIzTTqXK5xvtuX0oBpLV2GkLWNDA+UeTGJXErU= +github.com/raulk/go-watchdog v1.3.0 h1:oUmdlHxdkXRJlwfG0O9omj8ukerm8MEQavSiDTEtBsk= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM= @@ -283,16 +336,19 @@ github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1 github.com/smartystreets/goconvey v0.0.0-20190222223459-a17d461953aa/go.mod h1:2RVY1rIf+2J2o/IM9+vPq9RzmHDSseB7FoXiSNIUsoU= github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s= github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= +github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572 h1:RC6RW7j+1+HkWaX/Yh71Ee5ZHaHYt7ZP4sQgUrm6cDU= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.3 h1:RP3t2pwF7cMEbC1dqtB6poj3niw/9gnV4Cjg5oW5gtY= github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/syndtr/goleveldb v1.0.0 h1:fBdIW9lB4Iz0n9khmH8w27SJ3QEJ7+IgjPEwGSZiFdE= github.com/tyler-smith/go-bip39 v1.1.0 h1:5eUemwrMargf3BSLRRCalXT93Ns6pQJIjYQN2nyfOP8= github.com/tyler-smith/go-bip39 v1.1.0/go.mod h1:gUYDtqQw1JS3ZJ8UWVcGTGqqr6YIN3CWg+kkNaLt55U= github.com/warpfork/go-testmark v0.11.0 h1:J6LnV8KpceDvo7spaNU4+DauH2n1x+6RaO2rJrmpQ9U= @@ -303,10 +359,13 @@ github.com/whyrusleeping/cbor-gen v0.0.0-20200123233031-1cdf64d27158 h1:WXhVOwj2 github.com/whyrusleeping/cbor-gen v0.0.0-20200123233031-1cdf64d27158/go.mod h1:Xj/M2wWU+QdTdRbu/L/1dIZY8/Wb2K9pAhtroQuxJJI= github.com/whyrusleeping/chunker v0.0.0-20181014151217-fe64bd25879f h1:jQa4QT2UP9WYv2nzyawpKMOCl+Z/jW7djv2/J50lj9E= github.com/whyrusleeping/chunker v0.0.0-20181014151217-fe64bd25879f/go.mod h1:p9UJB6dDgdPgMJZs7UjUOdulKyRr9fqkS+6JKAInPy8= +github.com/whyrusleeping/go-logging v0.0.0-20170515211332-0457bb6b88fc h1:9lDbC6Rz4bwmou+oE6Dt4Cb2BGMur5eR/GYptkKUVHo= github.com/whyrusleeping/go-logging v0.0.0-20170515211332-0457bb6b88fc/go.mod h1:bopw91TMyo8J3tvftk8xmU2kPmlrt4nScJQZU2hE5EM= +github.com/xhit/go-str2duration/v2 v2.1.0 h1:lxklc02Drh6ynqX+DdPyp5pCKLUQpRT8bp8Ydu2Bstc= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= +github.com/yuin/goldmark v1.4.13 h1:fVcFKWvrslecOb/tg+Cc05dkeYx540o0FuFt3nUVDoE= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= github.com/zeebo/assert v1.1.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0= github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ= @@ -324,12 +383,15 @@ go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= +go.uber.org/dig v1.16.1 h1:+alNIBsl0qfY0j6epRubp/9obgtrObRAc5aD+6jbWY8= +go.uber.org/fx v1.19.2 h1:SyFgYQFr1Wl0AYstE8vyYIzP4bFz2URrScjwC4cwUvY= go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA= go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee h1:0mgffUl7nfd+FpvXMVz4IDEaUSmT1ysygQC7qYo7sG4= go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA= go.uber.org/zap v1.16.0/go.mod h1:MA8QOfq0BHJwdXa996Y4dYkAqRKB8/1K1QMMZVaNZjQ= go.uber.org/zap v1.19.1/go.mod h1:j3DNczoxDZroyBnOT1L/Q79cfUMGZxlv/9dzN7SM1rI= @@ -350,6 +412,7 @@ golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1 h1:k/i9J1pBpvlfR+9QsetwPyERs golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1/go.mod h1:V1LtkGg67GoY2N1AnLN78QLrzxkLyJw7RJb1gzOOz9w= golang.org/x/image v0.6.0 h1:bR8b5okrPI3g/gyZakLZHeWxAR8Dn5CyxXv1hLH5g/4= golang.org/x/image v0.6.0/go.mod h1:MXLdDR43H7cDJq5GEGXEVeeNhPgi+YYEQ2pC1byI1x0= +golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs= golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= @@ -370,6 +433,7 @@ golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= +golang.org/x/oauth2 v0.8.0 h1:6dkIjl3j3LtZ/O3sTgZTMsLKSftL/B8Zgq4huOIIUu8= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -403,6 +467,7 @@ golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXR golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/term v0.8.0 h1:n5xxQn2i3PC0yLAbjTpNT85q/Kgzcr2gIoX9OrJUols= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= @@ -430,6 +495,7 @@ golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/appengine v1.6.7 h1:FZR1q0exgwxzPzp/aF+VccGrSfxfPpkBqjIIEq3ru6c= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng= @@ -438,6 +504,7 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/errgo.v2 v2.1.0 h1:0vLT13EuvQ0hNvakwLuFZ/jYrLp5F3kcWHXdRggjCE8= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= @@ -446,8 +513,10 @@ gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +honnef.co/go/tools v0.0.1-2019.2.3 h1:3JgtbtFHMiCmsznwGVTUWbgGov+pVqnlf1dEJTNAXeM= honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= lukechampine.com/blake3 v1.2.1 h1:YuqqRuaqsGV71BV/nm9xlI0MKUv4QC54jQnBChWbGnI= lukechampine.com/blake3 v1.2.1/go.mod h1:0OFRp7fBtAylGVCO40o87sbupkyIGgbpv1+M1k1LM6k= +nhooyr.io/websocket v1.8.7 h1:usjR2uOr/zjjkVMy0lW+PPohFok7PCow5sDjLgX4P4g= storj.io/drpc v0.0.33 h1:yCGZ26r66ZdMP0IcTYsj7WDAUIIjzXk6DJhbhvt9FHI= storj.io/drpc v0.0.33/go.mod h1:vR804UNzhBa49NOJ6HeLjd2H3MakC1j5Gv8bsOQT6N4= diff --git a/net/timeoutconn/conn.go b/net/connutil/timeout.go similarity index 82% rename from net/timeoutconn/conn.go rename to net/connutil/timeout.go index 11e80709..381998f9 100644 --- a/net/timeoutconn/conn.go +++ b/net/connutil/timeout.go @@ -1,4 +1,4 @@ -package timeoutconn +package connutil import ( "errors" @@ -10,18 +10,18 @@ import ( "go.uber.org/zap" ) -var log = logger.NewNamed("common.net.timeoutconn") +var log = logger.NewNamed("common.net.connutil") -type Conn struct { +type TimeoutConn struct { net.Conn timeout time.Duration } -func NewConn(conn net.Conn, timeout time.Duration) *Conn { - return &Conn{conn, timeout} +func NewConn(conn net.Conn, timeout time.Duration) *TimeoutConn { + return &TimeoutConn{conn, timeout} } -func (c *Conn) Write(p []byte) (n int, err error) { +func (c *TimeoutConn) Write(p []byte) (n int, err error) { for { if c.timeout != 0 { if e := c.Conn.SetWriteDeadline(time.Now().Add(c.timeout)); e != nil { diff --git a/net/connutil/usage.go b/net/connutil/usage.go new file mode 100644 index 00000000..826d9c74 --- /dev/null +++ b/net/connutil/usage.go @@ -0,0 +1,30 @@ +package connutil + +import ( + "go.uber.org/atomic" + "net" + "time" +) + +func NewLastUsageConn(conn net.Conn) *LastUsageConn { + return &LastUsageConn{Conn: conn} +} + +type LastUsageConn struct { + net.Conn + lastUsage atomic.Time +} + +func (c *LastUsageConn) Write(p []byte) (n int, err error) { + c.lastUsage.Store(time.Now()) + return c.Conn.Write(p) +} + +func (c *LastUsageConn) Read(p []byte) (n int, err error) { + c.lastUsage.Store(time.Now()) + return c.Conn.Read(p) +} + +func (c *LastUsageConn) LastUsage() time.Time { + return c.lastUsage.Load() +} diff --git a/net/dialer/dialer.go b/net/dialer/dialer.go index 8d49c4a9..f585c1b4 100644 --- a/net/dialer/dialer.go +++ b/net/dialer/dialer.go @@ -7,10 +7,10 @@ import ( "github.com/anyproto/any-sync/app" "github.com/anyproto/any-sync/app/logger" net2 "github.com/anyproto/any-sync/net" + "github.com/anyproto/any-sync/net/connutil" "github.com/anyproto/any-sync/net/peer" "github.com/anyproto/any-sync/net/secureservice" "github.com/anyproto/any-sync/net/secureservice/handshake" - "github.com/anyproto/any-sync/net/timeoutconn" "github.com/anyproto/any-sync/nodeconf" "github.com/libp2p/go-libp2p/core/sec" "go.uber.org/zap" @@ -118,7 +118,7 @@ func (d *dialer) handshake(ctx context.Context, addr, peerId string) (conn drpc. return nil, nil, fmt.Errorf("dialTimeout error: %v; since start: %v", err, time.Since(st)) } - timeoutConn := timeoutconn.NewConn(tcpConn, time.Millisecond*time.Duration(d.config.Stream.TimeoutMilliseconds)) + timeoutConn := connutil.NewConn(tcpConn, time.Millisecond*time.Duration(d.config.Stream.TimeoutMilliseconds)) sc, err = d.transport.SecureOutbound(ctx, timeoutConn) if err != nil { if he, ok := err.(handshake.HandshakeError); ok { diff --git a/net/pool/pool.go b/net/pool/pool.go index b6c0d7df..bd0b58eb 100644 --- a/net/pool/pool.go +++ b/net/pool/pool.go @@ -13,31 +13,35 @@ import ( // Pool creates and caches outgoing connection type Pool interface { - // Get lookups to peer in existing connections or creates and cache new one + // Get lookups to peer in existing connections or creates and outgoing new one Get(ctx context.Context, id string) (peer.Peer, error) - // Dial creates new connection to peer and not use cache - Dial(ctx context.Context, id string) (peer.Peer, error) - // GetOneOf searches at least one existing connection in cache or creates a new one from a randomly selected id from given list + // GetOneOf searches at least one existing connection in outgoing or creates a new one from a randomly selected id from given list GetOneOf(ctx context.Context, peerIds []string) (peer.Peer, error) - - DialOneOf(ctx context.Context, peerIds []string) (peer.Peer, error) + // AddPeer adds incoming peer to the pool + AddPeer(ctx context.Context, p peer.Peer) (err error) } type pool struct { - cache ocache.OCache - dialer dialer.Dialer + outgoing ocache.OCache + incoming ocache.OCache + dialer dialer.Dialer } func (p *pool) Name() (name string) { return CName } -func (p *pool) Run(ctx context.Context) (err error) { - return nil +func (p *pool) Get(ctx context.Context, id string) (pr peer.Peer, err error) { + // if we have incoming connection - try to reuse it + if pr, err = p.get(ctx, p.incoming, id); err != nil { + // or try to get or create outgoing + return p.get(ctx, p.outgoing, id) + } + return } -func (p *pool) Get(ctx context.Context, id string) (peer.Peer, error) { - v, err := p.cache.Get(ctx, id) +func (p *pool) get(ctx context.Context, source ocache.OCache, id string) (peer.Peer, error) { + v, err := source.Get(ctx, id) if err != nil { return nil, err } @@ -47,7 +51,7 @@ func (p *pool) Get(ctx context.Context, id string) (peer.Peer, error) { default: return pr, nil } - _, _ = p.cache.Remove(ctx, id) + _, _ = source.Remove(ctx, id) return p.Get(ctx, id) } @@ -58,14 +62,23 @@ func (p *pool) Dial(ctx context.Context, id string) (peer.Peer, error) { func (p *pool) GetOneOf(ctx context.Context, peerIds []string) (peer.Peer, error) { // finding existing connection for _, peerId := range peerIds { - if v, err := p.cache.Pick(ctx, peerId); err == nil { + if v, err := p.incoming.Pick(ctx, peerId); err == nil { pr := v.(peer.Peer) select { case <-pr.Closed(): default: return pr, nil } - _, _ = p.cache.Remove(ctx, peerId) + _, _ = p.incoming.Remove(ctx, peerId) + } + if v, err := p.outgoing.Pick(ctx, peerId); err == nil { + pr := v.(peer.Peer) + select { + case <-pr.Closed(): + default: + return pr, nil + } + _, _ = p.outgoing.Remove(ctx, peerId) } } // shuffle ids for better consistency @@ -75,8 +88,8 @@ func (p *pool) GetOneOf(ctx context.Context, peerIds []string) (peer.Peer, error // connecting var lastErr error for _, peerId := range peerIds { - if v, err := p.cache.Get(ctx, peerId); err == nil { - return v.(peer.Peer), nil + if v, err := p.Get(ctx, peerId); err == nil { + return v, nil } else { log.Debug("unable to connect", zap.String("peerId", peerId), zap.Error(err)) lastErr = err @@ -88,27 +101,6 @@ func (p *pool) GetOneOf(ctx context.Context, peerIds []string) (peer.Peer, error return nil, lastErr } -func (p *pool) DialOneOf(ctx context.Context, peerIds []string) (peer.Peer, error) { - // shuffle ids for better consistency - rand.Shuffle(len(peerIds), func(i, j int) { - peerIds[i], peerIds[j] = peerIds[j], peerIds[i] - }) - // connecting - var lastErr error - for _, peerId := range peerIds { - if v, err := p.dialer.Dial(ctx, peerId); err == nil { - return v.(peer.Peer), nil - } else { - log.Debug("unable to connect", zap.String("peerId", peerId), zap.Error(err)) - lastErr = err - } - } - if _, ok := lastErr.(handshake.HandshakeError); !ok { - lastErr = net.ErrUnableToConnect - } - return nil, lastErr -} - -func (p *pool) Close(ctx context.Context) (err error) { - return p.cache.Close() +func (p *pool) AddPeer(ctx context.Context, pr peer.Peer) (err error) { + return p.incoming.Add(pr.Id(), pr) } diff --git a/net/pool/poolservice.go b/net/pool/poolservice.go index 9b69ae24..68daafc5 100644 --- a/net/pool/poolservice.go +++ b/net/pool/poolservice.go @@ -8,6 +8,7 @@ import ( "github.com/anyproto/any-sync/metric" "github.com/anyproto/any-sync/net/dialer" "github.com/prometheus/client_golang/prometheus" + "go.uber.org/zap" "time" ) @@ -23,7 +24,6 @@ func New() Service { type Service interface { Pool - NewPool(name string) Pool app.ComponentRunnable } @@ -40,29 +40,34 @@ func (p *poolService) Init(a *app.App) (err error) { if m := a.Component(metric.CName); m != nil { p.metricReg = m.(metric.Metric).Registry() } - p.pool.cache = ocache.New( + p.pool.outgoing = ocache.New( func(ctx context.Context, id string) (value ocache.Object, err error) { return p.dialer.Dial(ctx, id) }, ocache.WithLogger(log.Sugar()), ocache.WithGCPeriod(time.Minute), ocache.WithTTL(time.Minute*5), - ocache.WithPrometheus(p.metricReg, "netpool", "default"), + ocache.WithPrometheus(p.metricReg, "netpool", "outgoing"), + ) + p.pool.incoming = ocache.New( + func(ctx context.Context, id string) (value ocache.Object, err error) { + return nil, ocache.ErrNotExists + }, + ocache.WithLogger(log.Sugar()), + ocache.WithGCPeriod(time.Minute), + ocache.WithTTL(time.Minute*5), + ocache.WithPrometheus(p.metricReg, "netpool", "incoming"), ) return nil } -func (p *poolService) NewPool(name string) Pool { - return &pool{ - dialer: p.dialer, - cache: ocache.New( - func(ctx context.Context, id string) (value ocache.Object, err error) { - return p.dialer.Dial(ctx, id) - }, - ocache.WithLogger(log.Sugar()), - ocache.WithGCPeriod(time.Minute), - ocache.WithTTL(time.Minute*5), - ocache.WithPrometheus(p.metricReg, "netpool", name), - ), - } +func (p *pool) Run(ctx context.Context) (err error) { + return nil +} + +func (p *pool) Close(ctx context.Context) (err error) { + if e := p.incoming.Close(); e != nil { + log.Warn("close incoming cache error", zap.Error(e)) + } + return p.outgoing.Close() } diff --git a/net/secureservice/secureservice.go b/net/secureservice/secureservice.go index d7d86040..4faca6c0 100644 --- a/net/secureservice/secureservice.go +++ b/net/secureservice/secureservice.go @@ -25,7 +25,7 @@ func New() SecureService { } type SecureService interface { - SecureOutbound(ctx context.Context, conn net.Conn) (sec.SecureConn, error) + SecureOutbound(ctx context.Context, conn net.Conn) (cctx context.Context, sc sec.SecureConn, err error) SecureInbound(ctx context.Context, conn net.Conn) (cctx context.Context, sc sec.SecureConn, err error) app.Component } @@ -93,10 +93,10 @@ func (s *secureService) SecureInbound(ctx context.Context, conn net.Conn) (cctx return } -func (s *secureService) SecureOutbound(ctx context.Context, conn net.Conn) (sec.SecureConn, error) { - sc, err := s.p2pTr.SecureOutbound(ctx, conn, "") +func (s *secureService) SecureOutbound(ctx context.Context, conn net.Conn) (cctx context.Context, sc sec.SecureConn, err error) { + sc, err = s.p2pTr.SecureOutbound(ctx, conn, "") if err != nil { - return nil, handshake.HandshakeError{Err: err} + return nil, nil, handshake.HandshakeError{Err: err} } peerId := sc.RemotePeer().String() confTypes := s.nodeconf.NodeTypes(peerId) @@ -106,10 +106,12 @@ func (s *secureService) SecureOutbound(ctx context.Context, conn net.Conn) (sec. } else { checker = s.noVerifyChecker } - // ignore identity for outgoing connection because we don't need it at this moment - _, err = handshake.OutgoingHandshake(ctx, sc, checker) + identity, err := handshake.OutgoingHandshake(ctx, sc, checker) if err != nil { - return nil, err + return nil, nil, err } - return sc, nil + cctx = context.Background() + cctx = peer.CtxWithPeerId(cctx, sc.RemotePeer().String()) + cctx = peer.CtxWithIdentity(cctx, identity) + return cctx, sc, nil } diff --git a/net/secureservice/secureservice_test.go b/net/secureservice/secureservice_test.go index 2f435a65..e03b92a4 100644 --- a/net/secureservice/secureservice_test.go +++ b/net/secureservice/secureservice_test.go @@ -39,9 +39,12 @@ func TestHandshake(t *testing.T) { fxC := newFixture(t, nc, nc.GetAccountService(1), 0) defer fxC.Finish(t) - secConn, err := fxC.SecureOutbound(ctx, cc) + cctx, secConn, err := fxC.SecureOutbound(ctx, cc) + require.NoError(t, err) + ctxPeerId, err := peer.CtxPeerId(cctx) require.NoError(t, err) assert.Equal(t, nc.GetAccountService(0).Account().PeerId, secConn.RemotePeer().String()) + assert.Equal(t, nc.GetAccountService(0).Account().PeerId, ctxPeerId) res := <-resCh require.NoError(t, res.err) peerId, err := peer.CtxPeerId(res.ctx) @@ -72,7 +75,7 @@ func TestHandshakeIncompatibleVersion(t *testing.T) { }() fxC := newFixture(t, nc, nc.GetAccountService(1), 1) defer fxC.Finish(t) - _, err := fxC.SecureOutbound(ctx, cc) + _, _, err := fxC.SecureOutbound(ctx, cc) require.Equal(t, handshake.ErrIncompatibleVersion, err) res := <-resCh require.Equal(t, handshake.ErrIncompatibleVersion, res.err) diff --git a/net/transport/transport.go b/net/transport/transport.go new file mode 100644 index 00000000..3eba1414 --- /dev/null +++ b/net/transport/transport.go @@ -0,0 +1,34 @@ +package transport + +import ( + "context" + "net" + "time" +) + +// Transport is a common interface for a network transport +type Transport interface { + // SetAccepter sets accepter that will be called for new connections + // this method should be called before app start + SetAccepter(accepter Accepter) + // Dial creates a new connection by given address + Dial(ctx context.Context, addr string) (mc MultiConn, err error) +} + +// MultiConn is an object of multiplexing connection containing handshake info +type MultiConn interface { + // Context returns the connection context that contains handshake details + Context() context.Context + // Accept accepts new sub connections + Accept() (conn net.Conn, err error) + // Open opens new sub connection + Open(ctx context.Context) (conn net.Conn, err error) + // LastUsage returns the time of the last connection activity + LastUsage() time.Time + // Close closes the connection and all sub connections + Close() error +} + +type Accepter interface { + Accept(mc MultiConn) (err error) +} diff --git a/net/transport/yamux/config.go b/net/transport/yamux/config.go new file mode 100644 index 00000000..38f74d4b --- /dev/null +++ b/net/transport/yamux/config.go @@ -0,0 +1,12 @@ +package yamux + +type configGetter interface { + GetYamux() Config +} + +type Config struct { + ListenAddrs []string `yaml:"listenAddrs"` + WriteTimeoutSec int `yaml:"writeTimeoutSec"` + DialTimeoutSec int `yaml:"dialTimeoutSec"` + MaxStreams int `yaml:"maxStreams"` +} diff --git a/net/transport/yamux/conn.go b/net/transport/yamux/conn.go new file mode 100644 index 00000000..e9ad7f59 --- /dev/null +++ b/net/transport/yamux/conn.go @@ -0,0 +1,27 @@ +package yamux + +import ( + "context" + "github.com/anyproto/any-sync/net/connutil" + "github.com/hashicorp/yamux" + "net" + "time" +) + +type yamuxConn struct { + ctx context.Context + luConn *connutil.LastUsageConn + *yamux.Session +} + +func (y *yamuxConn) Open(ctx context.Context) (conn net.Conn, err error) { + return y.Session.Open() +} + +func (y *yamuxConn) LastUsage() time.Time { + return y.luConn.LastUsage() +} + +func (y *yamuxConn) Context() context.Context { + return y.ctx +} diff --git a/net/transport/yamux/util.go b/net/transport/yamux/util.go new file mode 100644 index 00000000..c1299d15 --- /dev/null +++ b/net/transport/yamux/util.go @@ -0,0 +1,18 @@ +//go:build !windows + +package yamux + +import ( + "errors" + "net" +) + +// isTemporary checks if an error is temporary. +func isTemporary(err error) bool { + var nErr net.Error + if errors.As(err, &nErr) { + return nErr.Temporary() + } + + return false +} diff --git a/net/transport/yamux/util_windows.go b/net/transport/yamux/util_windows.go new file mode 100644 index 00000000..390524d5 --- /dev/null +++ b/net/transport/yamux/util_windows.go @@ -0,0 +1,41 @@ +//go:build windows + +package yamux + +import ( + "errors" + "net" + "os" + "syscall" +) + +const ( + _WSAEMFILE syscall.Errno = 10024 + _WSAENETRESET syscall.Errno = 10052 + _WSAENOBUFS syscall.Errno = 10055 +) + +// isTemporary checks if an error is temporary. +// see related go issue for more detail: https://go-review.googlesource.com/c/go/+/208537/ +func isTemporary(err error) bool { + var nErr net.Error + if !errors.As(err, &nErr) { + return false + } + + if nErr.Temporary() { + return true + } + + var sErr *os.SyscallError + if errors.As(err, &sErr) { + switch sErr.Err { + case _WSAENETRESET, + _WSAEMFILE, + _WSAENOBUFS: + return true + } + } + + return false +} diff --git a/net/transport/yamux/yamux.go b/net/transport/yamux/yamux.go new file mode 100644 index 00000000..9d73c1f1 --- /dev/null +++ b/net/transport/yamux/yamux.go @@ -0,0 +1,168 @@ +package yamux + +import ( + "context" + "fmt" + "github.com/anyproto/any-sync/app" + "github.com/anyproto/any-sync/app/logger" + "github.com/anyproto/any-sync/net/connutil" + "github.com/anyproto/any-sync/net/secureservice" + "github.com/anyproto/any-sync/net/transport" + "github.com/hashicorp/yamux" + "go.uber.org/zap" + "net" + "time" +) + +const CName = "net.transport.yamux" + +var log = logger.NewNamed(CName) + +func New() Yamux { + return new(yamuxTransport) +} + +// Yamux implements transport.Transport with tcp+yamux +type Yamux interface { + transport.Transport + app.ComponentRunnable +} + +type yamuxTransport struct { + secure secureservice.SecureService + accepter transport.Accepter + conf Config + + listeners []net.Listener + listCtx context.Context + listCtxCancel context.CancelFunc + yamuxConf *yamux.Config +} + +func (y *yamuxTransport) Init(a *app.App) (err error) { + y.secure = a.MustComponent(secureservice.CName).(secureservice.SecureService) + y.conf = a.MustComponent("config").(configGetter).GetYamux() + y.yamuxConf = yamux.DefaultConfig() + if y.conf.MaxStreams > 0 { + y.yamuxConf.AcceptBacklog = y.conf.MaxStreams + } + y.yamuxConf.EnableKeepAlive = false + y.yamuxConf.StreamOpenTimeout = time.Duration(y.conf.DialTimeoutSec) * time.Second + y.yamuxConf.ConnectionWriteTimeout = time.Duration(y.conf.WriteTimeoutSec) * time.Second + return +} + +func (y *yamuxTransport) Name() string { + return CName +} + +func (y *yamuxTransport) Run(ctx context.Context) (err error) { + if y.accepter == nil { + return fmt.Errorf("can't run service without accepter") + } + for _, listAddr := range y.conf.ListenAddrs { + list, err := net.Listen("tcp", listAddr) + if err != nil { + return err + } + y.listeners = append(y.listeners, list) + } + y.listCtx, y.listCtxCancel = context.WithCancel(context.Background()) + for _, list := range y.listeners { + go y.acceptLoop(y.listCtx, list) + } + return +} + +func (y *yamuxTransport) SetAccepter(accepter transport.Accepter) { + y.accepter = accepter +} + +func (y *yamuxTransport) Dial(ctx context.Context, addr string) (mc transport.MultiConn, err error) { + dialTimeout := time.Duration(y.conf.DialTimeoutSec) * time.Second + conn, err := net.DialTimeout("tcp", addr, dialTimeout) + if err != nil { + return nil, err + } + ctx, cancel := context.WithTimeout(ctx, dialTimeout) + defer cancel() + cctx, sc, err := y.secure.SecureOutbound(ctx, conn) + if err != nil { + _ = conn.Close() + return nil, err + } + luc := connutil.NewLastUsageConn(sc) + sess, err := yamux.Client(luc, y.yamuxConf) + if err != nil { + return + } + mc = &yamuxConn{ + ctx: cctx, + luConn: luc, + Session: sess, + } + return +} + +func (y *yamuxTransport) acceptLoop(ctx context.Context, list net.Listener) { + l := log.With(zap.String("localAddr", list.Addr().String())) + l.Info("yamux listener started") + defer func() { + l.Debug("yamux listener stopped") + }() + for { + conn, err := list.Accept() + if err != nil { + if isTemporary(err) { + l.Debug("listener temporary accept error", zap.Error(err)) + select { + case <-time.After(time.Second): + case <-ctx.Done(): + return + } + continue + } + if err != net.ErrClosed { + l.Error("listener closed with error", zap.Error(err)) + } else { + l.Info("listener closed") + } + return + } + go y.accept(conn) + } +} + +func (y *yamuxTransport) accept(conn net.Conn) { + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(y.conf.DialTimeoutSec)*time.Second) + defer cancel() + cctx, sc, err := y.secure.SecureInbound(ctx, conn) + if err != nil { + log.Warn("incoming connection handshake error", zap.Error(err)) + return + } + luc := connutil.NewLastUsageConn(sc) + sess, err := yamux.Server(luc, y.yamuxConf) + if err != nil { + log.Warn("incoming connection yamux session error", zap.Error(err)) + return + } + mc := &yamuxConn{ + ctx: cctx, + luConn: luc, + Session: sess, + } + if err = y.accepter.Accept(mc); err != nil { + log.Warn("connection accept error", zap.Error(err)) + } +} + +func (y *yamuxTransport) Close(ctx context.Context) (err error) { + if y.listCtxCancel != nil { + y.listCtxCancel() + } + for _, l := range y.listeners { + _ = l.Close() + } + return +} diff --git a/net/transport/yamux/yamux_test.go b/net/transport/yamux/yamux_test.go new file mode 100644 index 00000000..20efdfce --- /dev/null +++ b/net/transport/yamux/yamux_test.go @@ -0,0 +1,134 @@ +package yamux + +import ( + "bytes" + "context" + "github.com/anyproto/any-sync/app" + "github.com/anyproto/any-sync/net/secureservice" + "github.com/anyproto/any-sync/net/transport" + "github.com/anyproto/any-sync/nodeconf" + "github.com/anyproto/any-sync/nodeconf/mock_nodeconf" + "github.com/anyproto/any-sync/testutil/accounttest" + "github.com/anyproto/any-sync/testutil/testnodeconf" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "io" + "testing" +) + +var ctx = context.Background() + +func TestYamuxTransport_Dial(t *testing.T) { + fxS := newFixture(t) + defer fxS.finish(t) + fxC := newFixture(t) + defer fxC.finish(t) + + mcC, err := fxC.Dial(ctx, fxS.addr) + require.NoError(t, err) + require.Len(t, fxS.accepter.mcs, 1) + mcS := fxS.accepter.mcs[0] + + var ( + sData string + acceptErr error + copyErr error + done = make(chan struct{}) + ) + + go func() { + defer close(done) + conn, serr := mcS.Accept() + if serr != nil { + acceptErr = serr + return + } + buf := bytes.NewBuffer(nil) + _, copyErr = io.Copy(buf, conn) + sData = buf.String() + return + }() + + conn, err := mcC.Open(ctx) + require.NoError(t, err) + data := "some data" + _, err = conn.Write([]byte(data)) + require.NoError(t, err) + require.NoError(t, conn.Close()) + <-done + + assert.NoError(t, acceptErr) + assert.Equal(t, data, sData) + assert.NoError(t, copyErr) +} + +type fixture struct { + *yamuxTransport + a *app.App + ctrl *gomock.Controller + mockNodeConf *mock_nodeconf.MockService + acc *accounttest.AccountTestService + accepter *testAccepter + addr string +} + +func newFixture(t *testing.T) *fixture { + fx := &fixture{ + yamuxTransport: New().(*yamuxTransport), + ctrl: gomock.NewController(t), + acc: &accounttest.AccountTestService{}, + accepter: &testAccepter{}, + a: new(app.App), + } + + fx.mockNodeConf = mock_nodeconf.NewMockService(fx.ctrl) + fx.mockNodeConf.EXPECT().Init(gomock.Any()) + fx.mockNodeConf.EXPECT().Name().Return(nodeconf.CName).AnyTimes() + fx.mockNodeConf.EXPECT().Run(ctx) + fx.mockNodeConf.EXPECT().Close(ctx) + fx.mockNodeConf.EXPECT().NodeTypes(gomock.Any()).Return([]nodeconf.NodeType{nodeconf.NodeTypeTree}).AnyTimes() + fx.a.Register(fx.acc).Register(newTestConf()).Register(fx.mockNodeConf).Register(secureservice.New()).Register(fx.yamuxTransport).Register(fx.accepter) + require.NoError(t, fx.a.Start(ctx)) + fx.addr = fx.listeners[0].Addr().String() + return fx +} + +func (fx *fixture) finish(t *testing.T) { + require.NoError(t, fx.a.Close(ctx)) + fx.ctrl.Finish() +} + +func newTestConf() *testConf { + return &testConf{testnodeconf.GenNodeConfig(1)} +} + +type testConf struct { + *testnodeconf.Config +} + +func (c *testConf) GetYamux() Config { + return Config{ + ListenAddrs: []string{"127.0.0.1:0"}, + WriteTimeoutSec: 10, + DialTimeoutSec: 10, + MaxStreams: 1024, + } +} + +type testAccepter struct { + err error + mcs []transport.MultiConn +} + +func (t *testAccepter) Accept(mc transport.MultiConn) (err error) { + t.mcs = append(t.mcs, mc) + return t.err +} + +func (t *testAccepter) Init(a *app.App) (err error) { + a.MustComponent(CName).(transport.Transport).SetAccepter(t) + return nil +} + +func (t *testAccepter) Name() (name string) { return "testAccepter" }