Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

src/connection: Process command or socket result immediately #138

Merged
merged 2 commits into from
Aug 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
# 0.10.2

- Process command or socket result immediately and thereby no longer accessing
the socket after it returned an error. See [PR 138] for details.

[PR 138]: https://github.com/libp2p/rust-yamux/pull/138

# 0.10.1

- Update `parking_lot` dependency. See [PR 126].
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "yamux"
version = "0.10.1"
version = "0.10.2"
authors = ["Parity Technologies <[email protected]>"]
license = "Apache-2.0 OR MIT"
description = "Multiplexer over reliable, ordered connections"
Expand Down
48 changes: 17 additions & 31 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,7 @@ use futures::{
stream::{Fuse, FusedStream},
};
use nohash_hasher::IntMap;
use std::{
fmt, io,
sync::Arc,
task::{Context, Poll},
};
use std::{fmt, io, sync::Arc, task::Poll};

pub use control::Control;
pub use stream::{Packet, State, Stream};
Expand Down Expand Up @@ -427,7 +423,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Connection<T> {

let mut num_terminated = 0;

let mut next_frame = if self.socket.is_terminated() {
let next_frame = if self.socket.is_terminated() {
num_terminated += 1;
Either::Left(future::pending())
} else {
Expand Down Expand Up @@ -458,14 +454,14 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Connection<T> {
Either::Right(next_frame)
};

let mut next_stream_command = if self.stream_receiver.is_terminated() {
let next_stream_command = if self.stream_receiver.is_terminated() {
num_terminated += 1;
Either::Left(future::pending())
} else {
Either::Right(self.stream_receiver.next())
};

let mut next_control_command = if self.control_receiver.is_terminated() {
let next_control_command = if self.control_receiver.is_terminated() {
num_terminated += 1;
Either::Left(future::pending())
} else {
Expand All @@ -477,29 +473,19 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Connection<T> {
return Err(ConnectionError::Closed);
}

let next_item = future::poll_fn(move |cx: &mut Context| {
let a = next_stream_command.poll_unpin(cx);
let b = next_control_command.poll_unpin(cx);
let c = next_frame.poll_unpin(cx);
if a.is_pending() && b.is_pending() && c.is_pending() {
return Poll::Pending;
}
Poll::Ready((a, b, c))
});

let (stream_command, control_command, frame) = next_item.await;

if let Poll::Ready(cmd) = control_command {
self.on_control_command(cmd).await?
}

if let Poll::Ready(cmd) = stream_command {
self.on_stream_command(cmd).await?
}

if let Poll::Ready(frame) = frame {
if let Some(stream) = self.on_frame(frame.transpose().map_err(Into::into)).await? {
return Ok(Some(stream));
let combined_future = future::select(
future::select(next_stream_command, next_control_command),
next_frame,
);
match combined_future.await {
Either::Left((Either::Left((cmd, _)), _)) => self.on_stream_command(cmd).await?,
Either::Left((Either::Right((cmd, _)), _)) => self.on_control_command(cmd).await?,
Either::Right((frame, _)) => {
if let Some(stream) =
self.on_frame(frame.transpose().map_err(Into::into)).await?
{
return Ok(Some(stream));
}
}
Comment on lines +476 to 489
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you considered using the futures::select! macro instead of nesting future::select calls?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have, though I favor futures::future::select over futures::select! due to the following reasons:

  • While more verbose, I find futures::future::select easier to reason about than futures::select!.

  • In general I prefer functions over macros.

  • I understand the inner workings of futures::future::select. The implementation is very simple.

  • I don't fully understand the guarantees of futures::select!, e.g.:

    If multiple futures are ready, one will be pseudo-randomly selected at runtime.

    https://docs.rs/futures/latest/futures/macro.select.html

    What happens to the result of the non-selected ready futures?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SGTM!

}
}
Expand Down