From fe09c45a546177dacb20b6fbc62bc1f89dffb6d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9odore=20Pr=C3=A9vot?= Date: Mon, 13 Jan 2025 14:47:42 +0100 Subject: [PATCH] Bump tokio tungstenite to 0.26 (#428) --- Cargo.toml | 2 +- crates/engineioxide/src/packet.rs | 23 ++++++++++++++----- .../src/transport/polling/payload/encoder.rs | 8 +++---- crates/engineioxide/src/transport/ws.rs | 23 +++++++++++-------- .../socketioxide/tests/disconnect_reason.rs | 4 +--- crates/socketioxide/tests/fixture.rs | 2 +- 6 files changed, 37 insertions(+), 25 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 46c5069e..5af0aaa0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,7 +24,7 @@ bytes = { version = "1.7", features = ["serde"] } futures-core = "0.3" futures-util = { version = "0.3", default-features = false, features = ["std"] } tokio = "1.40" -tokio-tungstenite = "0.24" +tokio-tungstenite = "0.26" serde = { version = "1.0", features = ["derive"] } smallvec = { version = "1.13", features = ["union"] } serde_json = "1.0" diff --git a/crates/engineioxide/src/packet.rs b/crates/engineioxide/src/packet.rs index 8733ea74..bb82038f 100644 --- a/crates/engineioxide/src/packet.rs +++ b/crates/engineioxide/src/packet.rs @@ -110,12 +110,11 @@ impl Packet { } /// Serialize a [Packet] to a [String] according to the Engine.IO protocol -impl TryInto for Packet { - type Error = Error; - fn try_into(self) -> Result { - let len = self.get_size_hint(true); +impl From for String { + fn from(packet: Packet) -> String { + let len = packet.get_size_hint(true); let mut buffer = String::with_capacity(len); - match self { + match packet { Packet::Open(open) => { buffer.push('0'); buffer.push_str(&serde_json::to_string(&open).unwrap()); @@ -140,7 +139,12 @@ impl TryInto for Packet { general_purpose::STANDARD.encode_string(data, &mut buffer); } }; - Ok(buffer) + buffer + } +} +impl From for tokio_tungstenite::tungstenite::Utf8Bytes { + fn from(value: Packet) -> Self { + String::from(value).into() } } /// Deserialize a [Packet] from a [String] according to the Engine.IO protocol @@ -176,6 +180,13 @@ impl TryFrom for Packet { Ok(res) } } +impl TryFrom for Packet { + type Error = Error; + fn try_from(value: tokio_tungstenite::tungstenite::Utf8Bytes) -> Result { + // SAFETY: The utf8 bytes are guaranteed to be valid utf8 + Packet::try_from(unsafe { Str::from_bytes_unchecked(value.into()) }) + } +} impl TryFrom for Packet { type Error = Error; diff --git a/crates/engineioxide/src/transport/polling/payload/encoder.rs b/crates/engineioxide/src/transport/polling/payload/encoder.rs index 214d3074..d3a31524 100644 --- a/crates/engineioxide/src/transport/polling/payload/encoder.rs +++ b/crates/engineioxide/src/transport/polling/payload/encoder.rs @@ -88,7 +88,7 @@ pub async fn v4_encoder( try_recv_packet(&mut rx, data.len() + PUNCTUATION_LEN, max_payload, true) { for packet in packets { - let packet: String = packet.try_into()?; + let packet: String = packet.into(); if !data.is_empty() { data.push(std::char::from_u32(PACKET_SEPARATOR_V4 as u32).unwrap()); @@ -101,7 +101,7 @@ pub async fn v4_encoder( if data.is_empty() { let packets = recv_packet(&mut rx).await?; for packet in packets { - let packet: String = packet.try_into()?; + let packet: String = packet.into(); data.push_str(&packet); } } @@ -133,7 +133,7 @@ pub fn v3_bin_packet_encoder(packet: Packet, data: &mut bytes::BytesMut) -> Resu data.extend_from_slice(&bin); // raw data } packet => { - let packet: String = packet.try_into()?; + let packet: String = packet.into(); let len = itoa.format(packet.len()); let len_len = len.len(); // len is guaranteed to be ascii @@ -156,7 +156,7 @@ pub fn v3_bin_packet_encoder(packet: Packet, data: &mut bytes::BytesMut) -> Resu pub fn v3_string_packet_encoder(packet: Packet, data: &mut bytes::BytesMut) -> Result<(), Error> { use crate::transport::polling::payload::STRING_PACKET_SEPARATOR_V3; use bytes::BufMut; - let packet: String = packet.try_into()?; + let packet: String = packet.into(); let packet = format!( "{}{}{}", packet.chars().count(), diff --git a/crates/engineioxide/src/transport/ws.rs b/crates/engineioxide/src/transport/ws.rs index dbfbf202..f5a13978 100644 --- a/crates/engineioxide/src/transport/ws.rs +++ b/crates/engineioxide/src/transport/ws.rs @@ -185,9 +185,9 @@ where Message::Binary(mut data) => { if socket.protocol == ProtocolVersion::V3 && !data.is_empty() { // The first byte is the message type, which we don't need. - let _ = data.remove(0); + data = data.split_off(1); } - engine.handler.on_binary(data.into(), socket.clone()); + engine.handler.on_binary(data, socket.clone()); Ok(()) } Message::Close(_) => break, @@ -221,12 +221,16 @@ where ($item:ident) => { let res = match $item { Packet::Binary(bin) | Packet::BinaryV3(bin) => { - let mut bin: Vec = bin.into(); if socket.protocol == ProtocolVersion::V3 { - // v3 protocol requires packet type as the first byte - bin.insert(0, 0x04); + // v3 protocol requires packet type as the first byte. + // This requires a new buffer. This is OK as it is only for the V3 protocol. + let mut buff = Vec::with_capacity(bin.len() + 1); + buff.push(0x04); + buff.extend(bin); + tx.feed(Message::Binary(buff.into())).await + } else { + tx.feed(Message::Binary(bin)).await } - tx.feed(Message::Binary(bin)).await } Packet::Close => { tx.send(Message::Close(None)).await.ok(); @@ -239,7 +243,7 @@ where Packet::Noop => Ok(()), _ => { let packet: String = $item.try_into().unwrap(); - tx.feed(Message::Text(packet)).await + tx.feed(Message::Text(packet.into())).await } }; if let Err(_e) = res { @@ -274,7 +278,7 @@ where S: AsyncRead + AsyncWrite + Unpin + Send + 'static, { let packet = Packet::Open(OpenPacket::new(TransportType::Websocket, sid, config)); - ws.send(Message::Text(packet.try_into()?)).await?; + ws.send(Message::Text(packet.into())).await?; Ok(()) } @@ -320,8 +324,7 @@ where match Packet::try_from(msg)? { Packet::PingUpgrade => { // Respond with a PongUpgrade packet - ws.send(Message::Text(Packet::PongUpgrade.try_into()?)) - .await?; + ws.send(Message::Text(Packet::PongUpgrade.into())).await?; } p => Err(Error::BadPacket(p))?, }; diff --git a/crates/socketioxide/tests/disconnect_reason.rs b/crates/socketioxide/tests/disconnect_reason.rs index 25f81eb2..7f893321 100644 --- a/crates/socketioxide/tests/disconnect_reason.rs +++ b/crates/socketioxide/tests/disconnect_reason.rs @@ -237,9 +237,7 @@ pub async fn server_ns_close() { }); let mut ws = create_ws_connection(12353).await; - ws.send(Message::Text("40/test,{}".to_string())) - .await - .unwrap(); + ws.send(Message::Text("40/test,{}".into())).await.unwrap(); let data = tokio::time::timeout(Duration::from_millis(20), rx.recv()) .await .expect("timeout waiting for DisconnectReason::ServerNSDisconnect") diff --git a/crates/socketioxide/tests/fixture.rs b/crates/socketioxide/tests/fixture.rs index 0bf0e3ca..5a6fea5c 100644 --- a/crates/socketioxide/tests/fixture.rs +++ b/crates/socketioxide/tests/fixture.rs @@ -101,7 +101,7 @@ pub async fn create_ws_connection(port: u16) -> WebSocketStream