Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

move go-libp2p-mplex here #1450

Merged
merged 30 commits into from
Apr 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
3023356
extracted from go-peerstream
jbenet Jul 12, 2015
5b2626d
fixed urls
jbenet Jul 12, 2015
73c3d70
links fix
jbenet Jul 12, 2015
aab33b0
implement AcceptStream
jbenet Jul 12, 2015
3573177
update multistream and multiplex
whyrusleeping Jul 14, 2015
9b37fe1
gxify and update deps
whyrusleeping May 2, 2016
9a45ffb
rewrite paths to gx versions
whyrusleeping May 4, 2016
b3902ad
gx publish version 1.0.0
whyrusleeping May 13, 2016
ac8a308
update multiplex to 0.1.0
whyrusleeping Nov 22, 2016
43db8f2
add support for the new Reset method.
Stebalien Aug 18, 2017
17e524c
Merge pull request #4 from Stebalien/feat/reset
Stebalien Aug 18, 2017
a920757
remove conn.Serve
marten-seemann Aug 19, 2017
4ca0f79
Merge pull request #5 from marten-seemann/remove-serve
Stebalien Aug 24, 2017
a589640
gx publish 3.0.11
Stebalien Jun 26, 2018
fad7d9b
switch to the correct mplex package
Stebalien Sep 25, 2018
aa541dd
migrate to consolidated types; add travis config (#1)
raulk May 25, 2019
7773c57
restructure files layout
Wondertan Mar 2, 2020
166a2cc
define stream type for mp.Stream and update go-mplex
Wondertan Mar 2, 2020
8a4d7ec
use explicit return values
Wondertan Mar 2, 2020
4395e60
feat: update stream interfaces
Stebalien Sep 2, 2020
f4aa298
fix: correctly wrap returned stream
Stebalien Sep 2, 2020
fdb9a0a
Merge pull request #20 from libp2p/feat/stream-interfaces
Stebalien Sep 2, 2020
c2feedf
change OpenStream to accept a context
marten-seemann Dec 17, 2020
6c57248
Merge pull request #21 from libp2p/open-stream-context
marten-seemann Dec 19, 2020
92ae2a8
update go-mplex, use the context passed to OpenStream
marten-seemann Dec 19, 2020
c04213f
Merge pull request #23 from libp2p/update-mplex
marten-seemann Dec 19, 2020
ce30ce8
implement the new network.MuxedConn interface (#29)
marten-seemann Jan 17, 2022
d6fc4be
update mplex (#31)
marten-seemann Mar 4, 2022
d69f1fc
move go-libp2p-mplex here
marten-seemann Apr 26, 2022
663cf70
rename the mplex package to mplex
marten-seemann Apr 26, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ require (
github.com/libp2p/go-libp2p-testing v0.9.2
github.com/libp2p/go-libp2p-tls v0.4.1
github.com/libp2p/go-libp2p-transport-upgrader v0.7.1
github.com/libp2p/go-mplex v0.7.0
github.com/libp2p/go-msgio v0.2.0
github.com/libp2p/go-netroute v0.2.0
github.com/libp2p/go-reuseport v0.1.0
Expand Down
3 changes: 2 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -486,8 +486,9 @@ github.com/libp2p/go-libp2p-yamux v0.9.1 h1:oplewiRix8s45SOrI30rCPZG5mM087YZp+VY
github.com/libp2p/go-libp2p-yamux v0.9.1/go.mod h1:wRc6wvyxQINFcKe7daL4BeQ02Iyp+wxyC8WCNfngBrA=
github.com/libp2p/go-maddr-filter v0.1.0/go.mod h1:VzZhTXkMucEGGEOSKddrwGiOv0tUhgnKqNEmIAz/bPU=
github.com/libp2p/go-mplex v0.3.0/go.mod h1:0Oy/A9PQlwBytDRp4wSkFnzHYDKcpLot35JQ6msjvYQ=
github.com/libp2p/go-mplex v0.4.0 h1:Ukkez9/4EOX5rTw4sHefNJp10dksftAA05ZgyjplUbM=
github.com/libp2p/go-mplex v0.4.0/go.mod h1:y26Lx+wNVtMYMaPu300Cbot5LkEZ4tJaNYeHeT9dh6E=
github.com/libp2p/go-mplex v0.7.0 h1:BDhFZdlk5tbr0oyFq/xv/NPGfjbnrsDam1EvutpBDbY=
github.com/libp2p/go-mplex v0.7.0/go.mod h1:rW8ThnRcYWft/Jb2jeORBmPd6xuG3dGxWN/W168L9EU=
github.com/libp2p/go-msgio v0.0.4/go.mod h1:63lBBgOTDKQL6EWazRMCwXsEeEeK9O2Cd+0+6OOuipQ=
github.com/libp2p/go-msgio v0.0.6/go.mod h1:4ecVB6d9f4BDSL5fqvPiC4A3KivjWn+Venn/1ALLMWA=
github.com/libp2p/go-msgio v0.2.0 h1:W6shmB+FeynDrUVl2dgFQvzfBZcXiyqY4VmpQLu9FqU=
Expand Down
43 changes: 43 additions & 0 deletions p2p/muxer/mplex/conn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package mplex

import (
"context"

"github.com/libp2p/go-libp2p-core/network"

mp "github.com/libp2p/go-mplex"
)

type conn mp.Multiplex

var _ network.MuxedConn = &conn{}

func (c *conn) Close() error {
return c.mplex().Close()
}

func (c *conn) IsClosed() bool {
return c.mplex().IsClosed()
}

// OpenStream creates a new stream.
func (c *conn) OpenStream(ctx context.Context) (network.MuxedStream, error) {
s, err := c.mplex().NewStream(ctx)
if err != nil {
return nil, err
}
return (*stream)(s), nil
}

// AcceptStream accepts a stream opened by the other side.
func (c *conn) AcceptStream() (network.MuxedStream, error) {
s, err := c.mplex().Accept()
if err != nil {
return nil, err
}
return (*stream)(s), nil
}

func (c *conn) mplex() *mp.Multiplex {
return (*mp.Multiplex)(c)
}
64 changes: 64 additions & 0 deletions p2p/muxer/mplex/stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package mplex

import (
"time"

"github.com/libp2p/go-libp2p-core/network"

mp "github.com/libp2p/go-mplex"
)

// stream implements network.MuxedStream over mplex.Stream.
type stream mp.Stream

var _ network.MuxedStream = &stream{}

func (s *stream) Read(b []byte) (n int, err error) {
n, err = s.mplex().Read(b)
if err == mp.ErrStreamReset {
err = network.ErrReset
}

return n, err
}

func (s *stream) Write(b []byte) (n int, err error) {
n, err = s.mplex().Write(b)
if err == mp.ErrStreamReset {
err = network.ErrReset
}

return n, err
}

func (s *stream) Close() error {
return s.mplex().Close()
}

func (s *stream) CloseWrite() error {
return s.mplex().CloseWrite()
}

func (s *stream) CloseRead() error {
return s.mplex().CloseRead()
}

func (s *stream) Reset() error {
return s.mplex().Reset()
}

func (s *stream) SetDeadline(t time.Time) error {
return s.mplex().SetDeadline(t)
}

func (s *stream) SetReadDeadline(t time.Time) error {
return s.mplex().SetReadDeadline(t)
}

func (s *stream) SetWriteDeadline(t time.Time) error {
return s.mplex().SetWriteDeadline(t)
}

func (s *stream) mplex() *mp.Stream {
return (*mp.Stream)(s)
}
26 changes: 26 additions & 0 deletions p2p/muxer/mplex/transport.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package mplex

import (
"net"

"github.com/libp2p/go-libp2p-core/network"

mp "github.com/libp2p/go-mplex"
)

// DefaultTransport has default settings for Transport
var DefaultTransport = &Transport{}

var _ network.Multiplexer = &Transport{}

// Transport implements mux.Multiplexer that constructs
// mplex-backed muxed connections.
type Transport struct{}

func (t *Transport) NewConn(nc net.Conn, isServer bool, scope network.PeerScope) (network.MuxedConn, error) {
m, err := mp.NewMultiplex(nc, isServer, scope)
if err != nil {
return nil, err
}
return (*conn)(m), nil
}
52 changes: 52 additions & 0 deletions p2p/muxer/mplex/transport_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package mplex

import (
"errors"
"net"
"testing"

"github.com/libp2p/go-libp2p-core/network"
test "github.com/libp2p/go-libp2p-testing/suites/mux"
)

func TestDefaultTransport(t *testing.T) {
test.SubtestAll(t, DefaultTransport)
}

type memoryScope struct {
network.PeerScope
limit int
reserved int
}

func (m *memoryScope) ReserveMemory(size int, prio uint8) error {
if m.reserved+size > m.limit {
return errors.New("too much")
}
m.reserved += size
return nil
}

func (m *memoryScope) ReleaseMemory(size int) {
m.reserved -= size
if m.reserved < 0 {
panic("too much memory released")
}
}

type memoryLimitedTransport struct {
Transport
}

func (t *memoryLimitedTransport) NewConn(nc net.Conn, isServer bool, scope network.PeerScope) (network.MuxedConn, error) {
return t.Transport.NewConn(nc, isServer, &memoryScope{
limit: 3 * 1 << 20,
PeerScope: scope,
})
}

func TestDefaultTransportWithMemoryLimit(t *testing.T) {
test.SubtestAll(t, &memoryLimitedTransport{
Transport: *DefaultTransport,
})
}