From 3023356beb10273993a8a6d2f9c069194f9b35ca Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Sat, 11 Jul 2015 18:51:47 -0700 Subject: [PATCH 01/24] extracted from go-peerstream --- p2p/muxer/mplex/multiplex.go | 44 +++++++++++++++++++++++++++++++ p2p/muxer/mplex/multiplex_test.go | 11 ++++++++ 2 files changed, 55 insertions(+) create mode 100644 p2p/muxer/mplex/multiplex.go create mode 100644 p2p/muxer/mplex/multiplex_test.go diff --git a/p2p/muxer/mplex/multiplex.go b/p2p/muxer/mplex/multiplex.go new file mode 100644 index 0000000000..62868a8410 --- /dev/null +++ b/p2p/muxer/mplex/multiplex.go @@ -0,0 +1,44 @@ +package peerstream_multiplex + +import ( + "net" + + smux "github.com/jbenet/go-stream-mux" + mp "github.com/jbenet/go-stream-mux/Godeps/_workspace/src/github.com/whyrusleeping/go-multiplex" // Conn is a connection to a remote peer. +) + +type conn struct { + *mp.Multiplex +} + +func (c *conn) Close() error { + return c.Multiplex.Close() +} + +func (c *conn) IsClosed() bool { + return c.Multiplex.IsClosed() +} + +// OpenStream creates a new stream. +func (c *conn) OpenStream() (smux.Stream, error) { + return c.Multiplex.NewStream(), nil +} + +// Serve starts listening for incoming requests and handles them +// using given StreamHandler +func (c *conn) Serve(handler smux.StreamHandler) { + c.Multiplex.Serve(func(s *mp.Stream) { + handler(s) + }) +} + +// Transport is a go-peerstream transport that constructs +// multiplex-backed connections. +type Transport struct{} + +// DefaultTransport has default settings for multiplex +var DefaultTransport = &Transport{} + +func (t *Transport) NewConn(nc net.Conn, isServer bool) (smux.Conn, error) { + return &conn{mp.NewMultiplex(nc, isServer)}, nil +} diff --git a/p2p/muxer/mplex/multiplex_test.go b/p2p/muxer/mplex/multiplex_test.go new file mode 100644 index 0000000000..1b8ffd76a3 --- /dev/null +++ b/p2p/muxer/mplex/multiplex_test.go @@ -0,0 +1,11 @@ +package peerstream_multiplex + +import ( + "testing" + + test "github.com/jbenet/go-stream-mux/test" +) + +func TestMultiplexTransport(t *testing.T) { + test.SubtestAll(t, DefaultTransport) +} From 5b2626d3699a2977a3a9f4a327f0ca0941758044 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Sat, 11 Jul 2015 19:17:39 -0700 Subject: [PATCH 02/24] fixed urls --- p2p/muxer/mplex/multiplex.go | 2 +- p2p/muxer/mplex/multiplex_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/p2p/muxer/mplex/multiplex.go b/p2p/muxer/mplex/multiplex.go index 62868a8410..6d2387ca5d 100644 --- a/p2p/muxer/mplex/multiplex.go +++ b/p2p/muxer/mplex/multiplex.go @@ -4,7 +4,7 @@ import ( "net" smux "github.com/jbenet/go-stream-mux" - mp "github.com/jbenet/go-stream-mux/Godeps/_workspace/src/github.com/whyrusleeping/go-multiplex" // Conn is a connection to a remote peer. + mp "github.com/jbenet/go-stream-muxer/Godeps/_workspace/src/github.com/whyrusleeping/go-multiplex" // Conn is a connection to a remote peer. ) type conn struct { diff --git a/p2p/muxer/mplex/multiplex_test.go b/p2p/muxer/mplex/multiplex_test.go index 1b8ffd76a3..e35b3b038d 100644 --- a/p2p/muxer/mplex/multiplex_test.go +++ b/p2p/muxer/mplex/multiplex_test.go @@ -3,7 +3,7 @@ package peerstream_multiplex import ( "testing" - test "github.com/jbenet/go-stream-mux/test" + test "github.com/jbenet/go-stream-muxer/test" ) func TestMultiplexTransport(t *testing.T) { From 73c3d70a0850ca2a82e2606edab7877091954855 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Sat, 11 Jul 2015 19:39:04 -0700 Subject: [PATCH 03/24] links fix --- p2p/muxer/mplex/multiplex.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/muxer/mplex/multiplex.go b/p2p/muxer/mplex/multiplex.go index 6d2387ca5d..16e10006ef 100644 --- a/p2p/muxer/mplex/multiplex.go +++ b/p2p/muxer/mplex/multiplex.go @@ -3,7 +3,7 @@ package peerstream_multiplex import ( "net" - smux "github.com/jbenet/go-stream-mux" + smux "github.com/jbenet/go-stream-muxer" mp "github.com/jbenet/go-stream-muxer/Godeps/_workspace/src/github.com/whyrusleeping/go-multiplex" // Conn is a connection to a remote peer. ) From aab33b03745aab68b3fdf155e1a92400c1ea72f2 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Sat, 11 Jul 2015 20:55:24 -0700 Subject: [PATCH 04/24] implement AcceptStream --- p2p/muxer/mplex/multiplex.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/p2p/muxer/mplex/multiplex.go b/p2p/muxer/mplex/multiplex.go index 16e10006ef..0c626480e9 100644 --- a/p2p/muxer/mplex/multiplex.go +++ b/p2p/muxer/mplex/multiplex.go @@ -1,12 +1,15 @@ package peerstream_multiplex import ( + "errors" "net" smux "github.com/jbenet/go-stream-muxer" mp "github.com/jbenet/go-stream-muxer/Godeps/_workspace/src/github.com/whyrusleeping/go-multiplex" // Conn is a connection to a remote peer. ) +var ErrUseServe = errors.New("not implemented, use Serve") + type conn struct { *mp.Multiplex } @@ -24,6 +27,11 @@ func (c *conn) OpenStream() (smux.Stream, error) { return c.Multiplex.NewStream(), nil } +// AcceptStream accepts a stream opened by the other side. +func (c *conn) AcceptStream() (smux.Stream, error) { + return nil, ErrUseServe +} + // Serve starts listening for incoming requests and handles them // using given StreamHandler func (c *conn) Serve(handler smux.StreamHandler) { From 357317776cc1f37a7cd8893d14224b1e5cc82303 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Tue, 14 Jul 2015 09:18:42 -0700 Subject: [PATCH 05/24] update multistream and multiplex --- p2p/muxer/mplex/multiplex.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/p2p/muxer/mplex/multiplex.go b/p2p/muxer/mplex/multiplex.go index 0c626480e9..d07d53e057 100644 --- a/p2p/muxer/mplex/multiplex.go +++ b/p2p/muxer/mplex/multiplex.go @@ -29,15 +29,19 @@ func (c *conn) OpenStream() (smux.Stream, error) { // AcceptStream accepts a stream opened by the other side. func (c *conn) AcceptStream() (smux.Stream, error) { - return nil, ErrUseServe + return c.Multiplex.Accept() } // Serve starts listening for incoming requests and handles them // using given StreamHandler func (c *conn) Serve(handler smux.StreamHandler) { - c.Multiplex.Serve(func(s *mp.Stream) { - handler(s) - }) + for { + s, err := c.AcceptStream() + if err != nil { + return + } + go handler(s) + } } // Transport is a go-peerstream transport that constructs From 9b37fe1717575d35c95858c93b0827e93c5fda11 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Mon, 2 May 2016 15:24:02 -0700 Subject: [PATCH 06/24] gxify and update deps --- p2p/muxer/mplex/multiplex.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/muxer/mplex/multiplex.go b/p2p/muxer/mplex/multiplex.go index d07d53e057..7d7f53aa3f 100644 --- a/p2p/muxer/mplex/multiplex.go +++ b/p2p/muxer/mplex/multiplex.go @@ -5,7 +5,7 @@ import ( "net" smux "github.com/jbenet/go-stream-muxer" - mp "github.com/jbenet/go-stream-muxer/Godeps/_workspace/src/github.com/whyrusleeping/go-multiplex" // Conn is a connection to a remote peer. + mp "github.com/whyrusleeping/go-multiplex" // Conn is a connection to a remote peer. ) var ErrUseServe = errors.New("not implemented, use Serve") From 9a45ffbf09421da6d704507a03a89eee11eb9e6b Mon Sep 17 00:00:00 2001 From: Jeromy Date: Wed, 4 May 2016 15:51:32 -0700 Subject: [PATCH 07/24] rewrite paths to gx versions --- p2p/muxer/mplex/multiplex.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/muxer/mplex/multiplex.go b/p2p/muxer/mplex/multiplex.go index 7d7f53aa3f..bb17f61470 100644 --- a/p2p/muxer/mplex/multiplex.go +++ b/p2p/muxer/mplex/multiplex.go @@ -5,7 +5,7 @@ import ( "net" smux "github.com/jbenet/go-stream-muxer" - mp "github.com/whyrusleeping/go-multiplex" // Conn is a connection to a remote peer. + mp "gx/ipfs/QmSLbJgmwYvjQuMpfW6kUt5KsJeEkjLwXTkRM5DkqR14if/go-multiplex" // Conn is a connection to a remote peer. ) var ErrUseServe = errors.New("not implemented, use Serve") From b3902adadb872a12e3bbf47cbc5e364330882a93 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Fri, 13 May 2016 09:16:00 -0700 Subject: [PATCH 08/24] gx publish version 1.0.0 --- p2p/muxer/mplex/multiplex.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/muxer/mplex/multiplex.go b/p2p/muxer/mplex/multiplex.go index bb17f61470..7d7f53aa3f 100644 --- a/p2p/muxer/mplex/multiplex.go +++ b/p2p/muxer/mplex/multiplex.go @@ -5,7 +5,7 @@ import ( "net" smux "github.com/jbenet/go-stream-muxer" - mp "gx/ipfs/QmSLbJgmwYvjQuMpfW6kUt5KsJeEkjLwXTkRM5DkqR14if/go-multiplex" // Conn is a connection to a remote peer. + mp "github.com/whyrusleeping/go-multiplex" // Conn is a connection to a remote peer. ) var ErrUseServe = errors.New("not implemented, use Serve") From ac8a3088a4311fd7cd78f089e69bd062f6f9435a Mon Sep 17 00:00:00 2001 From: Jeromy Date: Tue, 22 Nov 2016 15:19:54 -0800 Subject: [PATCH 09/24] update multiplex to 0.1.0 --- p2p/muxer/mplex/multiplex.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/muxer/mplex/multiplex.go b/p2p/muxer/mplex/multiplex.go index 7d7f53aa3f..6db90ca62f 100644 --- a/p2p/muxer/mplex/multiplex.go +++ b/p2p/muxer/mplex/multiplex.go @@ -24,7 +24,7 @@ func (c *conn) IsClosed() bool { // OpenStream creates a new stream. func (c *conn) OpenStream() (smux.Stream, error) { - return c.Multiplex.NewStream(), nil + return c.Multiplex.NewStream() } // AcceptStream accepts a stream opened by the other side. From 43db8f270ac548bdd0b39b3739c0fa0fb6e082bf Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Fri, 18 Aug 2017 00:54:48 -0700 Subject: [PATCH 10/24] add support for the new Reset method. --- p2p/muxer/mplex/multiplex.go | 2 +- p2p/muxer/mplex/multiplex_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/p2p/muxer/mplex/multiplex.go b/p2p/muxer/mplex/multiplex.go index 6db90ca62f..77227ccf25 100644 --- a/p2p/muxer/mplex/multiplex.go +++ b/p2p/muxer/mplex/multiplex.go @@ -4,7 +4,7 @@ import ( "errors" "net" - smux "github.com/jbenet/go-stream-muxer" + smux "github.com/libp2p/go-stream-muxer" mp "github.com/whyrusleeping/go-multiplex" // Conn is a connection to a remote peer. ) diff --git a/p2p/muxer/mplex/multiplex_test.go b/p2p/muxer/mplex/multiplex_test.go index e35b3b038d..2b392e231d 100644 --- a/p2p/muxer/mplex/multiplex_test.go +++ b/p2p/muxer/mplex/multiplex_test.go @@ -3,7 +3,7 @@ package peerstream_multiplex import ( "testing" - test "github.com/jbenet/go-stream-muxer/test" + test "github.com/libp2p/go-stream-muxer/test" ) func TestMultiplexTransport(t *testing.T) { From a920757f7e8d15d4bb999eb70506e49ba9c84ecb Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Sat, 19 Aug 2017 14:25:42 +0700 Subject: [PATCH 11/24] remove conn.Serve --- p2p/muxer/mplex/multiplex.go | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/p2p/muxer/mplex/multiplex.go b/p2p/muxer/mplex/multiplex.go index 77227ccf25..0eebb7262f 100644 --- a/p2p/muxer/mplex/multiplex.go +++ b/p2p/muxer/mplex/multiplex.go @@ -1,15 +1,12 @@ package peerstream_multiplex import ( - "errors" "net" smux "github.com/libp2p/go-stream-muxer" mp "github.com/whyrusleeping/go-multiplex" // Conn is a connection to a remote peer. ) -var ErrUseServe = errors.New("not implemented, use Serve") - type conn struct { *mp.Multiplex } @@ -32,18 +29,6 @@ func (c *conn) AcceptStream() (smux.Stream, error) { return c.Multiplex.Accept() } -// Serve starts listening for incoming requests and handles them -// using given StreamHandler -func (c *conn) Serve(handler smux.StreamHandler) { - for { - s, err := c.AcceptStream() - if err != nil { - return - } - go handler(s) - } -} - // Transport is a go-peerstream transport that constructs // multiplex-backed connections. type Transport struct{} From a58964041052f25879a241b117058170a4b062d9 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Mon, 25 Jun 2018 17:21:02 -0700 Subject: [PATCH 12/24] gx publish 3.0.11 --- p2p/muxer/mplex/multiplex.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/muxer/mplex/multiplex.go b/p2p/muxer/mplex/multiplex.go index 0eebb7262f..4a217c3048 100644 --- a/p2p/muxer/mplex/multiplex.go +++ b/p2p/muxer/mplex/multiplex.go @@ -3,7 +3,7 @@ package peerstream_multiplex import ( "net" - smux "github.com/libp2p/go-stream-muxer" + smux "github.com/libp2p/go-stream-muxer" // Conn is a connection to a remote peer. mp "github.com/whyrusleeping/go-multiplex" // Conn is a connection to a remote peer. ) From fad7d9b963f4ee2f34956ccb96877cea23fe8c72 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Tue, 25 Sep 2018 16:11:49 -0700 Subject: [PATCH 13/24] switch to the correct mplex package --- p2p/muxer/mplex/multiplex.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/p2p/muxer/mplex/multiplex.go b/p2p/muxer/mplex/multiplex.go index 4a217c3048..1cef4ef3ff 100644 --- a/p2p/muxer/mplex/multiplex.go +++ b/p2p/muxer/mplex/multiplex.go @@ -3,8 +3,8 @@ package peerstream_multiplex import ( "net" - smux "github.com/libp2p/go-stream-muxer" // Conn is a connection to a remote peer. - mp "github.com/whyrusleeping/go-multiplex" // Conn is a connection to a remote peer. + mp "github.com/libp2p/go-mplex" // Conn is a connection to a remote peer. + smux "github.com/libp2p/go-stream-muxer" // Conn is a connection to a remote peer. ) type conn struct { From aa541ddf45c27e04e2f8d7dae26d08ce2e0ca637 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Sat, 25 May 2019 14:49:18 +0100 Subject: [PATCH 14/24] migrate to consolidated types; add travis config (#1) --- p2p/muxer/mplex/multiplex.go | 11 ++++++----- p2p/muxer/mplex/multiplex_test.go | 2 +- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/p2p/muxer/mplex/multiplex.go b/p2p/muxer/mplex/multiplex.go index 1cef4ef3ff..bf53c9e7fb 100644 --- a/p2p/muxer/mplex/multiplex.go +++ b/p2p/muxer/mplex/multiplex.go @@ -3,8 +3,9 @@ package peerstream_multiplex import ( "net" - mp "github.com/libp2p/go-mplex" // Conn is a connection to a remote peer. - smux "github.com/libp2p/go-stream-muxer" // Conn is a connection to a remote peer. + "github.com/libp2p/go-libp2p-core/mux" + + mp "github.com/libp2p/go-mplex" ) type conn struct { @@ -20,12 +21,12 @@ func (c *conn) IsClosed() bool { } // OpenStream creates a new stream. -func (c *conn) OpenStream() (smux.Stream, error) { +func (c *conn) OpenStream() (mux.MuxedStream, error) { return c.Multiplex.NewStream() } // AcceptStream accepts a stream opened by the other side. -func (c *conn) AcceptStream() (smux.Stream, error) { +func (c *conn) AcceptStream() (mux.MuxedStream, error) { return c.Multiplex.Accept() } @@ -36,6 +37,6 @@ type Transport struct{} // DefaultTransport has default settings for multiplex var DefaultTransport = &Transport{} -func (t *Transport) NewConn(nc net.Conn, isServer bool) (smux.Conn, error) { +func (t *Transport) NewConn(nc net.Conn, isServer bool) (mux.MuxedConn, error) { return &conn{mp.NewMultiplex(nc, isServer)}, nil } diff --git a/p2p/muxer/mplex/multiplex_test.go b/p2p/muxer/mplex/multiplex_test.go index 2b392e231d..5fd46a1f03 100644 --- a/p2p/muxer/mplex/multiplex_test.go +++ b/p2p/muxer/mplex/multiplex_test.go @@ -3,7 +3,7 @@ package peerstream_multiplex import ( "testing" - test "github.com/libp2p/go-stream-muxer/test" + test "github.com/libp2p/go-libp2p-testing/suites/mux" ) func TestMultiplexTransport(t *testing.T) { From 7773c57d1d99075da7fb0589216a9a1906378e61 Mon Sep 17 00:00:00 2001 From: Hlib Date: Mon, 2 Mar 2020 13:47:11 +0200 Subject: [PATCH 15/24] restructure files layout --- p2p/muxer/mplex/conn.go | 32 ++++++++++++++ p2p/muxer/mplex/multiplex.go | 42 ------------------- p2p/muxer/mplex/transport.go | 22 ++++++++++ .../{multiplex_test.go => transport_test.go} | 2 +- 4 files changed, 55 insertions(+), 43 deletions(-) create mode 100644 p2p/muxer/mplex/conn.go delete mode 100644 p2p/muxer/mplex/multiplex.go create mode 100644 p2p/muxer/mplex/transport.go rename p2p/muxer/mplex/{multiplex_test.go => transport_test.go} (77%) diff --git a/p2p/muxer/mplex/conn.go b/p2p/muxer/mplex/conn.go new file mode 100644 index 0000000000..00f89a04d1 --- /dev/null +++ b/p2p/muxer/mplex/conn.go @@ -0,0 +1,32 @@ +package peerstream_multiplex + +import ( + "github.com/libp2p/go-libp2p-core/mux" + mp "github.com/libp2p/go-mplex" +) + +type conn mp.Multiplex + +func (c *conn) Close() error { + return c.mplex().Close() +} + +func (c *conn) IsClosed() bool { + return c.mplex().IsClosed() +} + +// OpenStream creates a new stream. +func (c *conn) OpenStream() (mux.MuxedStream, error) { + return c.mplex().NewStream() +} + +// AcceptStream accepts a stream opened by the other side. +func (c *conn) AcceptStream() (mux.MuxedStream, error) { + return c.mplex().Accept() +} + +func (c *conn) mplex() *mp.Multiplex { + return (*mp.Multiplex)(c) +} + +var _ mux.MuxedConn = &conn{} diff --git a/p2p/muxer/mplex/multiplex.go b/p2p/muxer/mplex/multiplex.go deleted file mode 100644 index bf53c9e7fb..0000000000 --- a/p2p/muxer/mplex/multiplex.go +++ /dev/null @@ -1,42 +0,0 @@ -package peerstream_multiplex - -import ( - "net" - - "github.com/libp2p/go-libp2p-core/mux" - - mp "github.com/libp2p/go-mplex" -) - -type conn struct { - *mp.Multiplex -} - -func (c *conn) Close() error { - return c.Multiplex.Close() -} - -func (c *conn) IsClosed() bool { - return c.Multiplex.IsClosed() -} - -// OpenStream creates a new stream. -func (c *conn) OpenStream() (mux.MuxedStream, error) { - return c.Multiplex.NewStream() -} - -// AcceptStream accepts a stream opened by the other side. -func (c *conn) AcceptStream() (mux.MuxedStream, error) { - return c.Multiplex.Accept() -} - -// Transport is a go-peerstream transport that constructs -// multiplex-backed connections. -type Transport struct{} - -// DefaultTransport has default settings for multiplex -var DefaultTransport = &Transport{} - -func (t *Transport) NewConn(nc net.Conn, isServer bool) (mux.MuxedConn, error) { - return &conn{mp.NewMultiplex(nc, isServer)}, nil -} diff --git a/p2p/muxer/mplex/transport.go b/p2p/muxer/mplex/transport.go new file mode 100644 index 0000000000..395b855383 --- /dev/null +++ b/p2p/muxer/mplex/transport.go @@ -0,0 +1,22 @@ +package peerstream_multiplex + +import ( + "net" + + "github.com/libp2p/go-libp2p-core/mux" + + mp "github.com/libp2p/go-mplex" +) + +// DefaultTransport has default settings for Transport +var DefaultTransport = &Transport{} + +// Transport implements mux.Multiplexer that constructs +// mplex-backed muxed connections. +type Transport struct{} + +func (t *Transport) NewConn(nc net.Conn, isServer bool) (mux.MuxedConn, error) { + return (*conn)(mp.NewMultiplex(nc, isServer)), nil +} + +var _ mux.Multiplexer = &Transport{} diff --git a/p2p/muxer/mplex/multiplex_test.go b/p2p/muxer/mplex/transport_test.go similarity index 77% rename from p2p/muxer/mplex/multiplex_test.go rename to p2p/muxer/mplex/transport_test.go index 5fd46a1f03..134fd44454 100644 --- a/p2p/muxer/mplex/multiplex_test.go +++ b/p2p/muxer/mplex/transport_test.go @@ -6,6 +6,6 @@ import ( test "github.com/libp2p/go-libp2p-testing/suites/mux" ) -func TestMultiplexTransport(t *testing.T) { +func TestDefaultTransport(t *testing.T) { test.SubtestAll(t, DefaultTransport) } From 166a2cc8c598749f19af398a5c25741d887274b0 Mon Sep 17 00:00:00 2001 From: Hlib Date: Mon, 2 Mar 2020 13:52:06 +0200 Subject: [PATCH 16/24] define stream type for mp.Stream and update go-mplex --- p2p/muxer/mplex/stream.go | 55 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 55 insertions(+) create mode 100644 p2p/muxer/mplex/stream.go diff --git a/p2p/muxer/mplex/stream.go b/p2p/muxer/mplex/stream.go new file mode 100644 index 0000000000..382183b132 --- /dev/null +++ b/p2p/muxer/mplex/stream.go @@ -0,0 +1,55 @@ +package peerstream_multiplex + +import ( + "time" + + "github.com/libp2p/go-libp2p-core/mux" + mp "github.com/libp2p/go-mplex" +) + +// stream implements mux.MuxedStream over mplex.Stream. +type stream mp.Stream + +func (s *stream) Read(b []byte) (n int, err error) { + n, err = s.mplex().Read(b) + if err == mp.ErrStreamReset { + err = mux.ErrReset + } + + return +} + +func (s *stream) Write(b []byte) (n int, err error) { + n, err = s.mplex().Write(b) + if err == mp.ErrStreamReset { + err = mux.ErrReset + } + + return +} + +func (s *stream) Close() error { + return s.mplex().Close() +} + +func (s *stream) Reset() error { + return s.mplex().Reset() +} + +func (s *stream) SetDeadline(t time.Time) error { + return s.mplex().SetDeadline(t) +} + +func (s *stream) SetReadDeadline(t time.Time) error { + return s.mplex().SetReadDeadline(t) +} + +func (s *stream) SetWriteDeadline(t time.Time) error { + return s.mplex().SetWriteDeadline(t) +} + +func (s *stream) mplex() *mp.Stream { + return (*mp.Stream)(s) +} + +var _ mux.MuxedStream = &stream{} From 8a4d7ec4260e1dee45c460cced2c868fd7f28729 Mon Sep 17 00:00:00 2001 From: Hlib Date: Mon, 2 Mar 2020 14:03:01 +0200 Subject: [PATCH 17/24] use explicit return values --- p2p/muxer/mplex/stream.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/p2p/muxer/mplex/stream.go b/p2p/muxer/mplex/stream.go index 382183b132..2708ed3a36 100644 --- a/p2p/muxer/mplex/stream.go +++ b/p2p/muxer/mplex/stream.go @@ -16,7 +16,7 @@ func (s *stream) Read(b []byte) (n int, err error) { err = mux.ErrReset } - return + return n, err } func (s *stream) Write(b []byte) (n int, err error) { @@ -25,7 +25,7 @@ func (s *stream) Write(b []byte) (n int, err error) { err = mux.ErrReset } - return + return n, err } func (s *stream) Close() error { From 4395e603d3c46e09164bfb3389b5f11e7c69e416 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Tue, 1 Sep 2020 18:48:04 -0700 Subject: [PATCH 18/24] feat: update stream interfaces --- p2p/muxer/mplex/stream.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/p2p/muxer/mplex/stream.go b/p2p/muxer/mplex/stream.go index 2708ed3a36..6ef57c335c 100644 --- a/p2p/muxer/mplex/stream.go +++ b/p2p/muxer/mplex/stream.go @@ -32,6 +32,14 @@ func (s *stream) Close() error { return s.mplex().Close() } +func (s *stream) CloseWrite() error { + return s.mplex().CloseWrite() +} + +func (s *stream) CloseRead() error { + return s.mplex().CloseRead() +} + func (s *stream) Reset() error { return s.mplex().Reset() } From f4aa2981aa02a05564a5bdebb278311e0d5b331a Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Tue, 1 Sep 2020 19:03:31 -0700 Subject: [PATCH 19/24] fix: correctly wrap returned stream --- p2p/muxer/mplex/conn.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/p2p/muxer/mplex/conn.go b/p2p/muxer/mplex/conn.go index 00f89a04d1..1122e6351c 100644 --- a/p2p/muxer/mplex/conn.go +++ b/p2p/muxer/mplex/conn.go @@ -17,12 +17,20 @@ func (c *conn) IsClosed() bool { // OpenStream creates a new stream. func (c *conn) OpenStream() (mux.MuxedStream, error) { - return c.mplex().NewStream() + s, err := c.mplex().NewStream() + if err != nil { + return nil, err + } + return (*stream)(s), nil } // AcceptStream accepts a stream opened by the other side. func (c *conn) AcceptStream() (mux.MuxedStream, error) { - return c.mplex().Accept() + s, err := c.mplex().Accept() + if err != nil { + return nil, err + } + return (*stream)(s), nil } func (c *conn) mplex() *mp.Multiplex { From c2feedff5c49e72879366505b6d09ae2113c7168 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Thu, 17 Dec 2020 16:14:38 +0700 Subject: [PATCH 20/24] change OpenStream to accept a context --- p2p/muxer/mplex/conn.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/p2p/muxer/mplex/conn.go b/p2p/muxer/mplex/conn.go index 1122e6351c..6dd3437648 100644 --- a/p2p/muxer/mplex/conn.go +++ b/p2p/muxer/mplex/conn.go @@ -1,6 +1,8 @@ package peerstream_multiplex import ( + "context" + "github.com/libp2p/go-libp2p-core/mux" mp "github.com/libp2p/go-mplex" ) @@ -16,7 +18,7 @@ func (c *conn) IsClosed() bool { } // OpenStream creates a new stream. -func (c *conn) OpenStream() (mux.MuxedStream, error) { +func (c *conn) OpenStream(context.Context) (mux.MuxedStream, error) { s, err := c.mplex().NewStream() if err != nil { return nil, err From 92ae2a88730139b6b8e52a3f514f745ad3db3e3b Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Sat, 19 Dec 2020 15:07:16 +0700 Subject: [PATCH 21/24] update go-mplex, use the context passed to OpenStream --- p2p/muxer/mplex/conn.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/p2p/muxer/mplex/conn.go b/p2p/muxer/mplex/conn.go index 6dd3437648..e07a72c6c0 100644 --- a/p2p/muxer/mplex/conn.go +++ b/p2p/muxer/mplex/conn.go @@ -18,8 +18,8 @@ func (c *conn) IsClosed() bool { } // OpenStream creates a new stream. -func (c *conn) OpenStream(context.Context) (mux.MuxedStream, error) { - s, err := c.mplex().NewStream() +func (c *conn) OpenStream(ctx context.Context) (mux.MuxedStream, error) { + s, err := c.mplex().NewStream(ctx) if err != nil { return nil, err } From ce30ce884d71977173c00bcb90015141011fa408 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Mon, 17 Jan 2022 07:10:37 -0800 Subject: [PATCH 22/24] implement the new network.MuxedConn interface (#29) --- p2p/muxer/mplex/conn.go | 11 ++++++----- p2p/muxer/mplex/stream.go | 13 +++++++------ p2p/muxer/mplex/transport.go | 10 +++++----- 3 files changed, 18 insertions(+), 16 deletions(-) diff --git a/p2p/muxer/mplex/conn.go b/p2p/muxer/mplex/conn.go index e07a72c6c0..c8bcb4788c 100644 --- a/p2p/muxer/mplex/conn.go +++ b/p2p/muxer/mplex/conn.go @@ -3,12 +3,15 @@ package peerstream_multiplex import ( "context" - "github.com/libp2p/go-libp2p-core/mux" + "github.com/libp2p/go-libp2p-core/network" + mp "github.com/libp2p/go-mplex" ) type conn mp.Multiplex +var _ network.MuxedConn = &conn{} + func (c *conn) Close() error { return c.mplex().Close() } @@ -18,7 +21,7 @@ func (c *conn) IsClosed() bool { } // OpenStream creates a new stream. -func (c *conn) OpenStream(ctx context.Context) (mux.MuxedStream, error) { +func (c *conn) OpenStream(ctx context.Context) (network.MuxedStream, error) { s, err := c.mplex().NewStream(ctx) if err != nil { return nil, err @@ -27,7 +30,7 @@ func (c *conn) OpenStream(ctx context.Context) (mux.MuxedStream, error) { } // AcceptStream accepts a stream opened by the other side. -func (c *conn) AcceptStream() (mux.MuxedStream, error) { +func (c *conn) AcceptStream() (network.MuxedStream, error) { s, err := c.mplex().Accept() if err != nil { return nil, err @@ -38,5 +41,3 @@ func (c *conn) AcceptStream() (mux.MuxedStream, error) { func (c *conn) mplex() *mp.Multiplex { return (*mp.Multiplex)(c) } - -var _ mux.MuxedConn = &conn{} diff --git a/p2p/muxer/mplex/stream.go b/p2p/muxer/mplex/stream.go index 6ef57c335c..0a841824ff 100644 --- a/p2p/muxer/mplex/stream.go +++ b/p2p/muxer/mplex/stream.go @@ -3,17 +3,20 @@ package peerstream_multiplex import ( "time" - "github.com/libp2p/go-libp2p-core/mux" + "github.com/libp2p/go-libp2p-core/network" + mp "github.com/libp2p/go-mplex" ) -// stream implements mux.MuxedStream over mplex.Stream. +// stream implements network.MuxedStream over mplex.Stream. type stream mp.Stream +var _ network.MuxedStream = &stream{} + func (s *stream) Read(b []byte) (n int, err error) { n, err = s.mplex().Read(b) if err == mp.ErrStreamReset { - err = mux.ErrReset + err = network.ErrReset } return n, err @@ -22,7 +25,7 @@ func (s *stream) Read(b []byte) (n int, err error) { func (s *stream) Write(b []byte) (n int, err error) { n, err = s.mplex().Write(b) if err == mp.ErrStreamReset { - err = mux.ErrReset + err = network.ErrReset } return n, err @@ -59,5 +62,3 @@ func (s *stream) SetWriteDeadline(t time.Time) error { func (s *stream) mplex() *mp.Stream { return (*mp.Stream)(s) } - -var _ mux.MuxedStream = &stream{} diff --git a/p2p/muxer/mplex/transport.go b/p2p/muxer/mplex/transport.go index 395b855383..ac08b9b656 100644 --- a/p2p/muxer/mplex/transport.go +++ b/p2p/muxer/mplex/transport.go @@ -3,7 +3,7 @@ package peerstream_multiplex import ( "net" - "github.com/libp2p/go-libp2p-core/mux" + "github.com/libp2p/go-libp2p-core/network" mp "github.com/libp2p/go-mplex" ) @@ -11,12 +11,12 @@ import ( // DefaultTransport has default settings for Transport var DefaultTransport = &Transport{} +var _ network.Multiplexer = &Transport{} + // Transport implements mux.Multiplexer that constructs // mplex-backed muxed connections. type Transport struct{} -func (t *Transport) NewConn(nc net.Conn, isServer bool) (mux.MuxedConn, error) { - return (*conn)(mp.NewMultiplex(nc, isServer)), nil +func (t *Transport) NewConn(nc net.Conn, isServer bool, scope network.PeerScope) (network.MuxedConn, error) { + return (*conn)(mp.NewMultiplex(nc, isServer, scope)), nil } - -var _ mux.Multiplexer = &Transport{} From d6fc4be820eb529112390d66d896384049d3d046 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Fri, 4 Mar 2022 10:56:56 +0400 Subject: [PATCH 23/24] update mplex (#31) --- p2p/muxer/mplex/transport.go | 6 ++++- p2p/muxer/mplex/transport_test.go | 41 +++++++++++++++++++++++++++++++ 2 files changed, 46 insertions(+), 1 deletion(-) diff --git a/p2p/muxer/mplex/transport.go b/p2p/muxer/mplex/transport.go index ac08b9b656..8f9e33a997 100644 --- a/p2p/muxer/mplex/transport.go +++ b/p2p/muxer/mplex/transport.go @@ -18,5 +18,9 @@ var _ network.Multiplexer = &Transport{} type Transport struct{} func (t *Transport) NewConn(nc net.Conn, isServer bool, scope network.PeerScope) (network.MuxedConn, error) { - return (*conn)(mp.NewMultiplex(nc, isServer, scope)), nil + m, err := mp.NewMultiplex(nc, isServer, scope) + if err != nil { + return nil, err + } + return (*conn)(m), nil } diff --git a/p2p/muxer/mplex/transport_test.go b/p2p/muxer/mplex/transport_test.go index 134fd44454..c224667c2d 100644 --- a/p2p/muxer/mplex/transport_test.go +++ b/p2p/muxer/mplex/transport_test.go @@ -1,11 +1,52 @@ package peerstream_multiplex import ( + "errors" + "net" "testing" + "github.com/libp2p/go-libp2p-core/network" test "github.com/libp2p/go-libp2p-testing/suites/mux" ) func TestDefaultTransport(t *testing.T) { test.SubtestAll(t, DefaultTransport) } + +type memoryScope struct { + network.PeerScope + limit int + reserved int +} + +func (m *memoryScope) ReserveMemory(size int, prio uint8) error { + if m.reserved+size > m.limit { + return errors.New("too much") + } + m.reserved += size + return nil +} + +func (m *memoryScope) ReleaseMemory(size int) { + m.reserved -= size + if m.reserved < 0 { + panic("too much memory released") + } +} + +type memoryLimitedTransport struct { + Transport +} + +func (t *memoryLimitedTransport) NewConn(nc net.Conn, isServer bool, scope network.PeerScope) (network.MuxedConn, error) { + return t.Transport.NewConn(nc, isServer, &memoryScope{ + limit: 3 * 1 << 20, + PeerScope: scope, + }) +} + +func TestDefaultTransportWithMemoryLimit(t *testing.T) { + test.SubtestAll(t, &memoryLimitedTransport{ + Transport: *DefaultTransport, + }) +} From 663cf70bc31b5ba05074955844cb11fe2e1e8525 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Tue, 26 Apr 2022 13:08:05 +0100 Subject: [PATCH 24/24] rename the mplex package to mplex --- go.mod | 1 + go.sum | 3 ++- p2p/muxer/mplex/conn.go | 2 +- p2p/muxer/mplex/stream.go | 2 +- p2p/muxer/mplex/transport.go | 2 +- p2p/muxer/mplex/transport_test.go | 2 +- 6 files changed, 7 insertions(+), 5 deletions(-) diff --git a/go.mod b/go.mod index fed40f9e17..49d4fae83c 100644 --- a/go.mod +++ b/go.mod @@ -27,6 +27,7 @@ require ( github.com/libp2p/go-libp2p-testing v0.9.2 github.com/libp2p/go-libp2p-tls v0.4.1 github.com/libp2p/go-libp2p-transport-upgrader v0.7.1 + github.com/libp2p/go-mplex v0.7.0 github.com/libp2p/go-msgio v0.2.0 github.com/libp2p/go-netroute v0.2.0 github.com/libp2p/go-reuseport v0.1.0 diff --git a/go.sum b/go.sum index d47bff0421..3de3c758a2 100644 --- a/go.sum +++ b/go.sum @@ -486,8 +486,9 @@ github.com/libp2p/go-libp2p-yamux v0.9.1 h1:oplewiRix8s45SOrI30rCPZG5mM087YZp+VY github.com/libp2p/go-libp2p-yamux v0.9.1/go.mod h1:wRc6wvyxQINFcKe7daL4BeQ02Iyp+wxyC8WCNfngBrA= github.com/libp2p/go-maddr-filter v0.1.0/go.mod h1:VzZhTXkMucEGGEOSKddrwGiOv0tUhgnKqNEmIAz/bPU= github.com/libp2p/go-mplex v0.3.0/go.mod h1:0Oy/A9PQlwBytDRp4wSkFnzHYDKcpLot35JQ6msjvYQ= -github.com/libp2p/go-mplex v0.4.0 h1:Ukkez9/4EOX5rTw4sHefNJp10dksftAA05ZgyjplUbM= github.com/libp2p/go-mplex v0.4.0/go.mod h1:y26Lx+wNVtMYMaPu300Cbot5LkEZ4tJaNYeHeT9dh6E= +github.com/libp2p/go-mplex v0.7.0 h1:BDhFZdlk5tbr0oyFq/xv/NPGfjbnrsDam1EvutpBDbY= +github.com/libp2p/go-mplex v0.7.0/go.mod h1:rW8ThnRcYWft/Jb2jeORBmPd6xuG3dGxWN/W168L9EU= github.com/libp2p/go-msgio v0.0.4/go.mod h1:63lBBgOTDKQL6EWazRMCwXsEeEeK9O2Cd+0+6OOuipQ= github.com/libp2p/go-msgio v0.0.6/go.mod h1:4ecVB6d9f4BDSL5fqvPiC4A3KivjWn+Venn/1ALLMWA= github.com/libp2p/go-msgio v0.2.0 h1:W6shmB+FeynDrUVl2dgFQvzfBZcXiyqY4VmpQLu9FqU= diff --git a/p2p/muxer/mplex/conn.go b/p2p/muxer/mplex/conn.go index c8bcb4788c..9c681d162a 100644 --- a/p2p/muxer/mplex/conn.go +++ b/p2p/muxer/mplex/conn.go @@ -1,4 +1,4 @@ -package peerstream_multiplex +package mplex import ( "context" diff --git a/p2p/muxer/mplex/stream.go b/p2p/muxer/mplex/stream.go index 0a841824ff..64b70fbde9 100644 --- a/p2p/muxer/mplex/stream.go +++ b/p2p/muxer/mplex/stream.go @@ -1,4 +1,4 @@ -package peerstream_multiplex +package mplex import ( "time" diff --git a/p2p/muxer/mplex/transport.go b/p2p/muxer/mplex/transport.go index 8f9e33a997..f4f0b21955 100644 --- a/p2p/muxer/mplex/transport.go +++ b/p2p/muxer/mplex/transport.go @@ -1,4 +1,4 @@ -package peerstream_multiplex +package mplex import ( "net" diff --git a/p2p/muxer/mplex/transport_test.go b/p2p/muxer/mplex/transport_test.go index c224667c2d..d447452624 100644 --- a/p2p/muxer/mplex/transport_test.go +++ b/p2p/muxer/mplex/transport_test.go @@ -1,4 +1,4 @@ -package peerstream_multiplex +package mplex import ( "errors"