Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/utp #1789

Closed
wants to merge 2 commits into from
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
implement basic utp dialing and listening support
License: MIT
Signed-off-by: Jeromy <[email protected]>
whyrusleeping committed Oct 28, 2015

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
commit 9b76adc1fe862b6385a5691031fd09d39d3f7cff
2 changes: 1 addition & 1 deletion Godeps/Godeps.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

105 changes: 35 additions & 70 deletions p2p/net/conn/dial.go
Original file line number Diff line number Diff line change
@@ -6,20 +6,35 @@ import (
"net"
"strings"
"syscall"
"time"

ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
reuseport "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-reuseport"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
lgbl "github.com/ipfs/go-ipfs/util/eventlog/loggables"

ci "github.com/ipfs/go-ipfs/p2p/crypto"
addrutil "github.com/ipfs/go-ipfs/p2p/net/swarm/addr"
peer "github.com/ipfs/go-ipfs/p2p/peer"
)

type WrapFunc func(manet.Conn) manet.Conn

func NewDialer(p peer.ID, pk ci.PrivKey, tout time.Duration, wrap WrapFunc) *Dialer {
var manetd manet.Dialer
manetd.Timeout = tout // timeout is a nested field of net.Dialer

return &Dialer{
LocalPeer: p,
PrivateKey: pk,
Wrapper: wrap,
fallbackDialer: &BasicMaDialer{Dialer: manetd},
}
}

// String returns the string rep of d.
func (d *Dialer) String() string {
return fmt.Sprintf("<Dialer %s %s ...>", d.LocalPeer, d.LocalAddrs[0])
return fmt.Sprintf("<Dialer %s ...>", d.LocalPeer)
}

// Dial connects to a peer over a particular address
@@ -95,84 +110,34 @@ func (d *Dialer) Dial(ctx context.Context, raddr ma.Multiaddr, remote peer.ID) (
return connOut, nil
}

