Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

move go-reuseport-transport here #1459

Merged
merged 37 commits into from
May 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
4a9cd1b
initial commit
Stebalien Jan 19, 2018
ad8c7ee
add some documentation
Stebalien Jan 19, 2018
154ae5c
add minimal test
Stebalien Jan 19, 2018
fc057b0
handle all possible port configurations when creating dialers
Stebalien Jan 22, 2018
1ad436a
fix incorrect condition in reuseDial
Stebalien Jan 22, 2018
a88cd13
unexport ReuseErrShouldRetry
Stebalien Mar 10, 2018
1af0f42
document source port choosing algorithm
Stebalien Mar 27, 2018
0b6d56b
define a global fallbackDialer
Stebalien Mar 27, 2018
2f5a5fe
add another test case
Stebalien Mar 29, 2018
95f137a
reset the dialer (which ports to dial from) when closing a listener
Stebalien Mar 29, 2018
bf8af04
add a test for dialing with a local and an unspecified listener
Stebalien Mar 29, 2018
9eafb98
test dialing with no listeners
Stebalien Mar 29, 2018
bd345d7
fix use of global address condition
Stebalien Mar 29, 2018
910deae
more tests
Stebalien Mar 29, 2018
33bd60a
test ipv6
Stebalien Mar 29, 2018
440615f
Use new go-reuseport.Control
anacrolix Dec 31, 2018
8b89db3
Add SetLinger(0) in reuseDial
anacrolix Jan 2, 2019
c46c576
Merge pull request #6 from libp2p/reuseport-control
Stebalien Jan 4, 2019
5abb5b9
don't set linger to 0
Stebalien Apr 4, 2019
cf38e56
Merge pull request #14 from libp2p/fix/linger
Stebalien Apr 4, 2019
ba298b7
fix: less confusing log message
Stebalien Feb 21, 2020
0c84888
Use Netroute (#25)
willscott Apr 2, 2020
90e1f36
Fix build on Plan 9
fhs Jul 10, 2020
2a09bb7
Update go-netroute and go-reuseport for Plan 9 support
fhs Jul 11, 2020
b304d8d
stop using the deprecated go-multiaddr-net package
marten-seemann Jul 2, 2021
180f258
Merge pull request #30 from libp2p/remove-go-multiaddr-net
marten-seemann Jul 2, 2021
1503736
run gofmt -s
web3-bot Aug 17, 2021
c12d890
disable failing tests on OSX
marten-seemann Sep 20, 2021
386648e
Merge pull request #31 from libp2p/web3-bot/sync
marten-seemann Sep 20, 2021
3f74a3a
chore: update go-log to v2
marten-seemann Sep 20, 2021
ef6c8c9
Merge pull request #35 from libp2p/update-go-log
marten-seemann Sep 20, 2021
c42772b
disable failing TestV6V4 on OSX (#41)
marten-seemann May 18, 2022
2f38a87
set linger to 0 on OSX in tests (#42)
marten-seemann May 19, 2022
27b9721
improve package-level documentation (#43)
marten-seemann May 20, 2022
def3c71
move go-reuseport-transport here
marten-seemann May 20, 2022
3b98536
switch from github.com/libp2p/go-reuseport-transport to p2p/net/reuse…
marten-seemann May 20, 2022
492ee92
rename the reuseport package to reuseport
marten-seemann May 20, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ require (
github.com/libp2p/go-msgio v0.2.0
github.com/libp2p/go-netroute v0.2.0
github.com/libp2p/go-reuseport v0.1.0
github.com/libp2p/go-reuseport-transport v0.1.0
github.com/libp2p/go-stream-muxer-multistream v0.4.0
github.com/libp2p/go-yamux/v3 v3.1.1
github.com/libp2p/zeroconf/v2 v2.1.1
Expand Down
113 changes: 113 additions & 0 deletions p2p/net/reuseport/dial.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package reuseport

import (
"context"
"net"

"github.com/libp2p/go-reuseport"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
)

type dialer interface {
Dial(network, addr string) (net.Conn, error)
DialContext(ctx context.Context, network, addr string) (net.Conn, error)
}

// Dial dials the given multiaddr, reusing ports we're currently listening on if
// possible.
//
// Dial attempts to be smart about choosing the source port. For example, If
// we're dialing a loopback address and we're listening on one or more loopback
// ports, Dial will randomly choose one of the loopback ports and addresses and
// reuse it.
func (t *Transport) Dial(raddr ma.Multiaddr) (manet.Conn, error) {
return t.DialContext(context.Background(), raddr)
}

// DialContext is like Dial but takes a context.
func (t *Transport) DialContext(ctx context.Context, raddr ma.Multiaddr) (manet.Conn, error) {
network, addr, err := manet.DialArgs(raddr)
if err != nil {
return nil, err
}
var d dialer
switch network {
case "tcp4":
d = t.v4.getDialer(network)
case "tcp6":
d = t.v6.getDialer(network)
default:
return nil, ErrWrongProto
}
conn, err := d.DialContext(ctx, network, addr)
if err != nil {
return nil, err
}
maconn, err := manet.WrapNetConn(conn)
if err != nil {
conn.Close()
return nil, err
}
return maconn, nil
}

func (n *network) getDialer(network string) dialer {
n.mu.RLock()
d := n.dialer
n.mu.RUnlock()
if d == nil {
n.mu.Lock()
defer n.mu.Unlock()

if n.dialer == nil {
n.dialer = n.makeDialer(network)
}
d = n.dialer
}
return d
}

func (n *network) makeDialer(network string) dialer {
if !reuseport.Available() {
log.Debug("reuseport not available")
return &net.Dialer{}
}

var unspec net.IP
switch network {
case "tcp4":
unspec = net.IPv4zero
case "tcp6":
unspec = net.IPv6unspecified
default:
panic("invalid network: must be either tcp4 or tcp6")
}

// How many ports are we listening on.
var port = 0
for l := range n.listeners {
newPort := l.Addr().(*net.TCPAddr).Port
switch {
case newPort == 0: // Any port, ignore (really, we shouldn't get this case...).
case port == 0: // Haven't selected a port yet, choose this one.
port = newPort
case newPort == port: // Same as the selected port, continue...
default: // Multiple ports, use the multi dialer
return newMultiDialer(unspec, n.listeners)
}
}

// None.
if port == 0 {
return &net.Dialer{}
}

// One. Always dial from the single port we're listening on.
laddr := &net.TCPAddr{
IP: unspec,
Port: port,
}

return (*singleDialer)(laddr)
}
80 changes: 80 additions & 0 deletions p2p/net/reuseport/listen.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package reuseport

import (
"net"

"github.com/libp2p/go-reuseport"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
)

type listener struct {
manet.Listener
network *network
}

func (l *listener) Close() error {
l.network.mu.Lock()
delete(l.network.listeners, l)
l.network.dialer = nil
l.network.mu.Unlock()
return l.Listener.Close()
}

// Listen listens on the given multiaddr.
//
// If reuseport is supported, it will be enabled for this listener and future
// dials from this transport may reuse the port.
//
// Note: You can listen on the same multiaddr as many times as you want
// (although only *one* listener will end up handling the inbound connection).
func (t *Transport) Listen(laddr ma.Multiaddr) (manet.Listener, error) {
nw, naddr, err := manet.DialArgs(laddr)
if err != nil {
return nil, err
}
var n *network
switch nw {
case "tcp4":
n = &t.v4
case "tcp6":
n = &t.v6
default:
return nil, ErrWrongProto
}

if !reuseport.Available() {
return manet.Listen(laddr)
}
nl, err := reuseport.Listen(nw, naddr)
if err != nil {
return manet.Listen(laddr)
}

if _, ok := nl.Addr().(*net.TCPAddr); !ok {
nl.Close()
return nil, ErrWrongProto
}

malist, err := manet.WrapNetListener(nl)
if err != nil {
nl.Close()
return nil, err
}

list := &listener{
Listener: malist,
network: n,
}

n.mu.Lock()
defer n.mu.Unlock()

if n.listeners == nil {
n.listeners = make(map[*listener]struct{})
}
n.listeners[list] = struct{}{}
n.dialer = nil

return list, nil
}
90 changes: 90 additions & 0 deletions p2p/net/reuseport/multidialer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package reuseport

import (
"context"
"fmt"
"math/rand"
"net"

"github.com/libp2p/go-netroute"
)

type multiDialer struct {
listeningAddresses []*net.TCPAddr
loopback []*net.TCPAddr
unspecified []*net.TCPAddr
fallback net.TCPAddr
}

func (d *multiDialer) Dial(network, addr string) (net.Conn, error) {
return d.DialContext(context.Background(), network, addr)
}

func randAddr(addrs []*net.TCPAddr) *net.TCPAddr {
if len(addrs) > 0 {
return addrs[rand.Intn(len(addrs))]
}
return nil
}

// DialContext dials a target addr.
// Dialing preference is
// * If there is a listener on the local interface the OS expects to use to route towards addr, use that.
// * If there is a listener on a loopback address, addr is loopback, use that.
// * If there is a listener on an undefined address (0.0.0.0 or ::), use that.
// * Use the fallback IP specified during construction, with a port that's already being listened on, if one exists.
func (d *multiDialer) DialContext(ctx context.Context, network, addr string) (net.Conn, error) {
tcpAddr, err := net.ResolveTCPAddr(network, addr)
if err != nil {
return nil, err
}
ip := tcpAddr.IP
if !ip.IsLoopback() && !ip.IsGlobalUnicast() {
return nil, fmt.Errorf("undialable IP: %s", ip)
}

if router, err := netroute.New(); err == nil {
if _, _, preferredSrc, err := router.Route(ip); err == nil {
for _, optAddr := range d.listeningAddresses {
if optAddr.IP.Equal(preferredSrc) {
return reuseDial(ctx, optAddr, network, addr)
}
}
}
}

if ip.IsLoopback() && len(d.loopback) > 0 {
return reuseDial(ctx, randAddr(d.loopback), network, addr)
}
if len(d.unspecified) == 0 {
return reuseDial(ctx, &d.fallback, network, addr)
}

return reuseDial(ctx, randAddr(d.unspecified), network, addr)
}

func newMultiDialer(unspec net.IP, listeners map[*listener]struct{}) (m dialer) {
addrs := make([]*net.TCPAddr, 0)
loopback := make([]*net.TCPAddr, 0)
unspecified := make([]*net.TCPAddr, 0)
existingPort := 0

for l := range listeners {
addr := l.Addr().(*net.TCPAddr)
addrs = append(addrs, addr)
if addr.IP.IsLoopback() {
loopback = append(loopback, addr)
} else if addr.IP.IsGlobalUnicast() && existingPort == 0 {
existingPort = addr.Port
} else if addr.IP.IsUnspecified() {
unspecified = append(unspecified, addr)
}
}
m = &multiDialer{
listeningAddresses: addrs,
loopback: loopback,
unspecified: unspecified,
fallback: net.TCPAddr{IP: unspec, Port: existingPort},
}
return
}
35 changes: 35 additions & 0 deletions p2p/net/reuseport/reuseport.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package reuseport

import (
"context"
"net"

"github.com/libp2p/go-reuseport"
)

var fallbackDialer net.Dialer

// Dials using reuseport and then redials normally if that fails.
func reuseDial(ctx context.Context, laddr *net.TCPAddr, network, raddr string) (con net.Conn, err error) {
if laddr == nil {
return fallbackDialer.DialContext(ctx, network, raddr)
}

d := net.Dialer{
LocalAddr: laddr,
Control: reuseport.Control,
}

con, err = d.DialContext(ctx, network, raddr)
if err == nil {
return con, nil
}

if reuseErrShouldRetry(err) && ctx.Err() == nil {
// We could have an existing socket open or we could have one
// stuck in TIME-WAIT.
log.Debugf("failed to reuse port, will try again with a random port: %s", err)
con, err = fallbackDialer.DialContext(ctx, network, raddr)
}
return con, err
}
44 changes: 44 additions & 0 deletions p2p/net/reuseport/reuseport_plan9.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package reuseport

import (
"net"
"os"
)

const (
EADDRINUSE = "address in use"
ECONNREFUSED = "connection refused"
)

// reuseErrShouldRetry diagnoses whether to retry after a reuse error.
// if we failed to bind, we should retry. if bind worked and this is a
// real dial error (remote end didnt answer) then we should not retry.
func reuseErrShouldRetry(err error) bool {
if err == nil {
return false // hey, it worked! no need to retry.
}

// if it's a network timeout error, it's a legitimate failure.
if nerr, ok := err.(net.Error); ok && nerr.Timeout() {
return false
}

e, ok := err.(*net.OpError)
if !ok {
return true
}

e1, ok := e.Err.(*os.PathError)
if !ok {
return true
}

switch e1.Err.Error() {
case EADDRINUSE:
return true
case ECONNREFUSED:
return false
default:
return true // optimistically default to retry.
}
}
Loading