Skip to content

Commit

Permalink
Merge pull request #97 from libp2p/feat/just-tcp-example
Browse files Browse the repository at this point in the history
Feat/just tcp example
  • Loading branch information
whyrusleeping authored Sep 4, 2016
2 parents 40a5c58 + 713d9a8 commit 0b0f6fd
Show file tree
Hide file tree
Showing 7 changed files with 169 additions and 37 deletions.
16 changes: 16 additions & 0 deletions examples/justtcp/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# libp2p 'just tcp' example

## What this does
This example starts up a libp2p swarm that listens for tcp connections behind a
multistream muxer protocol of `/plaintext/1.0.0`. All connections made to it
will be echoed back.

## Building
```
$ go build
```

## Usage
```
$ ./justtcp
```
85 changes: 85 additions & 0 deletions examples/justtcp/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package main

import (
"context"
"fmt"
"net"
"os"

transport "github.com/ipfs/go-libp2p-transport"
ma "github.com/jbenet/go-multiaddr"
smux "github.com/jbenet/go-stream-muxer"
"github.com/libp2p/go-libp2p/p2p/net/swarm"
)

func fatal(i interface{}) {
fmt.Println(i)
os.Exit(1)
}

type NullMux struct{}

type NullMuxConn struct {
net.Conn
}

func (c *NullMuxConn) AcceptStream() (smux.Stream, error) {
panic("We don't do this")
}

func (c *NullMuxConn) IsClosed() bool {
return false
}

func (c *NullMuxConn) OpenStream() (smux.Stream, error) {
panic("if only you could see how disappointed i am in you right now")
}

func (c *NullMuxConn) Serve(_ smux.StreamHandler) {
}

func (nm NullMux) NewConn(c net.Conn, server bool) (smux.Conn, error) {
return &NullMuxConn{c}, nil
}

var _ smux.Transport = (*NullMux)(nil)

func main() {
laddr, err := ma.NewMultiaddr("/ip4/0.0.0.0/tcp/5555")
if err != nil {
fatal(err)
}

swarm.PSTransport = new(NullMux)

s := swarm.NewBlankSwarm(context.Background(), "bob", nil)

s.AddTransport(transport.NewTCPTransport())

err = s.AddListenAddr(laddr)
if err != nil {
fatal(err)
}

s.SetConnHandler(func(c *swarm.Conn) {
fmt.Println("CALLED OUR CONN HANDLER!")
defer c.Close()
buf := make([]byte, 1024)
for {
n, err := c.RawConn().Read(buf)
if err != nil {
fmt.Println(err)
return
}
fmt.Printf("read: %q\n", string(buf[:n]))

_, err = c.RawConn().Write(buf[:n])
if err != nil {
fmt.Println(err)
return
}
}
})

<-make(chan bool)
}
2 changes: 1 addition & 1 deletion p2p/net/conn/dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (d *Dialer) Dial(ctx context.Context, raddr ma.Multiaddr, remote peer.ID) (
}