// rawConnDial dials the underlying net.Conn + manet.Conns
func (d *Dialer) rawConnDial(ctx context.Context, raddr ma.Multiaddr, remote peer.ID) (manet.Conn, error) {

// before doing anything, check we're going to be able to dial.
// we may not support the given address.
if _, _, err := manet.DialArgs(raddr); err != nil {
return nil, err
}

if strings.HasPrefix(raddr.String(), "/ip4/0.0.0.0") {
log.Event(ctx, "connDialZeroAddr", lgbl.Dial("conn", d.LocalPeer, remote, nil, raddr))
return nil, fmt.Errorf("Attempted to connect to zero address: %s", raddr)
}
func (d *Dialer) AddDialer(pd ProtoDialer) {
d.Dialers = append(d.Dialers, pd)
}

// get local addr to use.
laddr := pickLocalAddr(d.LocalAddrs, raddr)
logdial := lgbl.Dial("conn", d.LocalPeer, remote, laddr, raddr)
defer log.EventBegin(ctx, "connDialRawConn", logdial).Done()

// make a copy of the manet.Dialer, we may need to change its timeout.
madialer := d.Dialer

if laddr != nil && reuseportIsAvailable() {
// we're perhaps going to dial twice. half the timeout, so we can afford to.
// otherwise our context would expire right after the first dial.
madialer.Dialer.Timeout = (madialer.Dialer.Timeout / 2)

// dial using reuseport.Dialer, because we're probably reusing addrs.
// this is optimistic, as the reuseDial may fail to bind the port.
rpev := log.EventBegin(ctx, "connDialReusePort", logdial)
if nconn, retry, reuseErr := reuseDial(madialer.Dialer, laddr, raddr); reuseErr == nil {
// if it worked, wrap the raw net.Conn with our manet.Conn
logdial["reuseport"] = "success"
rpev.Done()
return manet.WrapNetConn(nconn)
} else if !retry {
// reuseDial is sure this is a legitimate dial failure, not a reuseport failure.
logdial["reuseport"] = "failure"
logdial["error"] = reuseErr
rpev.Done()
return nil, reuseErr
} else {
// this is a failure to reuse port. log it.
logdial["reuseport"] = "retry"
logdial["error"] = reuseErr
rpev.Done()
// returns dialer that can dial the given address
func (d *Dialer) subDialerForAddr(raddr ma.Multiaddr) ProtoDialer {
for _, pd := range d.Dialers {
if pd.Matches(raddr) {
return pd
}
}

defer log.EventBegin(ctx, "connDialManet", logdial).Done()
return madialer.Dial(raddr)
return d.fallbackDialer
}

func reuseDial(dialer net.Dialer, laddr, raddr ma.Multiaddr) (conn net.Conn, retry bool, err error) {
if laddr == nil {
// if we're given no local address no sense in using reuseport to dial, dial out as usual.
return nil, true, reuseport.ErrReuseFailed
}

// give reuse.Dialer the manet.Dialer's Dialer.
// (wow, Dialer should've so been an interface...)
rd := reuseport.Dialer{dialer}

// get the local net.Addr manually
rd.D.LocalAddr, err = manet.ToNetAddr(laddr)
if err != nil {
return nil, true, err // something wrong with laddr. retry without.
// rawConnDial dials the underlying net.Conn + manet.Conns
func (d *Dialer) rawConnDial(ctx context.Context, raddr ma.Multiaddr, remote peer.ID) (manet.Conn, error) {
if strings.HasPrefix(raddr.String(), "/ip4/0.0.0.0") {
log.Event(ctx, "connDialZeroAddr", lgbl.Dial("conn", d.LocalPeer, remote, nil, raddr))
return nil, fmt.Errorf("Attempted to connect to zero address: %s", raddr)
}

// get the raddr dial args for rd.dial
network, netraddr, err := manet.DialArgs(raddr)
if err != nil {
return nil, true, err // something wrong with laddr. retry without.
sd := d.subDialerForAddr(raddr)
if sd == nil {
return nil, fmt.Errorf("no dialer for %s", raddr)
}

// rd.Dial gets us a net.Conn with SO_REUSEPORT and SO_REUSEADDR set.
conn, err = rd.Dial(network, netraddr)
return conn, reuseErrShouldRetry(err), err // hey! it worked!
return sd.Dial(raddr)
}

// reuseErrShouldRetry diagnoses whether to retry after a reuse error.
5 changes: 4 additions & 1 deletion p2p/net/conn/dial_test.go
Original file line number Diff line number Diff line change
@@ -70,6 +70,7 @@ func setupConn(t *testing.T, ctx context.Context, secure bool) (a, b Conn, p1, p
LocalPeer: p2.ID,
PrivateKey: key2,
}
d2.AddDialer(new(BasicMaDialer))

var c2 Conn

@@ -152,6 +153,7 @@ func testDialer(t *testing.T, secure bool) {
LocalPeer: p2.ID,
PrivateKey: key2,
}
d2.AddDialer(new(BasicMaDialer))

go echoListen(ctx, l1)

@@ -227,6 +229,7 @@ func testDialerCloseEarly(t *testing.T, secure bool) {
LocalPeer: p2.ID,
// PrivateKey: key2, -- dont give it key. we'll just close the conn.
}
d2.AddDialer(new(BasicMaDialer))

errs := make(chan error, 100)
done := make(chan struct{}, 1)
@@ -253,7 +256,7 @@ func testDialerCloseEarly(t *testing.T, secure bool) {

c, err := d2.Dial(ctx, p1.Addr, p1.ID)
if err != nil {
errs <- err
t.Fatal(err)
}
c.Close() // close it early.

13 changes: 8 additions & 5 deletions p2p/net/conn/interface.go
Original file line number Diff line number Diff line change
@@ -54,15 +54,18 @@ type Conn interface {
// Dial function as before, but it would have many arguments, as dialing is
// no longer simple (need a peerstore, a local peer, a context, a network, etc)
type Dialer struct {

// Dialer is an optional manet.Dialer to use.
Dialer manet.Dialer

// LocalPeer is the identity of the local Peer.
LocalPeer peer.ID

// LocalAddrs is a set of local addresses to use.
LocalAddrs []ma.Multiaddr
//LocalAddrs []ma.Multiaddr

// Dialers are the sub-dialers usable by this dialer
// selected in order based on the address being dialed
Dialers []ProtoDialer

// fallback will be tried if no other transport can dial a given address
fallbackDialer ProtoDialer

// PrivateKey used to initialize a secure connection.
// Warning: if PrivateKey is nil, connection will not be secured.
4 changes: 4 additions & 0 deletions p2p/net/conn/listen.go
Original file line number Diff line number Diff line change
@@ -154,6 +154,10 @@ func Listen(ctx context.Context, addr ma.Multiaddr, local peer.ID, sk ic.PrivKey
return nil, err
}

return WrapManetListener(ctx, ml, local, sk)
}

func WrapManetListener(ctx context.Context, ml manet.Listener, local peer.ID, sk ic.PrivKey) (Listener, error) {
l := &listener{
Listener: ml,
local: local,
Loading