Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Self dialing #638

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
bhost "github.com/libp2p/go-libp2p/p2p/host/basic"
relay "github.com/libp2p/go-libp2p/p2p/host/relay"
routed "github.com/libp2p/go-libp2p/p2p/host/routed"
pipetransport "github.com/libp2p/go-libp2p/p2p/transport/pipe"

circuit "github.com/libp2p/go-libp2p-circuit"
discovery "github.com/libp2p/go-libp2p-discovery"
Expand Down Expand Up @@ -66,6 +67,7 @@ type Config struct {
Reporter metrics.Reporter

DisablePing bool
SelfDial bool

Routing RoutingC

Expand Down Expand Up @@ -110,7 +112,7 @@ func (cfg *Config) NewNode(ctx context.Context) (host.Host, error) {
}

// TODO: Make the swarm implementation configurable.
swrm := swarm.NewSwarm(ctx, pid, cfg.Peerstore, cfg.Reporter)
swrm := swarm.NewSwarm(ctx, pid, cfg.Peerstore, cfg.Reporter, cfg.SelfDial)
if cfg.Filters != nil {
swrm.Filters = cfg.Filters
}
Expand Down Expand Up @@ -162,6 +164,10 @@ func (cfg *Config) NewNode(ctx context.Context) (host.Host, error) {
h.Close()
return nil, err
}
if cfg.SelfDial {
tpt := pipetransport.New(pid, cfg.PeerKey.GetPublic(), cfg.PeerKey)
tpts = append(tpts, tpt)
}
for _, t := range tpts {
err = swrm.AddTransport(t)
if err != nil {
Expand Down
8 changes: 8 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,22 @@ require (
github.com/libp2p/go-libp2p-blankhost v0.1.1
github.com/libp2p/go-libp2p-circuit v0.1.0
github.com/libp2p/go-libp2p-core v0.0.1
github.com/libp2p/go-libp2p-crypto v0.1.0
github.com/libp2p/go-libp2p-discovery v0.1.0
github.com/libp2p/go-libp2p-loggables v0.1.0
github.com/libp2p/go-libp2p-mplex v0.2.1
github.com/libp2p/go-libp2p-nat v0.0.4
github.com/libp2p/go-libp2p-netutil v0.1.0
github.com/libp2p/go-libp2p-peer v0.2.0
github.com/libp2p/go-libp2p-peerstore v0.1.0
github.com/libp2p/go-libp2p-secio v0.1.0
github.com/libp2p/go-libp2p-swarm v0.1.0
github.com/libp2p/go-libp2p-testing v0.0.3
github.com/libp2p/go-libp2p-transport v0.1.0
github.com/libp2p/go-libp2p-transport-upgrader v0.1.1
github.com/libp2p/go-libp2p-yamux v0.2.0
github.com/libp2p/go-maddr-filter v0.0.4
github.com/libp2p/go-stream-muxer v0.1.0
github.com/libp2p/go-stream-muxer-multistream v0.2.0
github.com/libp2p/go-tcp-transport v0.1.0
github.com/libp2p/go-ws-transport v0.1.0
Expand All @@ -35,3 +39,7 @@ require (
github.com/multiformats/go-multistream v0.1.0
github.com/whyrusleeping/mdns v0.0.0-20180901202407-ef14215e6b30
)

replace github.com/libp2p/go-libp2p-swarm => ../go-libp2p-swarm

replace github.com/multiformats/go-multiaddr => ../../workspace-multiformats/go-multiaddr
Comment on lines +43 to +45
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nothing else to say :)

12 changes: 5 additions & 7 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,12 @@ github.com/libp2p/go-libp2p-peerstore v0.1.0 h1:MKh7pRNPHSh1fLPj8u/M/s/napdmeNpo
github.com/libp2p/go-libp2p-peerstore v0.1.0/go.mod h1:2CeHkQsr8svp4fZ+Oi9ykN1HBb6u0MOvdJ7YIsmcwtY=
github.com/libp2p/go-libp2p-secio v0.1.0 h1:NNP5KLxuP97sE5Bu3iuwOWyT/dKEGMN5zSLMWdB7GTQ=
github.com/libp2p/go-libp2p-secio v0.1.0/go.mod h1:tMJo2w7h3+wN4pgU2LSYeiKPrfqBgkOsdiKK77hE7c8=
github.com/libp2p/go-libp2p-swarm v0.1.0 h1:HrFk2p0awrGEgch9JXK/qp/hfjqQfgNxpLWnCiWPg5s=
github.com/libp2p/go-libp2p-swarm v0.1.0/go.mod h1:wQVsCdjsuZoc730CgOvh5ox6K8evllckjebkdiY5ta4=
github.com/libp2p/go-libp2p-testing v0.0.1/go.mod h1:gvchhf3FQOtBdr+eFUABet5a4MBLK8jM3V4Zghvmi+E=
github.com/libp2p/go-libp2p-testing v0.0.2/go.mod h1:gvchhf3FQOtBdr+eFUABet5a4MBLK8jM3V4Zghvmi+E=
github.com/libp2p/go-libp2p-testing v0.0.3 h1:bdij4bKaaND7tCsaXVjRfYkMpvoOeKj9AVQGJllA6jM=
github.com/libp2p/go-libp2p-testing v0.0.3/go.mod h1:gvchhf3FQOtBdr+eFUABet5a4MBLK8jM3V4Zghvmi+E=
github.com/libp2p/go-libp2p-transport v0.1.0 h1:q68SOTvX+71mk+n5eE3+FnUEPY5UL1CSFImH0bq0Vg8=
github.com/libp2p/go-libp2p-transport v0.1.0/go.mod h1:iL3c2tV3OVldqSwJrds8pmIWf4t/TwiF+eI/mhw/jjQ=
github.com/libp2p/go-libp2p-transport-upgrader v0.1.1 h1:PZMS9lhjK9VytzMCW3tWHAXtKXmlURSc3ZdvwEcKCzw=
github.com/libp2p/go-libp2p-transport-upgrader v0.1.1/go.mod h1:IEtA6or8JUbsV07qPW4r01GnTenLW4oi3lOPbUMGJJA=
github.com/libp2p/go-libp2p-yamux v0.2.0 h1:TSPZ5cMMz/wdoYsye/wU1TE4G3LDGMoeEN0xgnCKU/I=
Expand All @@ -137,6 +138,8 @@ github.com/libp2p/go-reuseport v0.0.1/go.mod h1:jn6RmB1ufnQwl0Q1f+YxAj8isJgDCQza
github.com/libp2p/go-reuseport-transport v0.0.2 h1:WglMwyXyBu61CMkjCCtnmqNqnjib0GIEjMiHTwR/KN4=
github.com/libp2p/go-reuseport-transport v0.0.2/go.mod h1:YkbSDrvjUVDL6b8XqriyA20obEtsW9BLkuOUyQAOCbs=
github.com/libp2p/go-stream-muxer v0.0.1/go.mod h1:bAo8x7YkSpadMTbtTaxGVHWUQsR/l5MEaHbKaliuT14=
github.com/libp2p/go-stream-muxer v0.1.0 h1:3ToDXUzx8pDC6RfuOzGsUYP5roMDthbUKRdMRRhqAqY=
github.com/libp2p/go-stream-muxer v0.1.0/go.mod h1:8JAVsjeRBCWwPoZeH0W1imLOcriqXJyFvB0mR4A04sQ=
github.com/libp2p/go-stream-muxer-multistream v0.2.0 h1:714bRJ4Zy9mdhyTLJ+ZKiROmAFwUHpeRidG+q7LTQOg=
github.com/libp2p/go-stream-muxer-multistream v0.2.0/go.mod h1:j9eyPol/LLRqT+GPLSxvimPhNph4sfYfMoDPd7HkzIc=
github.com/libp2p/go-tcp-transport v0.1.0 h1:IGhowvEqyMFknOar4FWCKSWE0zL36UFKQtiRQD60/8o=
Expand Down Expand Up @@ -165,11 +168,6 @@ github.com/mr-tron/base58 v1.1.2 h1:ZEw4I2EgPKDJ2iEw0cNmLB3ROrEmkOtXIkaG7wZg+78=
github.com/mr-tron/base58 v1.1.2/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc=
github.com/multiformats/go-base32 v0.0.3 h1:tw5+NhuwaOjJCC5Pp82QuXbrmLzWg7uxlMFp8Nq/kkI=
github.com/multiformats/go-base32 v0.0.3/go.mod h1:pLiuGC8y0QR3Ue4Zug5UzK9LjgbkL8NSQj0zQ5Nz/AA=
github.com/multiformats/go-multiaddr v0.0.1 h1:/QUV3VBMDI6pi6xfiw7lr6xhDWWvQKn9udPn68kLSdY=
github.com/multiformats/go-multiaddr v0.0.1/go.mod h1:xKVEak1K9cS1VdmPZW3LSIb6lgmoS58qz/pzqmAxV44=
github.com/multiformats/go-multiaddr v0.0.2/go.mod h1:xKVEak1K9cS1VdmPZW3LSIb6lgmoS58qz/pzqmAxV44=
github.com/multiformats/go-multiaddr v0.0.4 h1:WgMSI84/eRLdbptXMkMWDXPjPq7SPLIgGUVm2eroyU4=
github.com/multiformats/go-multiaddr v0.0.4/go.mod h1:xKVEak1K9cS1VdmPZW3LSIb6lgmoS58qz/pzqmAxV44=
github.com/multiformats/go-multiaddr-dns v0.0.1/go.mod h1:9kWcqw/Pj6FwxAwW38n/9403szc57zJPs45fmnznu3Q=
github.com/multiformats/go-multiaddr-dns v0.0.2 h1:/Bbsgsy3R6e3jf2qBahzNHzww6usYaZ0NhNH3sqdFS8=
github.com/multiformats/go-multiaddr-dns v0.0.2/go.mod h1:9kWcqw/Pj6FwxAwW38n/9403szc57zJPs45fmnznu3Q=
Expand Down
115 changes: 115 additions & 0 deletions libp2p_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,15 @@ import (
"regexp"
"strings"
"testing"
"time"

"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-tcp-transport"
ma "github.com/multiformats/go-multiaddr"
)

func TestNewHost(t *testing.T) {
Expand Down Expand Up @@ -77,6 +81,117 @@ func TestInsecure(t *testing.T) {
h.Close()
}

func TestPipeTransport(t *testing.T) {
ctx := context.Background()
memAddr, err := ma.NewMultiaddr("/memory/1")
if err != nil {
t.Fatal(err)
}
h, err := New(ctx, NoSecurity, EnableSelfDial, ListenAddrs(memAddr))
if err != nil {
t.Fatal(err)
}
info := peer.AddrInfo{
ID: h.ID(),
Addrs: []ma.Multiaddr{memAddr},
}
err = h.Connect(ctx, info)
if err != nil {
t.Fatal(err)
}
donech := make(chan struct{})
h.SetStreamHandler(protocol.TestingID, func(s network.Stream) {
n, err := s.Write([]byte{0x01, 0x02, 0x03})
if err != nil {
t.Fatal(err)
}
if n != 3 {
t.Fatalf("expected 3 bytes, wrote %d", n)
}
donech <- struct{}{}
})
streamContext, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
s, err := h.NewStream(streamContext, h.ID(), protocol.TestingID)
if err != nil {
t.Fatal(err)
}
buf := make([]byte, 5)
n, err := s.Read(buf)
if err != nil {
t.Fatal(err)
}
if n != 3 {
t.Fatalf("expected 3 bytes, wrote %d", n)
}
for i := byte(1); i < 4; i++ {
if buf[int(i)-1] != i {
t.Fatalf("bytes didn't match: %x vs %x", buf[int(i)-1], i)
}
}
s.Close()
h.Close()
}

func TestPipeTransportAcrossHosts(t *testing.T) {
ctx := context.Background()
memAddr1, err := ma.NewMultiaddr("/memory/1")
if err != nil {
t.Fatal(err)
}
h1, err := New(ctx, NoSecurity, EnableSelfDial, ListenAddrs(memAddr1))
if err != nil {
t.Fatal(err)
}
memAddr2, err := ma.NewMultiaddr("/memory/2")
if err != nil {
t.Fatal(err)
}
h2, err := New(ctx, NoSecurity, EnableSelfDial, ListenAddrs(memAddr2))
if err != nil {
t.Fatal(err)
}
info2 := peer.AddrInfo{
ID: h2.ID(),
Addrs: []ma.Multiaddr{memAddr2},
}
err = h1.Connect(ctx, info2)
if err != nil {
t.Fatal(err)
}
donech := make(chan struct{})
h2.SetStreamHandler(protocol.TestingID, func(s network.Stream) {
n, err := s.Write([]byte{0x01, 0x02, 0x03})
if err != nil {
t.Fatal(err)
}
if n != 3 {
t.Fatalf("expected 3 bytes, wrote %d", n)
}
donech <- struct{}{}
})
streamContext, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
s, err := h1.NewStream(streamContext, h2.ID(), protocol.TestingID)
if err != nil {
t.Fatal(err)
}
buf := make([]byte, 5)
n, err := s.Read(buf)
if err != nil {
t.Fatal(err)
}
if n != 3 {
t.Fatalf("expected 3 bytes, wrote %d", n)
}
for i := byte(1); i < 4; i++ {
if buf[int(i)-1] != i {
t.Fatalf("bytes didn't match: %x vs %x", buf[int(i)-1], i)
}
}
s.Close()
}

func TestDefaultListenAddrs(t *testing.T) {
ctx := context.Background()

Expand Down
7 changes: 7 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,13 @@ func Routing(rt config.RoutingC) Option {
}
}

// EnableSelfDial will configure libp2p to support self dialing; disabled by
// default.
var EnableSelfDial Option = func(cfg *Config) error {
cfg.SelfDial = true
return nil
}

// NoListenAddrs will configure libp2p to not listen by default.
//
// This will both clear any configured listen addrs and prevent libp2p from
Expand Down
114 changes: 114 additions & 0 deletions p2p/transport/pipe/pipeconn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package pipetransport

import (
"fmt"
"io"
"net"
"sync"

ic "github.com/libp2p/go-libp2p-crypto"
peer "github.com/libp2p/go-libp2p-peer"
tpt "github.com/libp2p/go-libp2p-transport"
streammux "github.com/libp2p/go-stream-muxer"
ma "github.com/multiformats/go-multiaddr"
)

type PipeConn struct {
streams chan *PipeStream

mclosed *sync.RWMutex
closed bool

addr ma.Multiaddr
id peer.ID
pubKey ic.PubKey
transport *PipeTransport
}

func NewPipeConn(id peer.ID, addr ma.Multiaddr, pubKey ic.PubKey, transport *PipeTransport) *PipeConn {
return &PipeConn{
streams: make(chan *PipeStream),
addr: addr,
id: id,
pubKey: pubKey,
transport: transport,
mclosed: new(sync.RWMutex),
closed: false,
}
}

func (c *PipeConn) Close() error {
c.mclosed.Lock()
defer c.mclosed.Unlock()

if c.closed {
return fmt.Errorf("tried to close already closed connection")
}

c.closed = true
close(c.streams)
return nil
}

func (c *PipeConn) IsClosed() bool {
c.mclosed.RLock()
defer c.mclosed.RUnlock()

return c.closed
}

func (c *PipeConn) AcceptStream() (streammux.Stream, error) {
stream, ok := <-c.streams
if !ok {
return nil, io.EOF
}
return stream, nil
}

func (c *PipeConn) OpenStream() (streammux.Stream, error) {
c.mclosed.RLock()
defer c.mclosed.RUnlock()

if c.closed {
return nil, fmt.Errorf("opening stream on closed connection")
}

connA, connB := net.Pipe()
connC, connD := net.Pipe()
errchA := make(chan error, 1)
errchB := make(chan error, 1)
streamA := NewPipeStream(connA, connC, errchB, errchA)
streamB := NewPipeStream(connD, connB, errchA, errchB)
c.streams <- streamB
return streamA, nil
}

func (c *PipeConn) LocalMultiaddr() ma.Multiaddr {
return c.addr
}

func (c *PipeConn) RemoteMultiaddr() ma.Multiaddr {
return c.addr
}

func (c *PipeConn) LocalPeer() peer.ID {
return c.transport.id
}

func (c *PipeConn) LocalPrivateKey() ic.PrivKey {
return c.transport.privKey
}

func (c *PipeConn) RemotePeer() peer.ID {
return c.id
}

func (c *PipeConn) RemotePublicKey() ic.PubKey {
return c.pubKey
}

func (c *PipeConn) Transport() tpt.Transport {
return c.transport
}

var _ tpt.Conn = (*PipeConn)(nil)
Loading