Skip to content

Commit

Permalink
Revert "Bump tokio tungstenite to 0.26 (#427)" (#429)
Browse files Browse the repository at this point in the history
This reverts commit 3dcd4e1.
  • Loading branch information
Totodore authored Jan 12, 2025
1 parent 3dcd4e1 commit 8df6c8f
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 39 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ resolver = "2"
bytes = { version = "1.7", features = ["serde"] }
futures-core = "0.3"
futures-util = { version = "0.3", default-features = false, features = ["std"] }
tokio = "1.43"
tokio-tungstenite = "0.26"
tokio = "1.40"
tokio-tungstenite = "0.24"
serde = { version = "1.0", features = ["derive"] }
smallvec = { version = "1.13", features = ["union"] }
serde_json = "1.0"
Expand Down
71 changes: 34 additions & 37 deletions crates/engineioxide/src/transport/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use tokio::{
task::JoinHandle,
};
use tokio_tungstenite::{
tungstenite::{handshake::derive_accept_key, protocol::Role, Message, Utf8Bytes},
tungstenite::{handshake::derive_accept_key, protocol::Role, Message},
WebSocketStream,
};

Expand All @@ -27,9 +27,10 @@ use crate::{
errors::Error,
handler::EngineIoHandler,
packet::{OpenPacket, Packet},
service::{ProtocolVersion, TransportType},
service::ProtocolVersion,
service::TransportType,
sid::Sid,
DisconnectReason, Socket, Str,
DisconnectReason, Socket,
};

/// Create a response for websocket upgrade
Expand Down Expand Up @@ -164,32 +165,29 @@ where
{
while let Some(msg) = rx.try_next().await? {
match msg {
Message::Text(msg) => {
match Packet::try_from(unsafe { Str::from_bytes_unchecked(msg.into()) })? {
Packet::Close => {
#[cfg(feature = "tracing")]
tracing::debug!("[sid={}] closing session", socket.id);
engine.close_session(socket.id, DisconnectReason::TransportClose);
break;
}
Packet::Pong | Packet::Ping => socket
.heartbeat_tx
.try_send(())
.map_err(|_| Error::HeartbeatTimeout),
Packet::Message(msg) => {
engine.handler.on_message(msg, socket.clone());
Ok(())
}
p => return Err(Error::BadPacket(p)),
Message::Text(msg) => match Packet::try_from(msg)? {
Packet::Close => {
#[cfg(feature = "tracing")]
tracing::debug!("[sid={}] closing session", socket.id);
engine.close_session(socket.id, DisconnectReason::TransportClose);
break;
}
}
Packet::Pong | Packet::Ping => socket
.heartbeat_tx
.try_send(())
.map_err(|_| Error::HeartbeatTimeout),
Packet::Message(msg) => {
engine.handler.on_message(msg, socket.clone());
Ok(())
}
p => return Err(Error::BadPacket(p)),
},
Message::Binary(mut data) => {
#[cfg(feature = "v3")]
if socket.protocol == ProtocolVersion::V3 && !data.is_empty() {
// The first byte is the message type, which we don't need.
data = data.slice(1..);
let _ = data.remove(0);
}
engine.handler.on_binary(data, socket.clone());
engine.handler.on_binary(data.into(), socket.clone());
Ok(())
}
Message::Close(_) => break,
Expand Down Expand Up @@ -222,12 +220,12 @@ where
macro_rules! map_fn {
($item:ident) => {
let res = match $item {
Packet::Binary(bin) => tx.feed(Message::Binary(bin)).await,
Packet::BinaryV3(bin) => {
// v3 protocol requires packet type as the first byte
let mut buf = Vec::with_capacity(bin.len() + 1);
buf.push(0x04);
buf.extend_from_slice(&bin);
Packet::Binary(bin) | Packet::BinaryV3(bin) => {
let mut bin: Vec<u8> = bin.into();
if socket.protocol == ProtocolVersion::V3 {
// v3 protocol requires packet type as the first byte
bin.insert(0, 0x04);
}
tx.feed(Message::Binary(bin)).await
}
Packet::Close => {
Expand All @@ -241,7 +239,7 @@ where
Packet::Noop => Ok(()),
_ => {
let packet: String = $item.try_into().unwrap();
tx.feed(Message::Text(Utf8Bytes::from(packet))).await
tx.feed(Message::Text(packet)).await
}
};
if let Err(_e) = res {
Expand Down Expand Up @@ -276,8 +274,7 @@ where
S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
let packet = Packet::Open(OpenPacket::new(TransportType::Websocket, sid, config));
let value: String = packet.try_into()?;
ws.send(Message::Text(Utf8Bytes::from(value))).await?;
ws.send(Message::Text(packet.try_into()?)).await?;
Ok(())
}

Expand Down Expand Up @@ -320,11 +317,11 @@ where
Some(Ok(Message::Text(d))) => d,
_ => Err(Error::Upgrade)?,
};
match Packet::try_from(unsafe { Str::from_bytes_unchecked(msg.into()) })? {
match Packet::try_from(msg)? {
Packet::PingUpgrade => {
// Respond with a PongUpgrade packet
let msg: String = Packet::PongUpgrade.try_into()?;
ws.send(Message::Text(Utf8Bytes::from(msg))).await?;
ws.send(Message::Text(Packet::PongUpgrade.try_into()?))
.await?;
}
p => Err(Error::BadPacket(p))?,
};
Expand All @@ -346,7 +343,7 @@ where
Err(Error::Upgrade)?
}
};
match Packet::try_from(unsafe { Str::from_bytes_unchecked(msg.into()) })? {
match Packet::try_from(msg)? {
Packet::Upgrade => {
#[cfg(feature = "tracing")]
tracing::debug!("ws upgraded successful")
Expand Down

0 comments on commit 8df6c8f

Please sign in to comment.