Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

swarm/connection: Enforce limit on inbound substreams via StreamMuxer #2861

Merged
merged 46 commits into from
Sep 21, 2022
Merged
Show file tree
Hide file tree
Changes from 39 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
79ce63f
Provide separate functions for injecting in- and outbound streams
thomaseizinger Aug 31, 2022
b829642
Inline `HandlerWrapper` into `Connection`
thomaseizinger Aug 31, 2022
7182fd6
Remove unique_dial_upgrade_id
thomaseizinger Aug 31, 2022
7190a79
Don't use tuples when we can use parameters
thomaseizinger Aug 31, 2022
f689617
Delay de-construction of SubstreamUpgrade
thomaseizinger Aug 31, 2022
4d9fa5b
Only poll for new inbound streams if we are below the limit
thomaseizinger Aug 31, 2022
1af9ba3
Inline `inject_substream` functions
thomaseizinger Aug 31, 2022
27274c8
Introduce ctor functions on SubstreamUpgrade to simplify poll-fn
thomaseizinger Aug 31, 2022
2c3a606
Remove stale comment
thomaseizinger Aug 31, 2022
772b775
Remove empty line
thomaseizinger Aug 31, 2022
6359137
Align naming
thomaseizinger Aug 31, 2022
bb930c4
Re-introduce log
thomaseizinger Aug 31, 2022
d8d8165
Bring back the test
thomaseizinger Aug 31, 2022
a98415f
Simplify `Connection` constructor
thomaseizinger Aug 31, 2022
bbf7070
Use `match` to reduce indentation level
thomaseizinger Aug 31, 2022
8fea3a7
Fix rustdoc links
thomaseizinger Aug 31, 2022
65a06d2
Replace `if let` with `match` to reduce indentation
thomaseizinger Aug 31, 2022
91370a9
Reduce diff
thomaseizinger Aug 31, 2022
0621961
Reduce diff, 2nd attempt
thomaseizinger Aug 31, 2022
9197de0
Reduce diff, 3nd attempt
thomaseizinger Aug 31, 2022
82d3bd3
Don't use dial terminology for substreams
thomaseizinger Sep 5, 2022
e677455
Fix typo
thomaseizinger Sep 5, 2022
6b98c50
yamux: Buffer inbound streams in `StreamMuxer::poll`
thomaseizinger Sep 5, 2022
890cecd
Fix compile error
thomaseizinger Sep 5, 2022
44e9163
Fix autonat tests
thomaseizinger Sep 8, 2022
649021c
Fix rustdoc link
thomaseizinger Sep 8, 2022
9ca11b0
Fix silly end-less loop mistake and add log
thomaseizinger Sep 15, 2022
34ef653
Revert "Fix autonat tests"
thomaseizinger Sep 15, 2022
44c2e36
Remove accidential log init
thomaseizinger Sep 15, 2022
0ec5a58
Remove import
thomaseizinger Sep 15, 2022
1ff710d
Have timer start on request of substream for outbound streams
thomaseizinger Sep 15, 2022
ce4d9a7
Add stub for missing test
thomaseizinger Sep 15, 2022
e0ba9ba
Implement missing test and add waker to SubstreamRequested
thomaseizinger Sep 15, 2022
9ea9ad8
Add docs
thomaseizinger Sep 15, 2022
a2f220a
Improve panic message
thomaseizinger Sep 15, 2022
e7cfb46
Add docs
thomaseizinger Sep 15, 2022
ee686ed
Ensure no shutdown as long as we want to open new streams
thomaseizinger Sep 15, 2022
9bfa0cb
Bump libp2p-yamux version
thomaseizinger Sep 15, 2022
ee61244
Add changelog entry to libp2p-swarm
thomaseizinger Sep 15, 2022
dcedb4e
Update muxers/yamux/src/lib.rs
thomaseizinger Sep 19, 2022
36f69ea
Merge branch 'master' into inline-handler-wrapper
thomaseizinger Sep 19, 2022
a2c2502
Merge branch 'inline-handler-wrapper' of github.com:libp2p/rust-libp2…
thomaseizinger Sep 19, 2022
a25abeb
Update Cargo.toml
thomaseizinger Sep 19, 2022
b8ad471
Update muxers/yamux/Cargo.toml
thomaseizinger Sep 19, 2022
0cb8a87
Fmt
thomaseizinger Sep 19, 2022
1943c66
Merge branch 'master' into inline-handler-wrapper
mxinden Sep 21, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ libp2p-swarm = { version = "0.39.0", path = "swarm" }
libp2p-swarm-derive = { version = "0.30.0", path = "swarm-derive" }
libp2p-uds = { version = "0.34.0", path = "transports/uds", optional = true }
libp2p-wasm-ext = { version = "0.35.0", path = "transports/wasm-ext", default-features = false, optional = true }
libp2p-yamux = { version = "0.39.0", path = "muxers/yamux", optional = true }
libp2p-yamux = { version = "0.39.1", path = "muxers/yamux", optional = true }
multiaddr = { version = "0.14.0" }
parking_lot = "0.12.0"
pin-project = "1.0.0"
Expand Down
7 changes: 7 additions & 0 deletions muxers/yamux/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
# 0.39.1

- Drive connection also via `StreamMuxer::poll`. Any received streams will be buffered up to a maximum of 25 streams.
See [PR 2861].

[PR 2861]: https://github.com/libp2p/rust-libp2p/pull/2861/

# 0.39.0