cryptoProtoChoice := SecioTag
if !EncryptConnections {
if !EncryptConnections || d.PrivateKey == nil {
cryptoProtoChoice = NoEncryptionTag
}

Expand Down
6 changes: 2 additions & 4 deletions p2p/net/conn/dial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,8 @@ func testDialerCloseEarly(t *testing.T, secure bool) {

// lol nesting
d2 := &Dialer{
LocalPeer: p2.ID,
// PrivateKey: key2, -- dont give it key. we'll just close the conn.
LocalPeer: p2.ID,
PrivateKey: p2.PrivKey, //-- dont give it key. we'll just close the conn.
}
d2.AddDialer(dialer(t, p2.Addr))

Expand Down Expand Up @@ -527,7 +527,6 @@ func TestConcurrentAccept(t *testing.T) {

err = grc.CheckForLeaks(goroFilter)
if err != nil {
panic(err)
t.Fatal(err)
}
}
Expand Down Expand Up @@ -644,7 +643,6 @@ func TestConnectionTimeouts(t *testing.T) {

err = grc.CheckForLeaks(goroFilter)
if err != nil {
panic(err)
t.Fatal(err)
}
}
2 changes: 1 addition & 1 deletion p2p/net/conn/listen.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func WrapTransportListener(ctx context.Context, ml transport.Listener, local pee
return false
}

if EncryptConnections {
if EncryptConnections && sk != nil {
l.mux.AddHandler(SecioTag, nil)
} else {
l.mux.AddHandler(NoEncryptionTag, nil)
Expand Down
39 changes: 33 additions & 6 deletions p2p/net/swarm/swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,14 @@ import (
"sync"
"time"

metrics "github.com/libp2p/go-libp2p/p2p/metrics"
mconn "github.com/libp2p/go-libp2p/p2p/metrics/conn"
inet "github.com/libp2p/go-libp2p/p2p/net"
conn "github.com/libp2p/go-libp2p/p2p/net/conn"
filter "github.com/libp2p/go-libp2p/p2p/net/filter"
addrutil "github.com/libp2p/go-libp2p/p2p/net/swarm/addr"

ci "github.com/ipfs/go-libp2p-crypto"
peer "github.com/ipfs/go-libp2p-peer"
pstore "github.com/ipfs/go-libp2p-peerstore"
transport "github.com/ipfs/go-libp2p-transport"
Expand All @@ -19,12 +27,6 @@ import (
pst "github.com/jbenet/go-stream-muxer"
"github.com/jbenet/goprocess"
goprocessctx "github.com/jbenet/goprocess/context"
metrics "github.com/libp2p/go-libp2p/p2p/metrics"
mconn "github.com/libp2p/go-libp2p/p2p/metrics/conn"
inet "github.com/libp2p/go-libp2p/p2p/net"
conn "github.com/libp2p/go-libp2p/p2p/net/conn"
filter "github.com/libp2p/go-libp2p/p2p/net/filter"
addrutil "github.com/libp2p/go-libp2p/p2p/net/swarm/addr"
psmss "github.com/whyrusleeping/go-smux-multistream"
spdy "github.com/whyrusleeping/go-smux-spdystream"
yamux "github.com/whyrusleeping/go-smux-yamux"
Expand Down Expand Up @@ -143,6 +145,31 @@ func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr,
return s, nil
}

func NewBlankSwarm(ctx context.Context, id peer.ID, privkey ci.PrivKey) *Swarm {
s := &Swarm{
swarm: ps.NewSwarm(PSTransport),
local: id,
peers: pstore.NewPeerstore(),
ctx: ctx,
dialT: DialTimeout,
notifs: make(map[inet.Notifiee]ps.Notifiee),
fdRateLimit: make(chan struct{}, concurrentFdDials),
Filters: filter.NewFilters(),
dialer: conn.NewDialer(id, privkey, nil),
}

// configure Swarm
s.limiter = newDialLimiter(s.dialAddr)
s.proc = goprocessctx.WithContextAndTeardown(ctx, s.teardown)
s.SetConnHandler(nil) // make sure to setup our own conn handler.

return s
}

func (s *Swarm) AddTransport(t transport.Transport) {
s.transports = append(s.transports, t)
}

func (s *Swarm) teardown() error {
return s.swarm.Close()
}
Expand Down
56 changes: 31 additions & 25 deletions p2p/net/swarm/swarm_listen.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,44 +13,50 @@ import (
context "golang.org/x/net/context"
)

func (s *Swarm) AddListenAddr(a ma.Multiaddr) error {
tpt := s.transportForAddr(a)
if tpt == nil {
return fmt.Errorf("no transport for address: %s", a)
}

d, err := tpt.Dialer(a, transport.TimeoutOpt(DialTimeout), transport.ReusePorts)
if err != nil {
return err
}

s.dialer.AddDialer(d)

list, err := tpt.Listen(a)
if err != nil {
return err
}

err = s.addListener(list)
if err != nil {
return err
}

return nil
}

// Open listeners and reuse-dialers for the given addresses
func (s *Swarm) setupInterfaces(addrs []ma.Multiaddr) error {
errs := make([]error, len(addrs))
var succeeded int
for i, a := range addrs {
tpt := s.transportForAddr(a)
if tpt == nil {
errs[i] = fmt.Errorf("no transport for address: %s", a)
continue
}

d, err := tpt.Dialer(a, transport.TimeoutOpt(DialTimeout), transport.ReusePorts)
if err != nil {
errs[i] = err
continue
}

s.dialer.AddDialer(d)

list, err := tpt.Listen(a)
if err != nil {
if err := s.AddListenAddr(a); err != nil {
errs[i] = err
continue
} else {
succeeded++
}

err = s.addListener(list)
if err != nil {
errs[i] = err
continue
}
succeeded++
}

for i, e := range errs {
if e != nil {
log.Warning("listen on %s failed: %s", addrs[i], errs[i])
}
}

if succeeded == 0 && len(addrs) > 0 {
return fmt.Errorf("failed to listen on any addresses: %s", errs)
}
Expand Down Expand Up @@ -83,7 +89,7 @@ func (s *Swarm) addListener(tptlist transport.Listener) error {

list.SetAddrFilters(s.Filters)

if cw, ok := list.(conn.ListenerConnWrapper); ok {
if cw, ok := list.(conn.ListenerConnWrapper); ok && s.bwc != nil {
cw.SetConnWrapper(func(c transport.Conn) transport.Conn {
return mconn.WrapConn(s.bwc, c)
})
Expand Down

0 comments on commit 0b0f6fd

Please sign in to comment.