Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(webocket): Avoid panic when polling quicksink after errors #5482

Merged
merged 9 commits into from
Jul 9, 2024
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ libp2p-upnp = { version = "0.2.2", path = "protocols/upnp" }
libp2p-webrtc = { version = "0.7.1-alpha", path = "transports/webrtc" }
libp2p-webrtc-utils = { version = "0.2.1", path = "misc/webrtc-utils" }
libp2p-webrtc-websys = { version = "0.3.0-alpha", path = "transports/webrtc-websys" }
libp2p-websocket = { version = "0.43.1", path = "transports/websocket" }
libp2p-websocket = { version = "0.43.2", path = "transports/websocket" }
libp2p-websocket-websys = { version = "0.3.2", path = "transports/websocket-websys" }
libp2p-webtransport-websys = { version = "0.3.0", path = "transports/webtransport-websys" }
libp2p-yamux = { version = "0.45.2", path = "muxers/yamux" }
Expand Down
6 changes: 6 additions & 0 deletions transports/websocket/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## 0.43.2

- fix: Avoid websocket panic on polling after errors. See [PR 5482].

[PR 5482]: https://github.com/libp2p/rust-libp2p/pull/5482

## 0.43.1

## 0.43.0
Expand Down
2 changes: 1 addition & 1 deletion transports/websocket/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "libp2p-websocket"
edition = "2021"
rust-version = { workspace = true }
description = "WebSocket transport for libp2p"
version = "0.43.1"
version = "0.43.2"
authors = ["Parity Technologies <[email protected]>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand Down
83 changes: 69 additions & 14 deletions transports/websocket/src/framed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -571,10 +571,20 @@ fn location_to_multiaddr<T>(location: &str) -> Result<Multiaddr, Error<T>> {
/// The websocket connection.
pub struct Connection<T> {
receiver: BoxStream<'static, Result<Incoming, connection::Error>>,
sender: Pin<Box<dyn Sink<OutgoingData, Error = connection::Error> + Send>>,
sender: ConnectionSender,
_marker: std::marker::PhantomData<T>,
}

/// Keep track of the connection state from the sender's perspective.
enum ConnectionSender {
/// The sender is alive and can be polled.
Alive {
sender: Pin<Box<dyn Sink<OutgoingData, Error = connection::Error> + Send>>,
},
/// The sender has returned an error and must not be polled again.
Dead,
}

/// Data or control information received over the websocket connection.
#[derive(Debug, Clone)]
pub enum Incoming {
Expand Down Expand Up @@ -703,7 +713,9 @@ where
});
Connection {
receiver: stream.boxed(),
sender: Box::pin(sink),
sender: ConnectionSender::Alive {
sender: Box::pin(sink),
},
_marker: std::marker::PhantomData,
}
}
Expand Down Expand Up @@ -744,26 +756,69 @@ where
type Error = io::Error;

fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.sender)
.poll_ready(cx)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))
match &mut self.sender {
ConnectionSender::Alive { sender } => match Pin::new(sender).poll_ready(cx) {
Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
Poll::Ready(Err(e)) => {
self.sender = ConnectionSender::Dead;
Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, e)))
}
Poll::Pending => Poll::Pending,
},
ConnectionSender::Dead => Poll::Ready(Err(io::Error::new(
io::ErrorKind::Other,
"Connection sender is dead, poll_ready called after an error",
))),
}
}

fn start_send(mut self: Pin<&mut Self>, item: OutgoingData) -> io::Result<()> {
Pin::new(&mut self.sender)
.start_send(item)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))
match &mut self.sender {
ConnectionSender::Alive { sender } => match Pin::new(sender).start_send(item) {
Ok(()) => Ok(()),
Err(e) => {
self.sender = ConnectionSender::Dead;
Err(io::Error::new(io::ErrorKind::Other, e))
}
},
ConnectionSender::Dead => Err(io::Error::new(
io::ErrorKind::Other,
"Connection sender is dead, start_send called after an error",
)),
}
}

fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.sender)
.poll_flush(cx)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))
match &mut self.sender {
ConnectionSender::Alive { sender } => match Pin::new(sender).poll_flush(cx) {
Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
Poll::Ready(Err(e)) => {
self.sender = ConnectionSender::Dead;
Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, e)))
}
Poll::Pending => Poll::Pending,
},
ConnectionSender::Dead => Poll::Ready(Err(io::Error::new(
io::ErrorKind::Other,
"Connection sender is dead, poll_flush called after an error",
))),
}
}

fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.sender)
.poll_close(cx)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))
match &mut self.sender {
ConnectionSender::Alive { sender } => match Pin::new(sender).poll_close(cx) {
Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
Poll::Ready(Err(e)) => {
self.sender = ConnectionSender::Dead;
Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, e)))
}
Poll::Pending => Poll::Pending,
},
ConnectionSender::Dead => Poll::Ready(Err(io::Error::new(
io::ErrorKind::Other,
"Connection sender is dead, poll_close called after an error",
))),
}
}
}
Loading