Skip to content

Commit

Permalink
swarm/connection: Enforce limit on inbound substreams via `StreamMuxe…
Browse files Browse the repository at this point in the history
…r` (#2861)

* Provide separate functions for injecting in- and outbound streams

* Inline `HandlerWrapper` into `Connection`

* Only poll for new inbound streams if we are below the limit

* yamux: Buffer inbound streams in `StreamMuxer::poll`
  • Loading branch information
thomaseizinger authored Sep 21, 2022
1 parent f4362cb commit a4d1e58
Show file tree
Hide file tree
Showing 8 changed files with 698 additions and 600 deletions.
4 changes: 4 additions & 0 deletions muxers/yamux/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@

- Remove `OpenSubstreamToken` as it is dead code. See [PR 2873].

- Drive connection also via `StreamMuxer::poll`. Any received streams will be buffered up to a maximum of 25 streams.
See [PR 2861].

[PR 2873]: https://github.com/libp2p/rust-libp2p/pull/2873/
[PR 2861]: https://github.com/libp2p/rust-libp2p/pull/2861/

# 0.39.0

Expand Down
1 change: 1 addition & 0 deletions muxers/yamux/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ libp2p-core = { version = "0.36.0", path = "../../core", default-features = fals
parking_lot = "0.12"
thiserror = "1.0"
yamux = "0.10.0"
log = "0.4"
57 changes: 49 additions & 8 deletions muxers/yamux/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@
use futures::{
future,
prelude::*,
ready,
stream::{BoxStream, LocalBoxStream},
};
use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent};
use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
use std::collections::VecDeque;
use std::{
fmt, io, iter, mem,
pin::Pin,
Expand All @@ -42,8 +44,20 @@ pub struct Yamux<S> {
incoming: S,
/// Handle to control the connection.
control: yamux::Control,
/// Temporarily buffers inbound streams in case our node is performing backpressure on the remote.
///
/// The only way how yamux can make progress is by driving the [`Incoming`] stream. However, the
/// [`StreamMuxer`] interface is designed to allow a caller to selectively make progress via
/// [`StreamMuxer::poll_inbound`] and [`StreamMuxer::poll_outbound`] whilst the more general
/// [`StreamMuxer::poll`] is designed to make progress on existing streams etc.
///
/// This buffer stores inbound streams that are created whilst [`StreamMuxer::poll`] is called.
/// Once the buffer is full, new inbound streams are dropped.
inbound_stream_buffer: VecDeque<yamux::Stream>,
}

const MAX_BUFFERED_INBOUND_STREAMS: usize = 25;

impl<S> fmt::Debug for Yamux<S> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("Yamux")
Expand All @@ -65,6 +79,7 @@ where
_marker: std::marker::PhantomData,
},
control: ctrl,
inbound_stream_buffer: VecDeque::default(),
}
}
}
Expand All @@ -84,6 +99,7 @@ where
_marker: std::marker::PhantomData,
},
control: ctrl,
inbound_stream_buffer: VecDeque::default(),
}
}
}
Expand All @@ -101,13 +117,11 @@ where
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Self::Substream, Self::Error>> {
self.incoming.poll_next_unpin(cx).map(|maybe_stream| {
let stream = maybe_stream
.transpose()?
.ok_or(YamuxError(ConnectionError::Closed))?;
if let Some(stream) = self.inbound_stream_buffer.pop_front() {
return Poll::Ready(Ok(stream));
}

Ok(stream)
})
self.poll_inner(cx)
}

fn poll_outbound(
Expand All @@ -121,9 +135,21 @@ where

fn poll(
self: Pin<&mut Self>,
_: &mut Context<'_>,
cx: &mut Context<'_>,
) -> Poll<Result<StreamMuxerEvent, Self::Error>> {
Poll::Pending
let this = self.get_mut();

loop {
let inbound_stream = ready!(this.poll_inner(cx))?;

if this.inbound_stream_buffer.len() >= MAX_BUFFERED_INBOUND_STREAMS {
log::warn!("dropping {inbound_stream} because buffer is full");
drop(inbound_stream);
continue;
}

this.inbound_stream_buffer.push_back(inbound_stream);
}
}

fn poll_close(mut self: Pin<&mut Self>, c: &mut Context<'_>) -> Poll<YamuxResult<()>> {
Expand All @@ -145,6 +171,21 @@ where
}
}

impl<S> Yamux<S>
where
S: Stream<Item = Result<yamux::Stream, YamuxError>> + Unpin,
{
fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<Result<yamux::Stream, YamuxError>> {
self.incoming.poll_next_unpin(cx).map(|maybe_stream| {
let stream = maybe_stream
.transpose()?
.ok_or(YamuxError(ConnectionError::Closed))?;

Ok(stream)
})
}
}

/// The yamux configuration.
#[derive(Debug, Clone)]
pub struct YamuxConfig {
Expand Down
5 changes: 5 additions & 0 deletions swarm/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,12 @@

- Update to `libp2p-core` `v0.36.0`.

- Enforce backpressure on incoming streams via `StreamMuxer` interface. In case we hit the configured limit of maximum
number of inbound streams, we will stop polling the `StreamMuxer` for new inbound streams. Depending on the muxer
implementation in use, this may lead to instant dropping of inbound streams. See [PR 2861].

[libp2p-swarm v0.38.0 changelog entry]: https://github.com/libp2p/rust-libp2p/blob/master/swarm/CHANGELOG.md#0380
[PR 2861]: https://github.com/libp2p/rust-libp2p/pull/2861/

# 0.38.0

Expand Down
Loading

0 comments on commit a4d1e58

Please sign in to comment.