Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

muxers/mplex: Implement AsyncRead and AsyncWrite for Substream #2706

Merged
merged 7 commits into from
Jun 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
# 0.46.0 [unreleased]
- Semver bump Rust from `1.56.1` to `1.60.0` . See [PR 2646].
- Added weak dependencies for features. See [PR 2646].
- Update individual crates.
- Update to [`libp2p-mplex` `v0.34.0`](muxers/mplex/CHANGELOG.md).

[PR 2646]: https://github.com/libp2p/rust-libp2p/pull/2646

Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ libp2p-floodsub = { version = "0.36.0", path = "protocols/floodsub", optional =
libp2p-identify = { version = "0.36.1", path = "protocols/identify", optional = true }
libp2p-kad = { version = "0.37.1", path = "protocols/kad", optional = true }
libp2p-metrics = { version = "0.6.0", path = "misc/metrics", optional = true }
libp2p-mplex = { version = "0.33.0", path = "muxers/mplex", optional = true }
libp2p-mplex = { version = "0.34.0", path = "muxers/mplex", optional = true }
libp2p-noise = { version = "0.36.0", path = "transports/noise", optional = true }
libp2p-ping = { version = "0.36.0", path = "protocols/ping", optional = true }
libp2p-plaintext = { version = "0.33.0", path = "transports/plaintext", optional = true }
Expand Down
6 changes: 6 additions & 0 deletions muxers/mplex/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
# 0.34.0 [unreleased]

- `Substream` now implements `AsyncRead` and `AsyncWrite`. See [PR 2706].

[PR 2706]: https://github.com/libp2p/rust-libp2p/pull/2706/

# 0.33.0

- Update to `libp2p-core` `v0.33.0`.
Expand Down
2 changes: 1 addition & 1 deletion muxers/mplex/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "libp2p-mplex"
edition = "2021"
rust-version = "1.56.1"
description = "Mplex multiplexing protocol for libp2p"
version = "0.33.0"
version = "0.34.0"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
version = "0.34.0"
version = "0.33.1"

Is this a breaking change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes because Substream is a public type and has type parameter now!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, good point. Thanks!

authors = ["Parity Technologies <[email protected]>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand Down
124 changes: 94 additions & 30 deletions muxers/mplex/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use libp2p_core::{
StreamMuxer,
};
use parking_lot::Mutex;
use std::{cmp, iter, task::Context, task::Poll};
use std::{cmp, iter, pin::Pin, sync::Arc, task::Context, task::Poll};

impl UpgradeInfo for MplexConfig {
type Info = &'static [u8];
Expand All @@ -54,7 +54,7 @@ where

fn upgrade_inbound(self, socket: C, _: Self::Info) -> Self::Future {
future::ready(Ok(Multiplex {
io: Mutex::new(io::Multiplexed::new(socket, self)),
io: Arc::new(Mutex::new(io::Multiplexed::new(socket, self))),
}))
}
}
Expand All @@ -69,7 +69,7 @@ where

fn upgrade_outbound(self, socket: C, _: Self::Info) -> Self::Future {
future::ready(Ok(Multiplex {
io: Mutex::new(io::Multiplexed::new(socket, self)),
io: Arc::new(Mutex::new(io::Multiplexed::new(socket, self))),
}))
}
}
Expand All @@ -79,14 +79,14 @@ where
/// This implementation isn't capable of detecting when the underlying socket changes its address,
/// and no [`StreamMuxerEvent::AddressChange`] event is ever emitted.
pub struct Multiplex<C> {
io: Mutex<io::Multiplexed<C>>,
io: Arc<Mutex<io::Multiplexed<C>>>,
}

impl<C> StreamMuxer for Multiplex<C>
where
C: AsyncRead + AsyncWrite + Unpin,
{
type Substream = Substream;
type Substream = Substream<C>;
type OutboundSubstream = OutboundSubstream;
type Error = io::Error;

Expand All @@ -95,7 +95,7 @@ where
cx: &mut Context<'_>,
) -> Poll<io::Result<StreamMuxerEvent<Self::Substream>>> {
let stream_id = ready!(self.io.lock().poll_next_stream(cx))?;
let stream = Substream::new(stream_id);
let stream = Substream::new(stream_id, self.io.clone());
Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(stream)))
}

