Skip to content

Commit

Permalink
fix(webocket): Avoid panic when polling quicksink after errors
Browse files Browse the repository at this point in the history
Pull-Request: libp2p#5482.
  • Loading branch information
lexnv authored and jxs committed Jul 11, 2024
1 parent 65cdc07 commit b496a9a
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 45 deletions.
24 changes: 6 additions & 18 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions transports/websocket/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## 0.42.2

- fix: Avoid websocket panic on polling after errors. See [PR XXXX].

[PR XXXX]: https://github.com/libp2p/rust-libp2p/pull/XXXX

## 0.42.1

- Bump `futures-rustls` to `0.24.0`.
Expand Down
8 changes: 5 additions & 3 deletions transports/websocket/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
[package]

name = "libp2p-websocket"
edition = "2021"
rust-version = { workspace = true }
description = "WebSocket transport for libp2p"
version = "0.42.1"
version = "0.42.2"
authors = ["Parity Technologies <[email protected]>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand All @@ -20,9 +21,10 @@ log = "0.4.20"
parking_lot = "0.12.0"
pin-project-lite = "0.2.13"
rw-stream-sink = { workspace = true }
soketto = "0.7.0"
url = "2.4"
soketto = "0.8.0"
url = "2.5"
webpki-roots = "0.25"
thiserror = "1.0.61"

[dev-dependencies]
libp2p-tcp = { workspace = true, features = ["async-io"] }
Expand Down
2 changes: 1 addition & 1 deletion transports/websocket/src/framed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,7 @@ fn location_to_multiaddr<T>(location: &str) -> Result<Multiaddr, Error<T>> {
/// The websocket connection.
pub struct Connection<T> {
receiver: BoxStream<'static, Result<Incoming, connection::Error>>,
sender: Pin<Box<dyn Sink<OutgoingData, Error = connection::Error> + Send>>,
sender: Pin<Box<dyn Sink<OutgoingData, Error = quicksink::Error<connection::Error>> + Send>>,
_marker: std::marker::PhantomData<T>,
}

Expand Down
72 changes: 49 additions & 23 deletions transports/websocket/src/quicksink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,6 @@
// Ok::<_, io::Error>(stdout)
// });
// ```
//
// # Panics
//
// - If any of the [`Sink`] methods produce an error, the sink transitions
// to a failure state and none of its methods must be called afterwards or
// else a panic will occur.
// - If [`Sink::poll_close`] has been called, no other sink method must be
// called afterwards or else a panic will be caused.

use futures::{ready, sink::Sink};
use pin_project_lite::pin_project;
Expand Down Expand Up @@ -102,6 +94,15 @@ enum State {
Failed,
}

/// Errors the `Sink` may return.
#[derive(Debug, thiserror::Error)]
pub(crate) enum Error<E> {
#[error("Error while sending over the sink, {0}")]
Send(E),
#[error("The Sink has closed")]
Closed,
}

pin_project! {
/// `SinkImpl` implements the `Sink` trait.
#[derive(Debug)]
Expand All @@ -119,7 +120,7 @@ where
F: FnMut(S, Action<A>) -> T,
T: Future<Output = Result<S, E>>,
{
type Error = E;
type Error = Error<E>;

fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
let mut this = self.project();
Expand All @@ -135,28 +136,27 @@ where
Err(e) => {
this.future.set(None);
*this.state = State::Failed;
Poll::Ready(Err(e))
Poll::Ready(Err(Error::Send(e)))
}
}
}
State::Closing => match ready!(this.future.as_mut().as_pin_mut().unwrap().poll(cx)) {
Ok(_) => {
this.future.set(None);
*this.state = State::Closed;
panic!("SinkImpl::poll_ready called on a closing sink.")
Poll::Ready(Err(Error::Closed))
}
Err(e) => {
this.future.set(None);
*this.state = State::Failed;
Poll::Ready(Err(e))
Poll::Ready(Err(Error::Send(e)))
}
},
State::Empty => {
assert!(this.param.is_some());
Poll::Ready(Ok(()))
}
State::Closed => panic!("SinkImpl::poll_ready called on a closed sink."),
State::Failed => panic!("SinkImpl::poll_ready called after error."),
State::Closed | State::Failed => Poll::Ready(Err(Error::Closed)),
}
}

Expand Down Expand Up @@ -193,7 +193,7 @@ where
Err(e) => {
this.future.set(None);
*this.state = State::Failed;
return Poll::Ready(Err(e));
return Poll::Ready(Err(Error::Send(e)));
}
},
State::Flushing => {
Expand All @@ -207,7 +207,7 @@ where
Err(e) => {
this.future.set(None);
*this.state = State::Failed;
return Poll::Ready(Err(e));
return Poll::Ready(Err(Error::Send(e)));
}
}
}
Expand All @@ -221,11 +221,10 @@ where
Err(e) => {
this.future.set(None);
*this.state = State::Failed;
return Poll::Ready(Err(e));
return Poll::Ready(Err(Error::Send(e)));
}
},
State::Closed => return Poll::Ready(Ok(())),
State::Failed => panic!("SinkImpl::poll_flush called after error."),
State::Closed | State::Failed => return Poll::Ready(Err(Error::Closed)),
}
}
}
Expand Down Expand Up @@ -253,7 +252,7 @@ where
Err(e) => {
this.future.set(None);
*this.state = State::Failed;
return Poll::Ready(Err(e));
return Poll::Ready(Err(Error::Send(e)));
}
},
State::Flushing => {
Expand All @@ -266,7 +265,7 @@ where
Err(e) => {
this.future.set(None);
*this.state = State::Failed;
return Poll::Ready(Err(e));
return Poll::Ready(Err(Error::Send(e)));
}
}
}
Expand All @@ -280,11 +279,11 @@ where
Err(e) => {
this.future.set(None);
*this.state = State::Failed;
return Poll::Ready(Err(e));
return Poll::Ready(Err(Error::Send(e)));
}
},
State::Closed => return Poll::Ready(Ok(())),
State::Failed => panic!("SinkImpl::poll_closed called after error."),
State::Failed => return Poll::Ready(Err(Error::Closed)),
}
}
}
Expand Down Expand Up @@ -347,4 +346,31 @@ mod tests {
assert_eq!(&expected[..], &actual[..])
});
}

#[test]
fn error_does_not_panic() {
task::block_on(async {
let sink = make_sink(io::stdout(), |mut _stdout, _action| async move {
Err(io::Error::new(io::ErrorKind::Other, "oh no"))
});

futures::pin_mut!(sink);

let result = sink.send("hello").await;
match result {
Err(crate::quicksink::Error::Send(e)) => {
assert_eq!(e.kind(), io::ErrorKind::Other);
assert_eq!(e.to_string(), "oh no")
}
_ => panic!("unexpected result: {:?}", result),
};

// Call send again, expect not to panic.
let result = sink.send("hello").await;
match result {
Err(crate::quicksink::Error::Closed) => {}
_ => panic!("unexpected result: {:?}", result),
};
})
}
}

0 comments on commit b496a9a

Please sign in to comment.