Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement utp transport #2050

Merged
merged 1 commit into from
Dec 11, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions p2p/net/swarm/addr/addr.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ var log = logging.Logger("p2p/net/swarm/addr")
var SupportedTransportStrings = []string{
"/ip4/tcp",
"/ip6/tcp",
// "/ip4/udp/utp", disabled because the lib is broken
// "/ip6/udp/utp", disabled because the lib is broken
"/ip4/udp/utp",
"/ip6/udp/utp",
// "/ip4/udp/udt", disabled because the lib doesnt work on arm
// "/ip6/udp/udt", disabled because the lib doesnt work on arm
}
Expand Down
8 changes: 1 addition & 7 deletions p2p/net/swarm/addr/addr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ func TestFilterAddrs(t *testing.T) {
bad := []ma.Multiaddr{
newMultiaddr(t, "/ip4/1.2.3.4/udp/1234"), // unreliable
newMultiaddr(t, "/ip4/1.2.3.4/udp/1234/sctp/1234"), // not in manet
newMultiaddr(t, "/ip4/1.2.3.4/udp/1234/utp"), // utp is broken
newMultiaddr(t, "/ip4/1.2.3.4/udp/1234/udt"), // udt is broken on arm
newMultiaddr(t, "/ip6/fe80::1/tcp/1234"), // link local
newMultiaddr(t, "/ip6/fe80::100/tcp/1234"), // link local
Expand All @@ -29,6 +28,7 @@ func TestFilterAddrs(t *testing.T) {
good := []ma.Multiaddr{
newMultiaddr(t, "/ip4/127.0.0.1/tcp/1234"),
newMultiaddr(t, "/ip6/::1/tcp/1234"),
newMultiaddr(t, "/ip4/1.2.3.4/udp/1234/utp"),
}

goodAndBad := append(good, bad...)
Expand All @@ -39,18 +39,12 @@ func TestFilterAddrs(t *testing.T) {
if AddrUsable(a, false) {
t.Errorf("addr %s should be unusable", a)
}
if AddrUsable(a, true) {
t.Errorf("addr %s should be unusable", a)
}
}

for _, a := range good {
if !AddrUsable(a, false) {
t.Errorf("addr %s should be usable", a)
}
if !AddrUsable(a, true) {
t.Errorf("addr %s should be usable", a)
}
}

subtestAddrsEqual(t, FilterUsableAddrs(bad), []ma.Multiaddr{})
Expand Down
17 changes: 10 additions & 7 deletions p2p/net/swarm/swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,16 @@ func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr,
}

s := &Swarm{
swarm: ps.NewSwarm(PSTransport),
local: local,
peers: peers,
ctx: ctx,
dialT: DialTimeout,
notifs: make(map[inet.Notifiee]ps.Notifiee),
transports: []transport.Transport{transport.NewTCPTransport()},
swarm: ps.NewSwarm(PSTransport),
local: local,
peers: peers,
ctx: ctx,
dialT: DialTimeout,
notifs: make(map[inet.Notifiee]ps.Notifiee),
transports: []transport.Transport{
transport.NewTCPTransport(),
transport.NewUtpTransport(),
},
bwc: bwc,
fdRateLimit: make(chan struct{}, concurrentFdDials),
Filters: filter.NewFilters(),
Expand Down
6 changes: 3 additions & 3 deletions p2p/net/swarm/swarm_addr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ func TestFilterAddrs(t *testing.T) {
bad := []ma.Multiaddr{
m("/ip4/1.2.3.4/udp/1234"), // unreliable
m("/ip4/1.2.3.4/udp/1234/sctp/1234"), // not in manet
m("/ip4/1.2.3.4/udp/1234/utp"), // utp is broken
m("/ip4/1.2.3.4/udp/1234/udt"), // udt is broken on arm
m("/ip6/fe80::1/tcp/0"), // link local
m("/ip6/fe80::100/tcp/1234"), // link local
Expand All @@ -34,20 +33,21 @@ func TestFilterAddrs(t *testing.T) {
good := []ma.Multiaddr{
m("/ip4/127.0.0.1/tcp/0"),
m("/ip6/::1/tcp/0"),
m("/ip4/1.2.3.4/udp/1234/utp"),
}

goodAndBad := append(good, bad...)

// test filters

for _, a := range bad {
if addrutil.AddrUsable(a, true) {
if addrutil.AddrUsable(a, false) {
t.Errorf("addr %s should be unusable", a)
}
}

for _, a := range good {
if !addrutil.AddrUsable(a, true) {
if !addrutil.AddrUsable(a, false) {
t.Errorf("addr %s should be usable", a)
}
}
Expand Down
148 changes: 148 additions & 0 deletions p2p/net/transport/utp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
package transport
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i thought libp2p got moved out + vendored? i guess not yet?


import (
"net"
"sync"

utp "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/anacrolix/utp"
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
mautp "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net/utp"
)

type UtpTransport struct {
sockLock sync.Mutex
sockets map[string]*UtpSocket
}

func NewUtpTransport() *UtpTransport {
return &UtpTransport{
sockets: make(map[string]*UtpSocket),
}
}

func (d *UtpTransport) Matches(a ma.Multiaddr) bool {
p := a.Protocols()
return len(p) == 3 && p[2].Name == "utp"
}

type UtpSocket struct {
s *utp.Socket
laddr ma.Multiaddr
transport Transport
}

func (t *UtpTransport) Listen(laddr ma.Multiaddr) (Listener, error) {
t.sockLock.Lock()
defer t.sockLock.Unlock()
s, ok := t.sockets[laddr.String()]
if ok {
return s, nil
}

ns, err := t.newConn(laddr)
if err != nil {
return nil, err
}

t.sockets[laddr.String()] = ns
return ns, nil
}

func (t *UtpTransport) Dialer(laddr ma.Multiaddr, opts ...DialOpt) (Dialer, error) {
t.sockLock.Lock()
defer t.sockLock.Unlock()
s, ok := t.sockets[laddr.String()]
if ok {
return s, nil
}

ns, err := t.newConn(laddr, opts...)
if err != nil {
return nil, err
}

t.sockets[laddr.String()] = ns
return ns, nil
}

func (t *UtpTransport) newConn(addr ma.Multiaddr, opts ...DialOpt) (*UtpSocket, error) {
network, netaddr, err := manet.DialArgs(addr)
if err != nil {
return nil, err
}

s, err := utp.NewSocket("udp"+network[3:], netaddr)
if err != nil {
return nil, err
}

laddr, err := manet.FromNetAddr(mautp.MakeAddr(s.LocalAddr()))
if err != nil {
return nil, err
}

return &UtpSocket{
s: s,
laddr: laddr,
transport: t,
}, nil
}

func (s *UtpSocket) Dial(raddr ma.Multiaddr) (Conn, error) {
_, addr, err := manet.DialArgs(raddr)
if err != nil {
return nil, err
}

con, err := s.s.Dial(addr)
if err != nil {
return nil, err
}

mnc, err := manet.WrapNetConn(&mautp.Conn{Conn: con})
if err != nil {
return nil, err
}

return &connWrap{
Conn: mnc,
transport: s.transport,
}, nil
}

func (s *UtpSocket) Accept() (Conn, error) {
c, err := s.s.Accept()
if err != nil {
return nil, err
}

mnc, err := manet.WrapNetConn(&mautp.Conn{Conn: c})
if err != nil {
return nil, err
}

return &connWrap{
Conn: mnc,
transport: s.transport,
}, nil
}

func (s *UtpSocket) Matches(a ma.Multiaddr) bool {
p := a.Protocols()
return len(p) == 3 && p[2].Name == "utp"
}

func (t *UtpSocket) Close() error {
return t.s.Close()
}

func (t *UtpSocket) Addr() net.Addr {
return t.s.Addr()
}

func (t *UtpSocket) Multiaddr() ma.Multiaddr {
return t.laddr
}

var _ Transport = (*UtpTransport)(nil)
6 changes: 6 additions & 0 deletions test/sharness/t0130-multinode.sh
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,10 @@ test_expect_success "set up tcp testbed" '

run_basic_test

test_expect_success "set up utp testbed" '
iptb init -n 5 -p 0 -f --bootstrap=none --utp
'

run_basic_test

test_done