Skip to content

Commit

Permalink
src/connection: Send WindowUpdate message early
Browse files Browse the repository at this point in the history
To prevent senders from being blocked every time they have sent
RECEIVE_WINDOW (e.g. 256 KB) number of bytes, waiting for a WindowUpdate
message granting further sending credit, this commit makes Yamux send a
WindowUpdate message once half or more of the window has been received
(and consumed in `WindowUpdateMode::OnRead`). Benchmarking shows that
sending WindowUpdate messages early prevents senders from being blocked
waiting for additional credit on common network types.

For a detailed discussion as well as various benchmark results see
libp2p#100.

Next to the above, this commit includes the following changes:

- Use `WindowUpdateMode::OnRead` in benchmarks. I would argue that
`OnRead` should be the default setting, given the importance of
flow-control. With that in mind, I suggest to benchmark that default
case.

- Ignore WindowUpdate messages for unknown streams. With this commit
WindowUpdate messages are sent more agressively. The following scenario
would surface: A sender would close its channel, eventually being
garbage collected. The receiver, before receiving the closing message,
sends out a WindowUpdate message. The sender receives the WindowUpdate
message not being able to associate it to a stream, thus resetting the
whole connection. This commit ignores WindowUpdate messages for unknown
streams instead.

When sending very large messages, WindowUpdate messages are still not
returned to the sender in time, thus the sender still being blocked in
such case. This can be prevented by splitting large messages into
smaller Yamux frames, see
libp2p#100 (comment).
This additional optimization can be done in a future commit without
interfering with the optimization introduced in this commit.
  • Loading branch information
mxinden committed Feb 8, 2021
1 parent 389415a commit e56e0f6
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 115 deletions.
13 changes: 9 additions & 4 deletions benches/concurrent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ fn concurrent(c: &mut Criterion) {
group.finish();
}

fn config() -> Config {
let mut c = Config::default();
c.set_window_update_mode(yamux::WindowUpdateMode::OnRead);
c
}

async fn oneway(
nstreams: usize,
nmessages: usize,
Expand All @@ -73,7 +79,7 @@ async fn oneway(
let (tx, rx) = mpsc::unbounded();

let server = async move {
let mut connection = Connection::new(server, Config::default(), Mode::Server);
let mut connection = Connection::new(server, config(), Mode::Server);

while let Some(mut stream) = connection.next_stream().await.unwrap() {
let tx = tx.clone();
Expand All @@ -85,7 +91,7 @@ async fn oneway(
// Receive `nmessages` messages.
for _ in 0 .. nmessages {
stream.read_exact(&mut b[..]).await.unwrap();
n += b.len()
n += b.len();
}

tx.unbounded_send(n).expect("unbounded_send");
Expand All @@ -95,7 +101,7 @@ async fn oneway(
};
task::spawn(server);

let conn = Connection::new(client, Config::default(), Mode::Client);
let conn = Connection::new(client, config(), Mode::Client);
let mut ctrl = conn.control();
task::spawn(yamux::into_stream(conn).for_each(|r| {r.unwrap(); future::ready(())} ));

Expand All @@ -111,7 +117,6 @@ async fn oneway(
}

stream.close().await.unwrap();
Ok::<(), yamux::ConnectionError>(())
});
}

Expand Down
29 changes: 9 additions & 20 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -658,24 +658,20 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Connection<T> {
let sender = self.stream_sender.clone();
Stream::new(stream_id, self.id, config, credit, credit, sender)
};
let window_update;
let mut window_update = None;
{
let mut shared = stream.shared();
if is_finish {
shared.update_state(self.id, stream_id, State::RecvClosed);
}
shared.window = shared.window.saturating_sub(frame.body_len());
shared.buffer.push(frame.into_body());
if !is_finish
&& shared.window == 0
&& self.config.window_update_mode == WindowUpdateMode::OnReceive
{
shared.window = self.config.receive_window;
let mut frame = Frame::window_update(stream_id, self.config.receive_window);

if let Some(credit) = shared.next_window_update() {
shared.window += credit;
let mut frame = Frame::window_update(stream_id, credit);
frame.header_mut().ack();
window_update = Some(frame)
} else {
window_update = None
}
}
if window_update.is_none() {
Expand Down Expand Up @@ -706,12 +702,9 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Connection<T> {
if let Some(w) = shared.reader.take() {
w.wake()
}
if !is_finish
&& shared.window == 0
&& self.config.window_update_mode == WindowUpdateMode::OnReceive
{
shared.window = self.config.receive_window;
let frame = Frame::window_update(stream_id, self.config.receive_window);
if let Some(credit) = shared.next_window_update() {
shared.window += credit;
let frame = Frame::window_update(stream_id, credit);
return Action::Update(frame)
}
} else if !is_finish {
Expand Down Expand Up @@ -781,10 +774,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Connection<T> {
w.wake()
}
} else if !is_finish {
log::debug!("{}/{}: window update for unknown stream", self.id, stream_id);
let mut header = Header::data(stream_id, 0);
header.rst();
return Action::Reset(Frame::new(header))
log::debug!("{}/{}: window update for unknown stream, ignoring", self.id, stream_id);
}

Action::None
Expand Down Expand Up @@ -939,4 +929,3 @@ where
}
})
}

Loading

0 comments on commit e56e0f6

Please sign in to comment.