diff --git a/benches/concurrent.rs b/benches/concurrent.rs index da3d95db..ad68f8ec 100644 --- a/benches/concurrent.rs +++ b/benches/concurrent.rs @@ -62,6 +62,12 @@ fn concurrent(c: &mut Criterion) { group.finish(); } +fn config() -> Config { + let mut c = Config::default(); + c.set_window_update_mode(yamux::WindowUpdateMode::OnRead); + c +} + async fn oneway( nstreams: usize, nmessages: usize, @@ -73,7 +79,7 @@ async fn oneway( let (tx, rx) = mpsc::unbounded(); let server = async move { - let mut connection = Connection::new(server, Config::default(), Mode::Server); + let mut connection = Connection::new(server, config(), Mode::Server); while let Some(mut stream) = connection.next_stream().await.unwrap() { let tx = tx.clone(); @@ -85,7 +91,7 @@ async fn oneway( // Receive `nmessages` messages. for _ in 0 .. nmessages { stream.read_exact(&mut b[..]).await.unwrap(); - n += b.len() + n += b.len(); } tx.unbounded_send(n).expect("unbounded_send"); @@ -95,7 +101,7 @@ async fn oneway( }; task::spawn(server); - let conn = Connection::new(client, Config::default(), Mode::Client); + let conn = Connection::new(client, config(), Mode::Client); let mut ctrl = conn.control(); task::spawn(yamux::into_stream(conn).for_each(|r| {r.unwrap(); future::ready(())} )); @@ -111,7 +117,6 @@ async fn oneway( } stream.close().await.unwrap(); - Ok::<(), yamux::ConnectionError>(()) }); } diff --git a/src/connection.rs b/src/connection.rs index 0458a15e..8a7758ef 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -658,7 +658,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Connection<T> { let sender = self.stream_sender.clone(); Stream::new(stream_id, self.id, config, credit, credit, sender) }; - let window_update; + let mut window_update = None; { let mut shared = stream.shared(); if is_finish { @@ -666,16 +666,12 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Connection<T> { } shared.window = shared.window.saturating_sub(frame.body_len()); shared.buffer.push(frame.into_body()); - if !is_finish - && shared.window == 0 - && self.config.window_update_mode == WindowUpdateMode::OnReceive - { - shared.window = self.config.receive_window; - let mut frame = Frame::window_update(stream_id, self.config.receive_window); + + if let Some(credit) = shared.next_window_update() { + shared.window += credit; + let mut frame = Frame::window_update(stream_id, credit); frame.header_mut().ack(); window_update = Some(frame) - } else { - window_update = None } } if window_update.is_none() { @@ -706,12 +702,9 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Connection<T> { if let Some(w) = shared.reader.take() { w.wake() } - if !is_finish - && shared.window == 0 - && self.config.window_update_mode == WindowUpdateMode::OnReceive - { - shared.window = self.config.receive_window; - let frame = Frame::window_update(stream_id, self.config.receive_window); + if let Some(credit) = shared.next_window_update() { + shared.window += credit; + let frame = Frame::window_update(stream_id, credit); return Action::Update(frame) } } else if !is_finish { @@ -781,10 +774,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Connection<T> { w.wake() } } else if !is_finish { - log::debug!("{}/{}: window update for unknown stream", self.id, stream_id); - let mut header = Header::data(stream_id, 0); - header.rst(); - return Action::Reset(Frame::new(header)) + log::debug!("{}/{}: window update for unknown stream, ignoring", self.id, stream_id); } Action::None @@ -939,4 +929,3 @@ where } }) } - diff --git a/src/connection/stream.rs b/src/connection/stream.rs index e9290b8b..56310dcd 100644 --- a/src/connection/stream.rs +++ b/src/connection/stream.rs @@ -21,6 +21,7 @@ use crate::{ use futures::{future::Either, ready, channel::mpsc, io::{AsyncRead, AsyncWrite}}; use parking_lot::{Mutex, MutexGuard}; use std::{fmt, io, pin::Pin, sync::Arc, task::{Context, Poll, Waker}}; +use std::convert::TryInto; /// The state of a Yamux stream. #[derive(Copy, Clone, Debug, PartialEq, Eq)] @@ -78,7 +79,6 @@ pub struct Stream { conn: connection::Id, config: Arc<Config>, sender: mpsc::Sender<StreamCommand>, - window_update: Option<Frame<WindowUpdate>>, flag: Flag, shared: Arc<Mutex<Shared>> } @@ -88,7 +88,6 @@ impl fmt::Debug for Stream { f.debug_struct("Stream") .field("id", &self.id.val()) .field("connection", &self.conn) - .field("window_update", &self.window_update.is_some()) .finish() } } @@ -112,11 +111,10 @@ impl Stream { Stream { id, conn, - config, + config: config.clone(), sender, - window_update: None, flag: Flag::None, - shared: Arc::new(Mutex::new(Shared::new(window, credit))), + shared: Arc::new(Mutex::new(Shared::new(window, credit, config))), } } @@ -149,7 +147,6 @@ impl Stream { conn: self.conn, config: self.config.clone(), sender: self.sender.clone(), - window_update: None, flag: self.flag, shared: self.shared.clone() } @@ -175,16 +172,28 @@ impl Stream { } } - /// Try to deliver a pending window update. - fn send_pending_window_update(&mut self, cx: &mut Context) -> Poll<io::Result<()>> { - if self.window_update.is_some() { + /// Send new credit to the sending side via a window update message if + /// permitted. + fn send_window_update(&mut self, cx: &mut Context) -> Poll<io::Result<()>> { + // When using [`WindowUpdateMode::OnReceive`] window update messages are + // send early on data receival (see [`crate::Connection::on_frame`]). + if matches!(self.config.window_update_mode, WindowUpdateMode::OnReceive) { + return Poll::Ready(Ok(())); + } + + let credit = self.shared().next_window_update(); + + if let Some(credit) = credit { ready!(self.sender.poll_ready(cx).map_err(|_| self.write_zero_err())?); - let mut frame = self.window_update.take().expect("window_update.is_some()").right(); + + let mut frame = Frame::window_update(self.id, credit).right(); self.add_flag(frame.header_mut()); let cmd = StreamCommand::SendFrame(frame); + self.sender.start_send(cmd).map_err(|_| self.write_zero_err())?; - self.shared().window = self.config.receive_window + self.shared().window += credit; } + Poll::Ready(Ok(())) } } @@ -207,48 +216,33 @@ impl futures::stream::Stream for Stream { return Poll::Ready(None) } - ready!(self.send_pending_window_update(cx))?; + ready!(self.send_window_update(cx))?; - // We need to limit the `shared` `MutexGuard` scope, or else we run into - // borrow check troubles further down. - { - let mut shared = self.shared(); + let mut shared = self.shared(); - if let Some(bytes) = shared.buffer.pop() { - let off = bytes.offset(); - let mut vec = bytes.into_vec(); - if off != 0 { - // This should generally not happen when the stream is used only as - // a `futures::stream::Stream` since the whole point of this impl is - // to consume chunks atomically. It may perhaps happen when mixing - // this impl and the `AsyncRead` one. - log::debug!("{}/{}: chunk has been partially consumed", self.conn, self.id); - vec = vec.split_off(off) - } - return Poll::Ready(Some(Ok(Packet(vec)))) + if let Some(bytes) = shared.buffer.pop() { + let off = bytes.offset(); + let mut vec = bytes.into_vec(); + if off != 0 { + // This should generally not happen when the stream is used only as + // a `futures::stream::Stream` since the whole point of this impl is + // to consume chunks atomically. It may perhaps happen when mixing + // this impl and the `AsyncRead` one. + log::debug!("{}/{}: chunk has been partially consumed", self.conn, self.id); + vec = vec.split_off(off) } + return Poll::Ready(Some(Ok(Packet(vec)))) + } - // Buffer is empty, let's check if we can expect to read more data. - if !shared.state().can_read() { - log::debug!("{}/{}: eof", self.conn, self.id); - return Poll::Ready(None) // stream has been reset - } - - // Since we have no more data at this point, we want to be woken up - // by the connection when more becomes available for us. - shared.reader = Some(cx.waker().clone()); - - // Finally, let's see if we need to send a window update to the remote. - if self.config.window_update_mode != WindowUpdateMode::OnRead || shared.window > 0 { - // No, time to go. - return Poll::Pending - } + // Buffer is empty, let's check if we can expect to read more data. + if !shared.state().can_read() { + log::debug!("{}/{}: eof", self.conn, self.id); + return Poll::Ready(None) // stream has been reset } - // At this point we know we have to send a window update to the remote. - debug_assert!(self.window_update.is_none()); - self.window_update = Some(Frame::window_update(self.id, self.config.receive_window)); - ready!(self.send_pending_window_update(cx))?; + // Since we have no more data at this point, we want to be woken up + // by the connection when more becomes available for us. + shared.reader = Some(cx.waker().clone()); Poll::Pending } @@ -262,54 +256,39 @@ impl AsyncRead for Stream { return Poll::Ready(Ok(0)) } - ready!(self.send_pending_window_update(cx))?; - - // We need to limit the `shared` `MutexGuard` scope, or else we run into - // borrow check troubles further down. - { - // Copy data from stream buffer. - let mut shared = self.shared(); - let mut n = 0; - while let Some(chunk) = shared.buffer.front_mut() { - if chunk.is_empty() { - shared.buffer.pop(); - continue - } - let k = std::cmp::min(chunk.len(), buf.len() - n); - (&mut buf[n .. n + k]).copy_from_slice(&chunk.as_ref()[.. k]); - n += k; - chunk.advance(k); - if n == buf.len() { - break - } - } + ready!(self.send_window_update(cx))?; - if n > 0 { - log::trace!("{}/{}: read {} bytes", self.conn, self.id, n); - return Poll::Ready(Ok(n)) + // Copy data from stream buffer. + let mut shared = self.shared(); + let mut n = 0; + while let Some(chunk) = shared.buffer.front_mut() { + if chunk.is_empty() { + shared.buffer.pop(); + continue } - - // Buffer is empty, let's check if we can expect to read more data. - if !shared.state().can_read() { - log::debug!("{}/{}: eof", self.conn, self.id); - return Poll::Ready(Ok(0)) // stream has been reset + let k = std::cmp::min(chunk.len(), buf.len() - n); + (&mut buf[n .. n + k]).copy_from_slice(&chunk.as_ref()[.. k]); + n += k; + chunk.advance(k); + if n == buf.len() { + break } + } - // Since we have no more data at this point, we want to be woken up - // by the connection when more becomes available for us. - shared.reader = Some(cx.waker().clone()); + if n > 0 { + log::trace!("{}/{}: read {} bytes", self.conn, self.id, n); + return Poll::Ready(Ok(n)) + } - // Finally, let's see if we need to send a window update to the remote. - if self.config.window_update_mode != WindowUpdateMode::OnRead || shared.window > 0 { - // No, time to go. - return Poll::Pending - } + // Buffer is empty, let's check if we can expect to read more data. + if !shared.state().can_read() { + log::debug!("{}/{}: eof", self.conn, self.id); + return Poll::Ready(Ok(0)) // stream has been reset } - // At this point we know we have to send a window update to the remote. - debug_assert!(self.window_update.is_none()); - self.window_update = Some(Frame::window_update(self.id, self.config.receive_window)); - ready!(self.send_pending_window_update(cx))?; + // Since we have no more data at this point, we want to be woken up + // by the connection when more becomes available for us. + shared.reader = Some(cx.waker().clone()); Poll::Pending } @@ -372,18 +351,20 @@ pub(crate) struct Shared { pub(crate) credit: u32, pub(crate) buffer: Chunks, pub(crate) reader: Option<Waker>, - pub(crate) writer: Option<Waker> + pub(crate) writer: Option<Waker>, + config: Arc<Config> } impl Shared { - fn new(window: u32, credit: u32) -> Self { + fn new(window: u32, credit: u32, config: Arc<Config>) -> Self { Shared { state: State::Open, window, credit, buffer: Chunks::new(), reader: None, - writer: None + writer: None, + config } } @@ -414,5 +395,44 @@ impl Shared { current // Return the previous stream state for informational purposes. } + + /// Calculate the number of additional window bytes the receiving side + /// should grant the sending side via a window update message. + /// + /// Returns `None` if too small to justify a window update message. + /// + /// Note: Once a caller successfully sent a window update message, the + /// locally tracked window size needs to be updated manually by the caller. + pub(crate) fn next_window_update(&mut self) -> Option<u32> { + if !self.state.can_read() { + return None; + } + + let new_credit = match self.config.window_update_mode { + WindowUpdateMode::OnReceive => { + let bytes_received = self.config.receive_window - self.window; + bytes_received + }, + WindowUpdateMode::OnRead => { + let buffer_len: u32 = self.buffer.len() + .and_then(|l| l.try_into().ok()) + .unwrap_or(std::u32::MAX); + let bytes_received = self.config.receive_window - self.window; + let bytes_read = bytes_received.saturating_sub(buffer_len); + bytes_read + } + }; + + // Send WindowUpdate message when half or more of the configured receive + // window can be granted as additional credit to the sender. + // + // See https://github.com/paritytech/yamux/issues/100 for a detailed + // discussion. + if new_credit >= self.config.receive_window / 2 { + Some(new_credit) + } else { + None + } + } }