Expand All @@ -109,7 +109,7 @@ where
_: &mut Self::OutboundSubstream,
) -> Poll<Result<Self::Substream, io::Error>> {
let stream_id = ready!(self.io.lock().poll_open_stream(cx))?;
Poll::Ready(Ok(Substream::new(stream_id)))
Poll::Ready(Ok(Substream::new(stream_id, self.io.clone())))
}

fn destroy_outbound(&self, _substream: Self::OutboundSubstream) {
Expand All @@ -122,22 +122,7 @@ where
substream: &mut Self::Substream,
buf: &mut [u8],
) -> Poll<Result<usize, io::Error>> {
loop {
// Try to read from the current (i.e. last received) frame.
if !substream.current_data.is_empty() {
let len = cmp::min(substream.current_data.len(), buf.len());
buf[..len].copy_from_slice(&substream.current_data.split_to(len));
return Poll::Ready(Ok(len));
}

// Read the next data frame from the multiplexed stream.
match ready!(self.io.lock().poll_read_stream(cx, substream.id))? {
Some(data) => {
substream.current_data = data;
}
None => return Poll::Ready(Ok(0)),
}
}
Pin::new(substream).poll_read(cx, buf)
}

fn write_substream(
Expand All @@ -146,27 +131,27 @@ where
substream: &mut Self::Substream,
buf: &[u8],
) -> Poll<Result<usize, io::Error>> {
self.io.lock().poll_write_stream(cx, substream.id, buf)
Pin::new(substream).poll_write(cx, buf)
}

fn flush_substream(
&self,
cx: &mut Context<'_>,
substream: &mut Self::Substream,
) -> Poll<Result<(), io::Error>> {
self.io.lock().poll_flush_stream(cx, substream.id)
Pin::new(substream).poll_flush(cx)
}

fn shutdown_substream(
&self,
cx: &mut Context<'_>,
substream: &mut Self::Substream,
) -> Poll<Result<(), io::Error>> {
self.io.lock().poll_close_stream(cx, substream.id)
Pin::new(substream).poll_close(cx)
}

fn destroy_substream(&self, sub: Self::Substream) {
self.io.lock().drop_stream(sub.id);
std::mem::drop(sub)
}

fn poll_close(&self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
Expand All @@ -177,19 +162,98 @@ where
/// Active attempt to open an outbound substream.
pub struct OutboundSubstream {}

impl<C> AsyncRead for Substream<C>
where
C: AsyncRead + AsyncWrite + Unpin,
{
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
let this = self.get_mut();

loop {
// Try to read from the current (i.e. last received) frame.
if !this.current_data.is_empty() {
let len = cmp::min(this.current_data.len(), buf.len());
buf[..len].copy_from_slice(&this.current_data.split_to(len));
return Poll::Ready(Ok(len));
}

// Read the next data frame from the multiplexed stream.
match ready!(this.io.lock().poll_read_stream(cx, this.id))? {
Some(data) => {
this.current_data = data;
}
None => return Poll::Ready(Ok(0)),
}
}
}
}

impl<C> AsyncWrite for Substream<C>
where
C: AsyncRead + AsyncWrite + Unpin,
{
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
let this = self.get_mut();

this.io.lock().poll_write_stream(cx, this.id, buf)
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
let this = self.get_mut();

this.io.lock().poll_flush_stream(cx, this.id)
}

fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
let this = self.get_mut();
let mut io = this.io.lock();

ready!(io.poll_close_stream(cx, this.id))?;
ready!(io.poll_flush_stream(cx, this.id))?;

Poll::Ready(Ok(()))
}
}

/// Active substream to the remote.
pub struct Substream {
pub struct Substream<C>
where
C: AsyncRead + AsyncWrite + Unpin,
{
/// The unique, local identifier of the substream.
id: LocalStreamId,
/// The current data frame the substream is reading from.
current_data: Bytes,
/// Shared reference to the actual muxer.
io: Arc<Mutex<io::Multiplexed<C>>>,
}

impl Substream {
fn new(id: LocalStreamId) -> Self {
impl<C> Substream<C>
where
C: AsyncRead + AsyncWrite + Unpin,
{
fn new(id: LocalStreamId, io: Arc<Mutex<io::Multiplexed<C>>>) -> Self {
Self {
id,
current_data: Bytes::new(),
io,
}
}
}

impl<C> Drop for Substream<C>
where
C: AsyncRead + AsyncWrite + Unpin,
{
fn drop(&mut self) {
self.io.lock().drop_stream(self.id);
}
}