Skip to content

Commit

Permalink
refactor of swarm dialing, getting closer...
Browse files Browse the repository at this point in the history
License: MIT
Signed-off-by: Jeromy <[email protected]>
  • Loading branch information
whyrusleeping committed Oct 4, 2015
1 parent 2538dc1 commit af3f628
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 33 deletions.
1 change: 0 additions & 1 deletion p2p/net/conn/dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,6 @@ func (d *Dialer) rawConnDial(ctx context.Context, raddr ma.Multiaddr, remote pee
rpev.Done()
}
}
madialer.LocalAddr = nil

defer log.EventBegin(ctx, "connDialManet", logdial).Done()
return madialer.Dial(raddr)
Expand Down
1 change: 1 addition & 0 deletions p2p/net/swarm/addr/addr.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ func AddrUsable(a ma.Multiaddr, partial bool) bool {
return false
}
}

return true
}

Expand Down
6 changes: 5 additions & 1 deletion p2p/net/swarm/addr/addr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ func TestFilterAddrs(t *testing.T) {
bad := []ma.Multiaddr{
newMultiaddr(t, "/ip4/1.2.3.4/udp/1234"), // unreliable
newMultiaddr(t, "/ip4/1.2.3.4/udp/1234/sctp/1234"), // not in manet
newMultiaddr(t, "/ip4/1.2.3.4/udp/1234/utp"), // utp is broken
newMultiaddr(t, "/ip4/1.2.3.4/udp/1234/udt"), // udt is broken on arm
newMultiaddr(t, "/ip6/fe80::1/tcp/1234"), // link local
newMultiaddr(t, "/ip6/fe80::100/tcp/1234"), // link local
Expand All @@ -29,6 +28,7 @@ func TestFilterAddrs(t *testing.T) {
good := []ma.Multiaddr{
newMultiaddr(t, "/ip4/127.0.0.1/tcp/1234"),
newMultiaddr(t, "/ip6/::1/tcp/1234"),
newMultiaddr(t, "/ip4/1.2.3.4/udp/1234/utp"),
}

goodAndBad := append(good, bad...)
Expand All @@ -39,9 +39,13 @@ func TestFilterAddrs(t *testing.T) {
if AddrUsable(a, false) {
t.Errorf("addr %s should be unusable", a)
}
/* This doesnt make sense anymore
With utp allowed, udp gets 'allowed' as a partial address.
if AddrUsable(a, true) {
t.Errorf("addr %s should be unusable", a)
}
*/
}

for _, a := range good {
Expand Down
22 changes: 22 additions & 0 deletions p2p/net/swarm/swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,21 @@ package swarm

import (
"fmt"
"net"
"sync"
"time"

metrics "github.com/ipfs/go-ipfs/metrics"
mconn "github.com/ipfs/go-ipfs/metrics/conn"
inet "github.com/ipfs/go-ipfs/p2p/net"
conn "github.com/ipfs/go-ipfs/p2p/net/conn"
filter "github.com/ipfs/go-ipfs/p2p/net/filter"
addrutil "github.com/ipfs/go-ipfs/p2p/net/swarm/addr"
peer "github.com/ipfs/go-ipfs/p2p/peer"
logging "github.com/ipfs/go-ipfs/vendor/QmXJkcEXB6C9h6Ytb6rrUTFU56Ro62zxgrbxTT3dgjQGA8/go-log"

ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
ps "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream"
pst "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer"
psy "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/yamux"
Expand Down Expand Up @@ -58,6 +62,8 @@ type Swarm struct {
backf dialbackoff
dialT time.Duration // mainly for tests

dialer *conn.Dialer

notifmu sync.RWMutex
notifs map[inet.Notifiee]ps.Notifiee

Expand All @@ -78,11 +84,27 @@ func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr,
return nil, err
}

// open connection to peer
d := &conn.Dialer{
Dialer: manet.Dialer{
Dialer: net.Dialer{
Timeout: DialTimeout,
},
},
LocalPeer: local,
LocalAddrs: listenAddrs,
PrivateKey: peers.PrivKey(local),
Wrapper: func(c manet.Conn) manet.Conn {
return mconn.WrapConn(bwc, c)
},
}

s := &Swarm{
swarm: ps.NewSwarm(PSTransport),
local: local,
peers: peers,
ctx: ctx,
dialer: d,
dialT: DialTimeout,
notifs: make(map[inet.Notifiee]ps.Notifiee),
bwc: bwc,
Expand Down
36 changes: 5 additions & 31 deletions p2p/net/swarm/swarm_dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,15 @@ import (
"errors"
"fmt"
"math/rand"
"net"
"sync"
"time"

mconn "github.com/ipfs/go-ipfs/metrics/conn"
conn "github.com/ipfs/go-ipfs/p2p/net/conn"
addrutil "github.com/ipfs/go-ipfs/p2p/net/swarm/addr"
peer "github.com/ipfs/go-ipfs/p2p/peer"
lgbl "github.com/ipfs/go-ipfs/util/eventlog/loggables"

ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
process "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
processctx "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context"
ratelimit "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/ratelimit"
Expand Down Expand Up @@ -287,14 +284,6 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
log.Debug("Dial not given PrivateKey, so WILL NOT SECURE conn.")
}

// get our own addrs. try dialing out from our listener addresses (reusing ports)
// Note that using our peerstore's addresses here is incorrect, as that would
// include observed addresses. TODO: make peerstore's address book smarter.
localAddrs := s.ListenAddresses()
if len(localAddrs) == 0 {
log.Debug("Dialing out with no local addresses.")
}

// get remote peer addrs
remoteAddrs := s.peers.Addrs(p)
// make sure we can use the addresses.
Expand All @@ -319,23 +308,8 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
return nil, err
}

// open connection to peer
d := &conn.Dialer{
Dialer: manet.Dialer{
Dialer: net.Dialer{
Timeout: s.dialT,
},
},
LocalPeer: s.local,
LocalAddrs: localAddrs,
PrivateKey: sk,
Wrapper: func(c manet.Conn) manet.Conn {
return mconn.WrapConn(s.bwc, c)
},
}

// try to get a connection to any addr
connC, err := s.dialAddrs(ctx, d, p, remoteAddrs)
connC, err := s.dialAddrs(ctx, p, remoteAddrs)
if err != nil {
logdial["error"] = err
return nil, err
Expand All @@ -355,7 +329,7 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
return swarmC, nil
}

func (s *Swarm) dialAddrs(ctx context.Context, d *conn.Dialer, p peer.ID, remoteAddrs []ma.Multiaddr) (conn.Conn, error) {
func (s *Swarm) dialAddrs(ctx context.Context, p peer.ID, remoteAddrs []ma.Multiaddr) (conn.Conn, error) {

// try to connect to one of the peer's known addresses.
// we dial concurrently to each of the addresses, which:
Expand All @@ -373,7 +347,7 @@ func (s *Swarm) dialAddrs(ctx context.Context, d *conn.Dialer, p peer.ID, remote

// dialSingleAddr is used in the rate-limited async thing below.
dialSingleAddr := func(addr ma.Multiaddr) {
connC, err := s.dialAddr(ctx, d, p, addr)
connC, err := s.dialAddr(ctx, p, addr)

// check parent still wants our results
select {
Expand Down Expand Up @@ -435,10 +409,10 @@ func (s *Swarm) dialAddrs(ctx context.Context, d *conn.Dialer, p peer.ID, remote
return nil, exitErr
}

func (s *Swarm) dialAddr(ctx context.Context, d *conn.Dialer, p peer.ID, addr ma.Multiaddr) (conn.Conn, error) {
func (s *Swarm) dialAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr) (conn.Conn, error) {
log.Debugf("%s swarm dialing %s %s", s.local, p, addr)

connC, err := d.Dial(ctx, addr, p)
connC, err := s.dialer.Dial(ctx, addr, p)
if err != nil {
return nil, fmt.Errorf("%s --> %s dial attempt failed: %s", s.local, p, err)
}
Expand Down

0 comments on commit af3f628

Please sign in to comment.