From 8df6c8f0347757436e37291d8b3b0c5f061726f8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9odore=20Pr=C3=A9vot?= Date: Sun, 12 Jan 2025 20:17:05 +0100 Subject: [PATCH] Revert "Bump tokio tungstenite to 0.26 (#427)" (#429) This reverts commit 3dcd4e1a3059c10f40d6c503abd99b5467e03032. --- Cargo.toml | 4 +- crates/engineioxide/src/transport/ws.rs | 71 ++++++++++++------------- 2 files changed, 36 insertions(+), 39 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 84812d9c..46c5069e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,8 +23,8 @@ resolver = "2" bytes = { version = "1.7", features = ["serde"] } futures-core = "0.3" futures-util = { version = "0.3", default-features = false, features = ["std"] } -tokio = "1.43" -tokio-tungstenite = "0.26" +tokio = "1.40" +tokio-tungstenite = "0.24" serde = { version = "1.0", features = ["derive"] } smallvec = { version = "1.13", features = ["union"] } serde_json = "1.0" diff --git a/crates/engineioxide/src/transport/ws.rs b/crates/engineioxide/src/transport/ws.rs index b1ed5d91..dbfbf202 100644 --- a/crates/engineioxide/src/transport/ws.rs +++ b/crates/engineioxide/src/transport/ws.rs @@ -16,7 +16,7 @@ use tokio::{ task::JoinHandle, }; use tokio_tungstenite::{ - tungstenite::{handshake::derive_accept_key, protocol::Role, Message, Utf8Bytes}, + tungstenite::{handshake::derive_accept_key, protocol::Role, Message}, WebSocketStream, }; @@ -27,9 +27,10 @@ use crate::{ errors::Error, handler::EngineIoHandler, packet::{OpenPacket, Packet}, - service::{ProtocolVersion, TransportType}, + service::ProtocolVersion, + service::TransportType, sid::Sid, - DisconnectReason, Socket, Str, + DisconnectReason, Socket, }; /// Create a response for websocket upgrade @@ -164,32 +165,29 @@ where { while let Some(msg) = rx.try_next().await? { match msg { - Message::Text(msg) => { - match Packet::try_from(unsafe { Str::from_bytes_unchecked(msg.into()) })? { - Packet::Close => { - #[cfg(feature = "tracing")] - tracing::debug!("[sid={}] closing session", socket.id); - engine.close_session(socket.id, DisconnectReason::TransportClose); - break; - } - Packet::Pong | Packet::Ping => socket - .heartbeat_tx - .try_send(()) - .map_err(|_| Error::HeartbeatTimeout), - Packet::Message(msg) => { - engine.handler.on_message(msg, socket.clone()); - Ok(()) - } - p => return Err(Error::BadPacket(p)), + Message::Text(msg) => match Packet::try_from(msg)? { + Packet::Close => { + #[cfg(feature = "tracing")] + tracing::debug!("[sid={}] closing session", socket.id); + engine.close_session(socket.id, DisconnectReason::TransportClose); + break; } - } + Packet::Pong | Packet::Ping => socket + .heartbeat_tx + .try_send(()) + .map_err(|_| Error::HeartbeatTimeout), + Packet::Message(msg) => { + engine.handler.on_message(msg, socket.clone()); + Ok(()) + } + p => return Err(Error::BadPacket(p)), + }, Message::Binary(mut data) => { - #[cfg(feature = "v3")] if socket.protocol == ProtocolVersion::V3 && !data.is_empty() { // The first byte is the message type, which we don't need. - data = data.slice(1..); + let _ = data.remove(0); } - engine.handler.on_binary(data, socket.clone()); + engine.handler.on_binary(data.into(), socket.clone()); Ok(()) } Message::Close(_) => break, @@ -222,12 +220,12 @@ where macro_rules! map_fn { ($item:ident) => { let res = match $item { - Packet::Binary(bin) => tx.feed(Message::Binary(bin)).await, - Packet::BinaryV3(bin) => { - // v3 protocol requires packet type as the first byte - let mut buf = Vec::with_capacity(bin.len() + 1); - buf.push(0x04); - buf.extend_from_slice(&bin); + Packet::Binary(bin) | Packet::BinaryV3(bin) => { + let mut bin: Vec = bin.into(); + if socket.protocol == ProtocolVersion::V3 { + // v3 protocol requires packet type as the first byte + bin.insert(0, 0x04); + } tx.feed(Message::Binary(bin)).await } Packet::Close => { @@ -241,7 +239,7 @@ where Packet::Noop => Ok(()), _ => { let packet: String = $item.try_into().unwrap(); - tx.feed(Message::Text(Utf8Bytes::from(packet))).await + tx.feed(Message::Text(packet)).await } }; if let Err(_e) = res { @@ -276,8 +274,7 @@ where S: AsyncRead + AsyncWrite + Unpin + Send + 'static, { let packet = Packet::Open(OpenPacket::new(TransportType::Websocket, sid, config)); - let value: String = packet.try_into()?; - ws.send(Message::Text(Utf8Bytes::from(value))).await?; + ws.send(Message::Text(packet.try_into()?)).await?; Ok(()) } @@ -320,11 +317,11 @@ where Some(Ok(Message::Text(d))) => d, _ => Err(Error::Upgrade)?, }; - match Packet::try_from(unsafe { Str::from_bytes_unchecked(msg.into()) })? { + match Packet::try_from(msg)? { Packet::PingUpgrade => { // Respond with a PongUpgrade packet - let msg: String = Packet::PongUpgrade.try_into()?; - ws.send(Message::Text(Utf8Bytes::from(msg))).await?; + ws.send(Message::Text(Packet::PongUpgrade.try_into()?)) + .await?; } p => Err(Error::BadPacket(p))?, }; @@ -346,7 +343,7 @@ where Err(Error::Upgrade)? } }; - match Packet::try_from(unsafe { Str::from_bytes_unchecked(msg.into()) })? { + match Packet::try_from(msg)? { Packet::Upgrade => { #[cfg(feature = "tracing")] tracing::debug!("ws upgraded successful")