Skip to content

Commit

Permalink
chore: remove WindowUpdateMode::OnReceive (#179)
Browse files Browse the repository at this point in the history
Follow up to previous deprecation #177.

Fixes #175.
  • Loading branch information
mxinden authored Nov 27, 2023
1 parent 1e46505 commit 16ffe54
Show file tree
Hide file tree
Showing 7 changed files with 31 additions and 120 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
# 0.13.0

- Remove `WindowUpdateMode`.
Behavior will always be `WindowUpdateMode::OnRead`, thus enabling flow-control and enforcing backpressure.
See [PR 178](https://github.com/libp2p/rust-yamux/pull/178).

# 0.12.1

- Deprecate `WindowUpdateMode::OnReceive`.
Expand Down
10 changes: 2 additions & 8 deletions test-harness/benches/concurrent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,21 +74,15 @@ fn concurrent(c: &mut Criterion) {
group.finish();
}

fn config() -> Config {
let mut c = Config::default();
c.set_window_update_mode(yamux::WindowUpdateMode::OnRead);
c
}

async fn oneway(
nstreams: usize,
nmessages: usize,
data: Bytes,
server: Endpoint,
client: Endpoint,
) {
let server = Connection::new(server, config(), Mode::Server);
let client = Connection::new(client, config(), Mode::Client);
let server = Connection::new(server, Config::default(), Mode::Server);
let client = Connection::new(client, Config::default(), Mode::Client);

task::spawn(dev_null_server(server));

Expand Down
8 changes: 1 addition & 7 deletions test-harness/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ use std::{fmt, io, mem};
use tokio::net::{TcpListener, TcpSocket, TcpStream};
use tokio::task;
use tokio_util::compat::{Compat, TokioAsyncReadCompatExt};
use yamux::Config;
use yamux::ConnectionError;
use yamux::{Config, WindowUpdateMode};
use yamux::{Connection, Mode};

pub async fn connected_peers(
Expand Down Expand Up @@ -448,12 +448,6 @@ pub struct TestConfig(pub Config);
impl Arbitrary for TestConfig {
fn arbitrary(g: &mut Gen) -> Self {
let mut c = Config::default();
c.set_window_update_mode(if bool::arbitrary(g) {
WindowUpdateMode::OnRead
} else {
#[allow(deprecated)]
WindowUpdateMode::OnReceive
});
c.set_read_after_close(Arbitrary::arbitrary(g));
c.set_receive_window(256 * 1024 + u32::arbitrary(g) % (768 * 1024));
TestConfig(c)
Expand Down
4 changes: 2 additions & 2 deletions yamux/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "yamux"
version = "0.12.1"
version = "0.13.0"
authors = ["Parity Technologies <[email protected]>"]
license = "Apache-2.0 OR MIT"
description = "Multiplexer over reliable, ordered connections"
Expand All @@ -19,5 +19,5 @@ static_assertions = "1"
pin-project = "1.1.0"

[dev-dependencies]
quickcheck = "1.0"
futures = { version = "0.3.12", default-features = false, features = ["executor"] }
quickcheck = "1.0"
67 changes: 13 additions & 54 deletions yamux/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::{
error::ConnectionError,
frame::header::{self, Data, GoAway, Header, Ping, StreamId, Tag, WindowUpdate, CONNECTION_ID},
frame::{self, Frame},
Config, WindowUpdateMode, DEFAULT_CREDIT,
Config, DEFAULT_CREDIT,
};
use crate::{Result, MAX_ACK_BACKLOG};
use cleanup::Cleanup;
Expand Down Expand Up @@ -304,9 +304,7 @@ enum Action {
/// Nothing to be done.
None,
/// A new stream has been opened by the remote.
New(Stream, Option<Frame<WindowUpdate>>),
/// A window update should be sent to the remote.
Update(Frame<WindowUpdate>),
New(Stream),
/// A ping should be answered.
Ping(Frame<Ping>),
/// A stream should be reset.
Expand Down Expand Up @@ -504,23 +502,13 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
// The remote may be out of credit though and blocked on
// writing more data. We may need to reset the stream.
State::SendClosed => {
if self.config.window_update_mode == WindowUpdateMode::OnRead
&& shared.window == 0
{
// The remote may be waiting for a window update
// which we will never send, so reset the stream now.
let mut header = Header::data(stream_id, 0);
header.rst();
Some(Frame::new(header))
} else {
// The remote has either still credit or will be given more
// (due to an enqueued window update or because the update
// mode is `OnReceive`) or we already have inbound frames in
// the socket buffer which will be processed later. In any
// case we will reply with an RST in `Connection::on_data`
// because the stream will no longer be known.
None
}
// The remote has either still credit or will be given more
// due to an enqueued window update or we already have
// inbound frames in the socket buffer which will be
// processed later. In any case we will reply with an RST in
// `Connection::on_data` because the stream will no longer
// be known.
None
}
// The stream was properly closed. We already have sent our FIN frame. The
// remote end has already done so in the past.
Expand Down Expand Up @@ -569,18 +557,10 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
};
match action {
Action::None => {}
Action::New(stream, update) => {
Action::New(stream) => {
log::trace!("{}: new inbound {} of {}", self.id, stream, self);
if let Some(f) = update {
log::trace!("{}/{}: sending update", self.id, f.header().stream_id());
self.pending_frames.push_back(f.into());
}
return Ok(Some(stream));
}
Action::Update(f) => {
log::trace!("{}: sending update: {:?}", self.id, f.header());
self.pending_frames.push_back(f.into());
}
Action::Ping(f) => {
log::trace!("{}/{}: pong", self.id, f.header().stream_id());
self.pending_frames.push_back(f.into());
Expand Down Expand Up @@ -641,30 +621,17 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
return Action::Terminate(Frame::internal_error());
}
let mut stream = self.make_new_inbound_stream(stream_id, DEFAULT_CREDIT);
let mut window_update = None;
{
let mut shared = stream.shared();
if is_finish {
shared.update_state(self.id, stream_id, State::RecvClosed);
}
shared.window = shared.window.saturating_sub(frame.body_len());
shared.buffer.push(frame.into_body());

#[allow(deprecated)]
if matches!(self.config.window_update_mode, WindowUpdateMode::OnReceive) {
if let Some(credit) = shared.next_window_update() {
shared.window += credit;
let mut frame = Frame::window_update(stream_id, credit);
frame.header_mut().ack();
window_update = Some(frame)
}
}
}
if window_update.is_none() {
stream.set_flag(stream::Flag::Ack)
}
stream.set_flag(stream::Flag::Ack);
self.streams.insert(stream_id, stream.clone_shared());
return Action::New(stream, window_update);
return Action::New(stream);
}

if let Some(s) = self.streams.get_mut(&stream_id) {
Expand Down Expand Up @@ -696,14 +663,6 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
if let Some(w) = shared.reader.take() {
w.wake()
}
#[allow(deprecated)]
if matches!(self.config.window_update_mode, WindowUpdateMode::OnReceive) {
if let Some(credit) = shared.next_window_update() {
shared.window += credit;
let frame = Frame::window_update(stream_id, credit);
return Action::Update(frame);
}
}
} else {
log::trace!(
"{}/{}: data frame for unknown stream, possibly dropped earlier: {:?}",
Expand Down Expand Up @@ -768,7 +727,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
.update_state(self.id, stream_id, State::RecvClosed);
}
self.streams.insert(stream_id, stream.clone_shared());
return Action::New(stream, None);
return Action::New(stream);
}

if let Some(s) = self.streams.get_mut(&stream_id) {
Expand Down
28 changes: 7 additions & 21 deletions yamux/src/connection/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::{
header::{Data, Header, StreamId, WindowUpdate},
Frame,
},
Config, WindowUpdateMode, DEFAULT_CREDIT,
Config, DEFAULT_CREDIT,
};
use futures::{
channel::mpsc,
Expand Down Expand Up @@ -200,13 +200,6 @@ impl Stream {
/// Send new credit to the sending side via a window update message if
/// permitted.
fn send_window_update(&mut self, cx: &mut Context) -> Poll<io::Result<()>> {
// When using [`WindowUpdateMode::OnReceive`] window update messages are
// send early on data receival (see [`crate::Connection::on_frame`]).
#[allow(deprecated)]
if matches!(self.config.window_update_mode, WindowUpdateMode::OnReceive) {
return Poll::Ready(Ok(()));
}

let mut shared = self.shared.lock();

if let Some(credit) = shared.next_window_update() {
Expand Down Expand Up @@ -488,6 +481,7 @@ impl Shared {
current // Return the previous stream state for informational purposes.
}

// TODO: This does not need to live in shared any longer.
/// Calculate the number of additional window bytes the receiving side
/// should grant the sending side via a window update message.
///
Expand All @@ -500,20 +494,12 @@ impl Shared {
return None;
}

let new_credit = match self.config.window_update_mode {
#[allow(deprecated)]
WindowUpdateMode::OnReceive => {
debug_assert!(self.config.receive_window >= self.window);
let new_credit = {
debug_assert!(self.config.receive_window >= self.window);
let bytes_received = self.config.receive_window.saturating_sub(self.window);
let buffer_len: u32 = self.buffer.len().try_into().unwrap_or(std::u32::MAX);

self.config.receive_window.saturating_sub(self.window)
}
WindowUpdateMode::OnRead => {
debug_assert!(self.config.receive_window >= self.window);
let bytes_received = self.config.receive_window.saturating_sub(self.window);
let buffer_len: u32 = self.buffer.len().try_into().unwrap_or(std::u32::MAX);

bytes_received.saturating_sub(buffer_len)
}
bytes_received.saturating_sub(buffer_len)
};

// Send WindowUpdate message when half or more of the configured receive
Expand Down
28 changes: 0 additions & 28 deletions yamux/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,26 +63,6 @@ const MAX_ACK_BACKLOG: usize = 256;
/// https://github.com/paritytech/yamux/issues/100.
const DEFAULT_SPLIT_SEND_SIZE: usize = 16 * 1024;

/// Specifies when window update frames are sent.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum WindowUpdateMode {
/// Send window updates as soon as a [`Stream`]'s receive window drops to 0.
///
/// This ensures that the sender can resume sending more data as soon as possible
/// but a slow reader on the receiving side may be overwhelmed, i.e. it accumulates
/// data in its buffer which may reach its limit (see `set_max_buffer_size`).
/// In this mode, window updates merely prevent head of line blocking but do not
/// effectively exercise back pressure on senders.
#[deprecated(note = "Use `WindowUpdateMode::OnRead` instead.")]
OnReceive,

/// Send window updates only when data is read on the receiving end.
///
/// This ensures that senders do not overwhelm receivers and keeps buffer usage
/// low.
OnRead,
}

/// Yamux configuration.
///
/// The default configuration values are as follows:
Expand All @@ -98,7 +78,6 @@ pub struct Config {
receive_window: u32,
max_buffer_size: usize,
max_num_streams: usize,
window_update_mode: WindowUpdateMode,
read_after_close: bool,
split_send_size: usize,
}
Expand All @@ -109,7 +88,6 @@ impl Default for Config {
receive_window: DEFAULT_CREDIT,
max_buffer_size: 1024 * 1024,
max_num_streams: 8192,
window_update_mode: WindowUpdateMode::OnRead,
read_after_close: true,
split_send_size: DEFAULT_SPLIT_SEND_SIZE,
}
Expand Down Expand Up @@ -140,12 +118,6 @@ impl Config {
self
}

/// Set the window update mode to use.
pub fn set_window_update_mode(&mut self, m: WindowUpdateMode) -> &mut Self {
self.window_update_mode = m;
self
}

/// Allow or disallow streams to read from buffered data after
/// the connection has been closed.
pub fn set_read_after_close(&mut self, b: bool) -> &mut Self {
Expand Down

0 comments on commit 16ffe54

Please sign in to comment.