- Update to `libp2p-core` `v0.35.0`
Expand Down
3 changes: 2 additions & 1 deletion muxers/yamux/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "libp2p-yamux"
edition = "2021"
rust-version = "1.56.1"
description = "Yamux multiplexing protocol for libp2p"
version = "0.39.0"
version = "0.39.1"
authors = ["Parity Technologies <[email protected]>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand All @@ -16,3 +16,4 @@ libp2p-core = { version = "0.35.0", path = "../../core", default-features = fals
parking_lot = "0.12"
thiserror = "1.0"
yamux = "0.10.0"
log = "0.4"
57 changes: 49 additions & 8 deletions muxers/yamux/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@
use futures::{
future,
prelude::*,
ready,
stream::{BoxStream, LocalBoxStream},
};
use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent};
use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
use std::collections::VecDeque;
use std::{
fmt, io, iter, mem,
pin::Pin,
Expand All @@ -42,8 +44,20 @@ pub struct Yamux<S> {
incoming: S,
/// Handle to control the connection.
control: yamux::Control,
/// Temporarily buffers inbound streams in case our node is performing backpressure on the remote.
///
/// The only way how yamux can make progress is by driving the [`Incoming`] stream. However, the
/// [`StreamMuxer`] interface is designed to allow a caller to selectively make progress via
/// [`StreamMuxer::poll_inbound`] and [`StreamMuxer::poll_outbound`] whilst the move general
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
/// [`StreamMuxer::poll`] is designed to make progress on existing streams etc.
///
/// This buffer stores inbound streams that are created whilst [`StreamMuxer::poll`] is called.
/// Once the buffer is full, new inbound streams are dropped.
inbound_stream_buffer: VecDeque<yamux::Stream>,
}

const MAX_BUFFERED_INBOUND_STREAMS: usize = 25;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I am not mistaken, the additional buffer of 25 streams increases the overall number of possible inflight inbound streams, correct? In other words, while we previously only supported max_negotiating_inbound_streams before dropping streams, we now allow up to max_negotiating_inbound_streams + 25.

Is this correct?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, that sounds correct to me! Like I said, this number is completely arbitrary and we can change it to something else :)


impl<S> fmt::Debug for Yamux<S> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("Yamux")
Expand All @@ -69,6 +83,7 @@ where
_marker: std::marker::PhantomData,
},
control: ctrl,
inbound_stream_buffer: VecDeque::default(),
}
}
}
Expand All @@ -88,6 +103,7 @@ where
_marker: std::marker::PhantomData,
},
control: ctrl,
inbound_stream_buffer: VecDeque::default(),
}
}
}
Expand All @@ -105,13 +121,11 @@ where
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Self::Substream, Self::Error>> {
self.incoming.poll_next_unpin(cx).map(|maybe_stream| {
let stream = maybe_stream
.transpose()?
.ok_or(YamuxError(ConnectionError::Closed))?;
if let Some(stream) = self.inbound_stream_buffer.pop_front() {
return Poll::Ready(Ok(stream));
}

Ok(stream)
})
self.poll_inner(cx)
}

fn poll_outbound(
Expand All @@ -125,9 +139,21 @@ where

fn poll(
self: Pin<&mut Self>,
_: &mut Context<'_>,
cx: &mut Context<'_>,
) -> Poll<Result<StreamMuxerEvent, Self::Error>> {
Poll::Pending
let this = self.get_mut();

loop {
let inbound_stream = ready!(this.poll_inner(cx))?;

if this.inbound_stream_buffer.len() >= MAX_BUFFERED_INBOUND_STREAMS {
log::warn!("dropping {inbound_stream} because buffer is full");
drop(inbound_stream);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just want to make sure we are making an informed decision here. Instead of tail dropping, we could as well do head dropping.

I would have to put more thoughts into this before having an opinion. This does maintain the status quo, thus we can consider it a non-change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With this change, the timeout for opening a new stream already begins as the request is queued. I believe head dropping would negatively affect the number of successfully negotiated streams because we might drop a stream that was about to be processed by the local node. A dropped stream will trigger a ConnectionHandler::inject_dial_upgrade_error with the provided "open info" on the remote node.

In a scenario where a node opens 3 streams that are all pending in this buffer, I think it may be confusing if stream 1 fails but 2 and 3 succeed. It feels more natural to process this in FIFO order.

continue;
}

this.inbound_stream_buffer.push_back(inbound_stream);
}
}

fn poll_close(mut self: Pin<&mut Self>, c: &mut Context<'_>) -> Poll<YamuxResult<()>> {
Expand All @@ -149,6 +175,21 @@ where
}
}

impl<S> Yamux<S>
where
S: Stream<Item = Result<yamux::Stream, YamuxError>> + Unpin,
{
fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<Result<yamux::Stream, YamuxError>> {
self.incoming.poll_next_unpin(cx).map(|maybe_stream| {
let stream = maybe_stream
.transpose()?
.ok_or(YamuxError(ConnectionError::Closed))?;

Ok(stream)
})
}
}

/// The yamux configuration.
#[derive(Debug, Clone)]
pub struct YamuxConfig {
Expand Down
5 changes: 5 additions & 0 deletions swarm/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@
- Remove deprecated `NetworkBehaviourEventProcess`. See [libp2p-swarm v0.38.0 changelog entry] for
migration path.

- Enforce backpressure on incoming streams via `StreamMuxer` interface. In case we hit the configured limit of maximum
number of inbound streams, we will stop polling the `StreamMuxer` for new inbound streams. Depending on the muxer
implementation in use, this may lead to instant dropping of inbound streams. See [PR 2861].

[libp2p-swarm v0.38.0 changelog entry]: https://github.com/libp2p/rust-libp2p/blob/master/swarm/CHANGELOG.md#0380
[PR 2861]: https://github.com/libp2p/rust-libp2p/pull/2861/

# 0.38.0

Expand Down
Loading