diff --git a/config/config.go b/config/config.go index 3bb51bf1f7..72b10734a2 100644 --- a/config/config.go +++ b/config/config.go @@ -17,6 +17,7 @@ import ( bhost "github.com/libp2p/go-libp2p/p2p/host/basic" relay "github.com/libp2p/go-libp2p/p2p/host/relay" routed "github.com/libp2p/go-libp2p/p2p/host/routed" + pipetransport "github.com/libp2p/go-libp2p/p2p/transport/pipe" circuit "github.com/libp2p/go-libp2p-circuit" discovery "github.com/libp2p/go-libp2p-discovery" @@ -66,6 +67,7 @@ type Config struct { Reporter metrics.Reporter DisablePing bool + SelfDial bool Routing RoutingC @@ -110,7 +112,7 @@ func (cfg *Config) NewNode(ctx context.Context) (host.Host, error) { } // TODO: Make the swarm implementation configurable. - swrm := swarm.NewSwarm(ctx, pid, cfg.Peerstore, cfg.Reporter) + swrm := swarm.NewSwarm(ctx, pid, cfg.Peerstore, cfg.Reporter, cfg.SelfDial) if cfg.Filters != nil { swrm.Filters = cfg.Filters } @@ -162,6 +164,10 @@ func (cfg *Config) NewNode(ctx context.Context) (host.Host, error) { h.Close() return nil, err } + if cfg.SelfDial { + tpt := pipetransport.New(pid, cfg.PeerKey.GetPublic(), cfg.PeerKey) + tpts = append(tpts, tpt) + } for _, t := range tpts { err = swrm.AddTransport(t) if err != nil { diff --git a/go.mod b/go.mod index 91fe7d2178..53753e50b8 100644 --- a/go.mod +++ b/go.mod @@ -13,18 +13,22 @@ require ( github.com/libp2p/go-libp2p-blankhost v0.1.1 github.com/libp2p/go-libp2p-circuit v0.1.0 github.com/libp2p/go-libp2p-core v0.0.1 + github.com/libp2p/go-libp2p-crypto v0.1.0 github.com/libp2p/go-libp2p-discovery v0.1.0 github.com/libp2p/go-libp2p-loggables v0.1.0 github.com/libp2p/go-libp2p-mplex v0.2.1 github.com/libp2p/go-libp2p-nat v0.0.4 github.com/libp2p/go-libp2p-netutil v0.1.0 + github.com/libp2p/go-libp2p-peer v0.2.0 github.com/libp2p/go-libp2p-peerstore v0.1.0 github.com/libp2p/go-libp2p-secio v0.1.0 github.com/libp2p/go-libp2p-swarm v0.1.0 github.com/libp2p/go-libp2p-testing v0.0.3 + github.com/libp2p/go-libp2p-transport v0.1.0 github.com/libp2p/go-libp2p-transport-upgrader v0.1.1 github.com/libp2p/go-libp2p-yamux v0.2.0 github.com/libp2p/go-maddr-filter v0.0.4 + github.com/libp2p/go-stream-muxer v0.1.0 github.com/libp2p/go-stream-muxer-multistream v0.2.0 github.com/libp2p/go-tcp-transport v0.1.0 github.com/libp2p/go-ws-transport v0.1.0 @@ -35,3 +39,7 @@ require ( github.com/multiformats/go-multistream v0.1.0 github.com/whyrusleeping/mdns v0.0.0-20180901202407-ef14215e6b30 ) + +replace github.com/libp2p/go-libp2p-swarm => ../go-libp2p-swarm + +replace github.com/multiformats/go-multiaddr => ../../workspace-multiformats/go-multiaddr diff --git a/go.sum b/go.sum index 0f6dbadbb6..91105af602 100644 --- a/go.sum +++ b/go.sum @@ -114,11 +114,12 @@ github.com/libp2p/go-libp2p-peerstore v0.1.0 h1:MKh7pRNPHSh1fLPj8u/M/s/napdmeNpo github.com/libp2p/go-libp2p-peerstore v0.1.0/go.mod h1:2CeHkQsr8svp4fZ+Oi9ykN1HBb6u0MOvdJ7YIsmcwtY= github.com/libp2p/go-libp2p-secio v0.1.0 h1:NNP5KLxuP97sE5Bu3iuwOWyT/dKEGMN5zSLMWdB7GTQ= github.com/libp2p/go-libp2p-secio v0.1.0/go.mod h1:tMJo2w7h3+wN4pgU2LSYeiKPrfqBgkOsdiKK77hE7c8= -github.com/libp2p/go-libp2p-swarm v0.1.0 h1:HrFk2p0awrGEgch9JXK/qp/hfjqQfgNxpLWnCiWPg5s= -github.com/libp2p/go-libp2p-swarm v0.1.0/go.mod h1:wQVsCdjsuZoc730CgOvh5ox6K8evllckjebkdiY5ta4= +github.com/libp2p/go-libp2p-testing v0.0.1/go.mod h1:gvchhf3FQOtBdr+eFUABet5a4MBLK8jM3V4Zghvmi+E= github.com/libp2p/go-libp2p-testing v0.0.2/go.mod h1:gvchhf3FQOtBdr+eFUABet5a4MBLK8jM3V4Zghvmi+E= github.com/libp2p/go-libp2p-testing v0.0.3 h1:bdij4bKaaND7tCsaXVjRfYkMpvoOeKj9AVQGJllA6jM= github.com/libp2p/go-libp2p-testing v0.0.3/go.mod h1:gvchhf3FQOtBdr+eFUABet5a4MBLK8jM3V4Zghvmi+E= +github.com/libp2p/go-libp2p-transport v0.1.0 h1:q68SOTvX+71mk+n5eE3+FnUEPY5UL1CSFImH0bq0Vg8= +github.com/libp2p/go-libp2p-transport v0.1.0/go.mod h1:iL3c2tV3OVldqSwJrds8pmIWf4t/TwiF+eI/mhw/jjQ= github.com/libp2p/go-libp2p-transport-upgrader v0.1.1 h1:PZMS9lhjK9VytzMCW3tWHAXtKXmlURSc3ZdvwEcKCzw= github.com/libp2p/go-libp2p-transport-upgrader v0.1.1/go.mod h1:IEtA6or8JUbsV07qPW4r01GnTenLW4oi3lOPbUMGJJA= github.com/libp2p/go-libp2p-yamux v0.2.0 h1:TSPZ5cMMz/wdoYsye/wU1TE4G3LDGMoeEN0xgnCKU/I= @@ -137,6 +138,8 @@ github.com/libp2p/go-reuseport v0.0.1/go.mod h1:jn6RmB1ufnQwl0Q1f+YxAj8isJgDCQza github.com/libp2p/go-reuseport-transport v0.0.2 h1:WglMwyXyBu61CMkjCCtnmqNqnjib0GIEjMiHTwR/KN4= github.com/libp2p/go-reuseport-transport v0.0.2/go.mod h1:YkbSDrvjUVDL6b8XqriyA20obEtsW9BLkuOUyQAOCbs= github.com/libp2p/go-stream-muxer v0.0.1/go.mod h1:bAo8x7YkSpadMTbtTaxGVHWUQsR/l5MEaHbKaliuT14= +github.com/libp2p/go-stream-muxer v0.1.0 h1:3ToDXUzx8pDC6RfuOzGsUYP5roMDthbUKRdMRRhqAqY= +github.com/libp2p/go-stream-muxer v0.1.0/go.mod h1:8JAVsjeRBCWwPoZeH0W1imLOcriqXJyFvB0mR4A04sQ= github.com/libp2p/go-stream-muxer-multistream v0.2.0 h1:714bRJ4Zy9mdhyTLJ+ZKiROmAFwUHpeRidG+q7LTQOg= github.com/libp2p/go-stream-muxer-multistream v0.2.0/go.mod h1:j9eyPol/LLRqT+GPLSxvimPhNph4sfYfMoDPd7HkzIc= github.com/libp2p/go-tcp-transport v0.1.0 h1:IGhowvEqyMFknOar4FWCKSWE0zL36UFKQtiRQD60/8o= @@ -165,11 +168,6 @@ github.com/mr-tron/base58 v1.1.2 h1:ZEw4I2EgPKDJ2iEw0cNmLB3ROrEmkOtXIkaG7wZg+78= github.com/mr-tron/base58 v1.1.2/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc= github.com/multiformats/go-base32 v0.0.3 h1:tw5+NhuwaOjJCC5Pp82QuXbrmLzWg7uxlMFp8Nq/kkI= github.com/multiformats/go-base32 v0.0.3/go.mod h1:pLiuGC8y0QR3Ue4Zug5UzK9LjgbkL8NSQj0zQ5Nz/AA= -github.com/multiformats/go-multiaddr v0.0.1 h1:/QUV3VBMDI6pi6xfiw7lr6xhDWWvQKn9udPn68kLSdY= -github.com/multiformats/go-multiaddr v0.0.1/go.mod h1:xKVEak1K9cS1VdmPZW3LSIb6lgmoS58qz/pzqmAxV44= -github.com/multiformats/go-multiaddr v0.0.2/go.mod h1:xKVEak1K9cS1VdmPZW3LSIb6lgmoS58qz/pzqmAxV44= -github.com/multiformats/go-multiaddr v0.0.4 h1:WgMSI84/eRLdbptXMkMWDXPjPq7SPLIgGUVm2eroyU4= -github.com/multiformats/go-multiaddr v0.0.4/go.mod h1:xKVEak1K9cS1VdmPZW3LSIb6lgmoS58qz/pzqmAxV44= github.com/multiformats/go-multiaddr-dns v0.0.1/go.mod h1:9kWcqw/Pj6FwxAwW38n/9403szc57zJPs45fmnznu3Q= github.com/multiformats/go-multiaddr-dns v0.0.2 h1:/Bbsgsy3R6e3jf2qBahzNHzww6usYaZ0NhNH3sqdFS8= github.com/multiformats/go-multiaddr-dns v0.0.2/go.mod h1:9kWcqw/Pj6FwxAwW38n/9403szc57zJPs45fmnznu3Q= diff --git a/libp2p_test.go b/libp2p_test.go index d1099c647d..1d6d420704 100644 --- a/libp2p_test.go +++ b/libp2p_test.go @@ -6,11 +6,15 @@ import ( "regexp" "strings" "testing" + "time" "github.com/libp2p/go-libp2p-core/crypto" "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/protocol" "github.com/libp2p/go-tcp-transport" + ma "github.com/multiformats/go-multiaddr" ) func TestNewHost(t *testing.T) { @@ -77,6 +81,117 @@ func TestInsecure(t *testing.T) { h.Close() } +func TestPipeTransport(t *testing.T) { + ctx := context.Background() + memAddr, err := ma.NewMultiaddr("/memory/1") + if err != nil { + t.Fatal(err) + } + h, err := New(ctx, NoSecurity, EnableSelfDial, ListenAddrs(memAddr)) + if err != nil { + t.Fatal(err) + } + info := peer.AddrInfo{ + ID: h.ID(), + Addrs: []ma.Multiaddr{memAddr}, + } + err = h.Connect(ctx, info) + if err != nil { + t.Fatal(err) + } + donech := make(chan struct{}) + h.SetStreamHandler(protocol.TestingID, func(s network.Stream) { + n, err := s.Write([]byte{0x01, 0x02, 0x03}) + if err != nil { + t.Fatal(err) + } + if n != 3 { + t.Fatalf("expected 3 bytes, wrote %d", n) + } + donech <- struct{}{} + }) + streamContext, cancel := context.WithTimeout(ctx, time.Second) + defer cancel() + s, err := h.NewStream(streamContext, h.ID(), protocol.TestingID) + if err != nil { + t.Fatal(err) + } + buf := make([]byte, 5) + n, err := s.Read(buf) + if err != nil { + t.Fatal(err) + } + if n != 3 { + t.Fatalf("expected 3 bytes, wrote %d", n) + } + for i := byte(1); i < 4; i++ { + if buf[int(i)-1] != i { + t.Fatalf("bytes didn't match: %x vs %x", buf[int(i)-1], i) + } + } + s.Close() + h.Close() +} + +func TestPipeTransportAcrossHosts(t *testing.T) { + ctx := context.Background() + memAddr1, err := ma.NewMultiaddr("/memory/1") + if err != nil { + t.Fatal(err) + } + h1, err := New(ctx, NoSecurity, EnableSelfDial, ListenAddrs(memAddr1)) + if err != nil { + t.Fatal(err) + } + memAddr2, err := ma.NewMultiaddr("/memory/2") + if err != nil { + t.Fatal(err) + } + h2, err := New(ctx, NoSecurity, EnableSelfDial, ListenAddrs(memAddr2)) + if err != nil { + t.Fatal(err) + } + info2 := peer.AddrInfo{ + ID: h2.ID(), + Addrs: []ma.Multiaddr{memAddr2}, + } + err = h1.Connect(ctx, info2) + if err != nil { + t.Fatal(err) + } + donech := make(chan struct{}) + h2.SetStreamHandler(protocol.TestingID, func(s network.Stream) { + n, err := s.Write([]byte{0x01, 0x02, 0x03}) + if err != nil { + t.Fatal(err) + } + if n != 3 { + t.Fatalf("expected 3 bytes, wrote %d", n) + } + donech <- struct{}{} + }) + streamContext, cancel := context.WithTimeout(ctx, time.Second) + defer cancel() + s, err := h1.NewStream(streamContext, h2.ID(), protocol.TestingID) + if err != nil { + t.Fatal(err) + } + buf := make([]byte, 5) + n, err := s.Read(buf) + if err != nil { + t.Fatal(err) + } + if n != 3 { + t.Fatalf("expected 3 bytes, wrote %d", n) + } + for i := byte(1); i < 4; i++ { + if buf[int(i)-1] != i { + t.Fatalf("bytes didn't match: %x vs %x", buf[int(i)-1], i) + } + } + s.Close() +} + func TestDefaultListenAddrs(t *testing.T) { ctx := context.Background() diff --git a/options.go b/options.go index 6d310f1aa3..39830cca83 100644 --- a/options.go +++ b/options.go @@ -296,6 +296,13 @@ func Routing(rt config.RoutingC) Option { } } +// EnableSelfDial will configure libp2p to support self dialing; disabled by +// default. +var EnableSelfDial Option = func(cfg *Config) error { + cfg.SelfDial = true + return nil +} + // NoListenAddrs will configure libp2p to not listen by default. // // This will both clear any configured listen addrs and prevent libp2p from diff --git a/p2p/transport/pipe/pipeconn.go b/p2p/transport/pipe/pipeconn.go new file mode 100644 index 0000000000..386635f5eb --- /dev/null +++ b/p2p/transport/pipe/pipeconn.go @@ -0,0 +1,114 @@ +package pipetransport + +import ( + "fmt" + "io" + "net" + "sync" + + ic "github.com/libp2p/go-libp2p-crypto" + peer "github.com/libp2p/go-libp2p-peer" + tpt "github.com/libp2p/go-libp2p-transport" + streammux "github.com/libp2p/go-stream-muxer" + ma "github.com/multiformats/go-multiaddr" +) + +type PipeConn struct { + streams chan *PipeStream + + mclosed *sync.RWMutex + closed bool + + addr ma.Multiaddr + id peer.ID + pubKey ic.PubKey + transport *PipeTransport +} + +func NewPipeConn(id peer.ID, addr ma.Multiaddr, pubKey ic.PubKey, transport *PipeTransport) *PipeConn { + return &PipeConn{ + streams: make(chan *PipeStream), + addr: addr, + id: id, + pubKey: pubKey, + transport: transport, + mclosed: new(sync.RWMutex), + closed: false, + } +} + +func (c *PipeConn) Close() error { + c.mclosed.Lock() + defer c.mclosed.Unlock() + + if c.closed { + return fmt.Errorf("tried to close already closed connection") + } + + c.closed = true + close(c.streams) + return nil +} + +func (c *PipeConn) IsClosed() bool { + c.mclosed.RLock() + defer c.mclosed.RUnlock() + + return c.closed +} + +func (c *PipeConn) AcceptStream() (streammux.Stream, error) { + stream, ok := <-c.streams + if !ok { + return nil, io.EOF + } + return stream, nil +} + +func (c *PipeConn) OpenStream() (streammux.Stream, error) { + c.mclosed.RLock() + defer c.mclosed.RUnlock() + + if c.closed { + return nil, fmt.Errorf("opening stream on closed connection") + } + + connA, connB := net.Pipe() + connC, connD := net.Pipe() + errchA := make(chan error, 1) + errchB := make(chan error, 1) + streamA := NewPipeStream(connA, connC, errchB, errchA) + streamB := NewPipeStream(connD, connB, errchA, errchB) + c.streams <- streamB + return streamA, nil +} + +func (c *PipeConn) LocalMultiaddr() ma.Multiaddr { + return c.addr +} + +func (c *PipeConn) RemoteMultiaddr() ma.Multiaddr { + return c.addr +} + +func (c *PipeConn) LocalPeer() peer.ID { + return c.transport.id +} + +func (c *PipeConn) LocalPrivateKey() ic.PrivKey { + return c.transport.privKey +} + +func (c *PipeConn) RemotePeer() peer.ID { + return c.id +} + +func (c *PipeConn) RemotePublicKey() ic.PubKey { + return c.pubKey +} + +func (c *PipeConn) Transport() tpt.Transport { + return c.transport +} + +var _ tpt.Conn = (*PipeConn)(nil) diff --git a/p2p/transport/pipe/pipelistener.go b/p2p/transport/pipe/pipelistener.go new file mode 100644 index 0000000000..2ad7f1cbee --- /dev/null +++ b/p2p/transport/pipe/pipelistener.go @@ -0,0 +1,46 @@ +package pipetransport + +import ( + "fmt" + "net" + + tpt "github.com/libp2p/go-libp2p-transport" + ma "github.com/multiformats/go-multiaddr" +) + +type PipeListener struct { + listenaddr ma.Multiaddr + listench chan *PipeConn + transport *PipeTransport +} + +func NewPipeListener(addr ma.Multiaddr, ch chan *PipeConn, t *PipeTransport) *PipeListener { + return &PipeListener{ + listenaddr: addr, + listench: ch, + transport: t, + } +} + +func (l *PipeListener) Accept() (tpt.Conn, error) { + conn, ok := <-l.listench + if !ok { + return nil, fmt.Errorf("memorytransport closed") + } + return conn, nil +} + +func (l *PipeListener) Close() error { + l.transport.closeListener(l.listenaddr.String()) + return nil +} + +func (l *PipeListener) Addr() net.Addr { + return nil +} + +func (l *PipeListener) Multiaddr() ma.Multiaddr { + return l.listenaddr +} + +var _ tpt.Listener = (*PipeListener)(nil) diff --git a/p2p/transport/pipe/pipestream.go b/p2p/transport/pipe/pipestream.go new file mode 100644 index 0000000000..4c6c08a372 --- /dev/null +++ b/p2p/transport/pipe/pipestream.go @@ -0,0 +1,95 @@ +package pipetransport + +import ( + "fmt" + "net" + "time" + + mux "github.com/libp2p/go-libp2p-core/mux" + streammux "github.com/libp2p/go-stream-muxer" +) + +var ErrHalfClosed = fmt.Errorf("tried to write to stream that was half closed") +var ErrAlreadyHalfClosed = fmt.Errorf("tried to half close stream that was already half closed") + +type PipeStream struct { + inbound net.Conn + outbound net.Conn + errchs struct { + in <-chan error + out chan<- error + } +} + +func NewPipeStream(inbound net.Conn, outbound net.Conn, errinch <-chan error, erroutch chan<- error) *PipeStream { + return &PipeStream{ + inbound: inbound, + outbound: outbound, + errchs: struct { + in <-chan error + out chan<- error + }{ + in: errinch, + out: erroutch, + }, + } +} + +func (s *PipeStream) Close() error { + return s.outbound.Close() +} + +func (s *PipeStream) Reset() error { + s.errchs.out <- mux.ErrReset + err1 := s.inbound.Close() + err2 := s.outbound.Close() + if err1 != nil { + return err1 + } + if err2 != nil { + return err2 + } + return nil +} + +func (s *PipeStream) Read(b []byte) (int, error) { + n, err := s.inbound.Read(b) + + select { + case reseterr := <-s.errchs.in: + return n, reseterr + default: + return n, err + } +} + +func (s *PipeStream) Write(b []byte) (int, error) { + n, err := s.outbound.Write(b) + + select { + case reseterr := <-s.errchs.in: + return n, reseterr + default: + fmt.Println("no probs here") + return n, err + } +} + +func (s *PipeStream) SetDeadline(t time.Time) error { + err := s.inbound.SetDeadline(t) + if err != nil { + return err + } + err = s.outbound.SetDeadline(t) + return err +} + +func (s *PipeStream) SetReadDeadline(t time.Time) error { + return s.inbound.SetReadDeadline(t) +} + +func (s *PipeStream) SetWriteDeadline(t time.Time) error { + return s.outbound.SetWriteDeadline(t) +} + +var _ streammux.Stream = (*PipeStream)(nil) diff --git a/p2p/transport/pipe/pipetransport.go b/p2p/transport/pipe/pipetransport.go new file mode 100644 index 0000000000..b8b29c2630 --- /dev/null +++ b/p2p/transport/pipe/pipetransport.go @@ -0,0 +1,98 @@ +package pipetransport + +import ( + "context" + "fmt" + + "sync" + + ic "github.com/libp2p/go-libp2p-crypto" + peer "github.com/libp2p/go-libp2p-peer" + tpt "github.com/libp2p/go-libp2p-transport" + ma "github.com/multiformats/go-multiaddr" +) + +type listenerChans struct { + mlistenchans *sync.RWMutex + listenchans map[string]chan *PipeConn +} + +var listeners = &listenerChans{ + mlistenchans: new(sync.RWMutex), + listenchans: make(map[string]chan *PipeConn), +} + +type PipeTransport struct { + id peer.ID + pubKey ic.PubKey + privKey ic.PrivKey +} + +var _ tpt.Transport = (*PipeTransport)(nil) + +func New(id peer.ID, pubKey ic.PubKey, privKey ic.PrivKey) *PipeTransport { + return &PipeTransport{ + id: id, + pubKey: pubKey, + privKey: privKey, + } +} + +func (t *PipeTransport) closeListener(addr string) { + listeners.mlistenchans.Lock() + defer listeners.mlistenchans.Unlock() + + ch, ok := listeners.listenchans[addr] + if !ok { + return + } + close(ch) + delete(listeners.listenchans, addr) +} + +func (t *PipeTransport) CanDial(addr ma.Multiaddr) bool { + protocols := addr.Protocols() + return len(protocols) == 1 && protocols[0].Code == ma.P_MEMORY +} + +func (t *PipeTransport) Protocols() []int { + return []int{ + ma.P_MEMORY, + } +} + +func (t *PipeTransport) Proxy() bool { + return false +} + +func (t *PipeTransport) Dial(ctx context.Context, raddr ma.Multiaddr, p peer.ID) (tpt.Conn, error) { + listeners.mlistenchans.RLock() + defer listeners.mlistenchans.RUnlock() + raddrStr := raddr.String() + + ch, ok := listeners.listenchans[raddrStr] + if !ok { + return nil, fmt.Errorf("no memorylistener for %s", raddrStr) + } + + conn := NewPipeConn(p, raddr, t.pubKey, t) + ch <- conn + return conn, nil +} + +func (t *PipeTransport) Listen(laddr ma.Multiaddr) (tpt.Listener, error) { + listeners.mlistenchans.Lock() + defer listeners.mlistenchans.Unlock() + + laddrStr := laddr.String() + if _, ok := listeners.listenchans[laddrStr]; ok { + return nil, fmt.Errorf("already listening on %s", laddrStr) + } + + ch := make(chan *PipeConn) + listeners.listenchans[laddrStr] = ch + + listener := NewPipeListener(laddr, ch, t) + + return listener, nil +} diff --git a/p2p/transport/pipe/pipetransport_test.go b/p2p/transport/pipe/pipetransport_test.go new file mode 100644 index 0000000000..70167bab17 --- /dev/null +++ b/p2p/transport/pipe/pipetransport_test.go @@ -0,0 +1,93 @@ +package pipetransport + +import ( + "context" + "fmt" + "testing" + "time" + + peer "github.com/libp2p/go-libp2p-peer" + peertest "github.com/libp2p/go-libp2p-peer/test" + utils "github.com/libp2p/go-libp2p-transport/test" + ma "github.com/multiformats/go-multiaddr" +) + +func TestPipeTransport(t *testing.T) { + privKey, pubKey, err := peertest.RandTestKeyPair(512) + if err != nil { + t.Fatal() + } + id, err := peer.IDFromPrivateKey(privKey) + if err != nil { + t.Fatal(err) + } + transport := New(id, pubKey, privKey) + listenAddrStr := fmt.Sprintf("/memory/1") + listenAddr, _ := ma.NewMultiaddr(listenAddrStr) + listener, err := transport.Listen(listenAddr) + if err != nil { + t.Fatal(err) + } + go func() { + conn, err := listener.Accept() + if err != nil { + t.Fatal(err) + } + stream, err := conn.OpenStream() + if err != nil { + t.Fatal(err) + } + n, err := stream.Write([]byte("sup")) + if err != nil { + t.Fatal(err) + } + if n != 3 { + t.Fatalf("expected to write 3 bytes, wrote %d", n) + } + }() + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + conn, err := transport.Dial(ctx, listenAddr, id) + if err != nil { + t.Fatal(err) + } + stream, err := conn.AcceptStream() + if err != nil { + t.Fatal(err) + } + buf := make([]byte, 10) + n, err := stream.Read(buf) + if n != 3 { + t.Fatal("expected 3 bytes, got", n) + } + if err != nil { + t.Fatal(err) + } + bufStr := string(buf[0:n]) + if bufStr != "sup" { + t.Fatal("unexpected message: ", bufStr) + } +} + +// Note: the stress tests don't work because they rely on multiple listeners +// being able to bind the same address, usually by requesting TCP port 0 from +// the kernel. This doesn't apply for the pipe transport. The pipe transport +// supports one per peer ID. Furthermore, the cancel test doesn't apply, since +// there is no latency when opening pipe transports. +func TestPipeTransportFull(t *testing.T) { + privKey, pubKey, err := peertest.RandTestKeyPair(512) + if err != nil { + t.Fatal() + } + id, err := peer.IDFromPrivateKey(privKey) + if err != nil { + t.Fatal(err) + } + transport := New(id, pubKey, privKey) + listenAddr, _ := ma.NewMultiaddr("/memory/1") + utils.SubtestBasic(t, transport, transport, listenAddr, id) + utils.SubtestProtocols(t, transport, transport, listenAddr, id) + utils.SubtestPingPong(t, transport, transport, listenAddr, id) + utils.SubtestStreamReset(t, transport, transport, listenAddr, id) +}