Skip to content

Commit

Permalink
Merge branch 'main' into bump-tokio-tungstenite
Browse files Browse the repository at this point in the history
  • Loading branch information
Totodore authored Jan 13, 2025
2 parents e37edb6 + 8df6c8f commit 9118637
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 53 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ resolver = "2"
bytes = { version = "1.7", features = ["serde"] }
futures-core = "0.3"
futures-util = { version = "0.3", default-features = false, features = ["std"] }
tokio = "1.43"
tokio = "1.40"
tokio-tungstenite = "0.26"
serde = { version = "1.0", features = ["derive"] }
smallvec = { version = "1.13", features = ["union"] }
Expand Down
23 changes: 17 additions & 6 deletions crates/engineioxide/src/packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,11 @@ impl Packet {
}

/// Serialize a [Packet] to a [String] according to the Engine.IO protocol
impl TryInto<String> for Packet {
type Error = Error;
fn try_into(self) -> Result<String, Self::Error> {
let len = self.get_size_hint(true);
impl From<Packet> for String {
fn from(packet: Packet) -> String {
let len = packet.get_size_hint(true);
let mut buffer = String::with_capacity(len);
match self {
match packet {
Packet::Open(open) => {
buffer.push('0');
buffer.push_str(&serde_json::to_string(&open).unwrap());
Expand All @@ -140,7 +139,12 @@ impl TryInto<String> for Packet {
general_purpose::STANDARD.encode_string(data, &mut buffer);
}
};
Ok(buffer)
buffer
}
}
impl From<Packet> for tokio_tungstenite::tungstenite::Utf8Bytes {
fn from(value: Packet) -> Self {
String::from(value).into()
}
}
/// Deserialize a [Packet] from a [String] according to the Engine.IO protocol
Expand Down Expand Up @@ -176,6 +180,13 @@ impl TryFrom<Str> for Packet {
Ok(res)
}
}
impl TryFrom<tokio_tungstenite::tungstenite::Utf8Bytes> for Packet {
type Error = Error;
fn try_from(value: tokio_tungstenite::tungstenite::Utf8Bytes) -> Result<Self, Self::Error> {
// SAFETY: The utf8 bytes are guaranteed to be valid utf8
Packet::try_from(unsafe { Str::from_bytes_unchecked(value.into()) })
}
}

impl TryFrom<String> for Packet {
type Error = Error;
Expand Down
8 changes: 4 additions & 4 deletions crates/engineioxide/src/transport/polling/payload/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ pub async fn v4_encoder(
try_recv_packet(&mut rx, data.len() + PUNCTUATION_LEN, max_payload, true)
{
for packet in packets {
let packet: String = packet.try_into()?;
let packet: String = packet.into();

if !data.is_empty() {
data.push(std::char::from_u32(PACKET_SEPARATOR_V4 as u32).unwrap());
Expand All @@ -101,7 +101,7 @@ pub async fn v4_encoder(
if data.is_empty() {
let packets = recv_packet(&mut rx).await?;
for packet in packets {
let packet: String = packet.try_into()?;
let packet: String = packet.into();
data.push_str(&packet);
}
}
Expand Down Expand Up @@ -133,7 +133,7 @@ pub fn v3_bin_packet_encoder(packet: Packet, data: &mut bytes::BytesMut) -> Resu
data.extend_from_slice(&bin); // raw data
}
packet => {
let packet: String = packet.try_into()?;
let packet: String = packet.into();
let len = itoa.format(packet.len());
let len_len = len.len(); // len is guaranteed to be ascii

Expand All @@ -156,7 +156,7 @@ pub fn v3_bin_packet_encoder(packet: Packet, data: &mut bytes::BytesMut) -> Resu
pub fn v3_string_packet_encoder(packet: Packet, data: &mut bytes::BytesMut) -> Result<(), Error> {
use crate::transport::polling::payload::STRING_PACKET_SEPARATOR_V3;
use bytes::BufMut;
let packet: String = packet.try_into()?;
let packet: String = packet.into();
let packet = format!(
"{}{}{}",
packet.chars().count(),
Expand Down
80 changes: 38 additions & 42 deletions crates/engineioxide/src/transport/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use tokio::{
task::JoinHandle,
};
use tokio_tungstenite::{
tungstenite::{handshake::derive_accept_key, protocol::Role, Message, Utf8Bytes},
tungstenite::{handshake::derive_accept_key, protocol::Role, Message},
WebSocketStream,
};

Expand All @@ -27,9 +27,10 @@ use crate::{
errors::Error,
handler::EngineIoHandler,
packet::{OpenPacket, Packet},
service::{ProtocolVersion, TransportType},
service::ProtocolVersion,
service::TransportType,
sid::Sid,
DisconnectReason, Socket, Str,
DisconnectReason, Socket,
};

/// Create a response for websocket upgrade
Expand Down Expand Up @@ -164,33 +165,28 @@ where
{
while let Some(msg) = rx.try_next().await? {
match msg {
Message::Text(msg) => {
match Packet::try_from(unsafe { Str::from_bytes_unchecked(msg.into()) })? {
Packet::Close => {
#[cfg(feature = "tracing")]
tracing::debug!("[sid={}] closing session", socket.id);
engine.close_session(socket.id, DisconnectReason::TransportClose);
break;
}
Packet::Pong | Packet::Ping => socket
.heartbeat_tx
.try_send(())
.map_err(|_| Error::HeartbeatTimeout),
Packet::Message(msg) => {
engine.handler.on_message(msg, socket.clone());
Ok(())
}
p => return Err(Error::BadPacket(p)),
Message::Text(msg) => match Packet::try_from(msg)? {
Packet::Close => {
#[cfg(feature = "tracing")]
tracing::debug!("[sid={}] closing session", socket.id);
engine.close_session(socket.id, DisconnectReason::TransportClose);
break;
}
}
#[allow(unused_mut)]
Packet::Pong | Packet::Ping => socket
.heartbeat_tx
.try_send(())
.map_err(|_| Error::HeartbeatTimeout),
Packet::Message(msg) => {
engine.handler.on_message(msg, socket.clone());
Ok(())
}
p => return Err(Error::BadPacket(p)),
},
Message::Binary(mut data) => {
let data = if socket.protocol == ProtocolVersion::V3 && !data.is_empty() {
if socket.protocol == ProtocolVersion::V3 && !data.is_empty() {
// The first byte is the message type, which we don't need.
data.slice(1..)
} else {
data
};
data = data.split_off(1);
}
engine.handler.on_binary(data, socket.clone());
Ok(())
}
Expand Down Expand Up @@ -224,15 +220,17 @@ where
macro_rules! map_fn {
($item:ident) => {
let res = match $item {
Packet::Binary(bin) if socket.protocol != ProtocolVersion::V3 => {
tx.feed(Message::Binary(bin)).await
}
Packet::Binary(bin) | Packet::BinaryV3(bin) => {
// v3 protocol requires packet type as the first byte.
let mut buf = Vec::with_capacity(bin.len() + 1);
buf.push(0x04);
buf.extend_from_slice(&bin);
tx.feed(Message::Binary(bin)).await
if socket.protocol == ProtocolVersion::V3 {
// v3 protocol requires packet type as the first byte.
// This requires a new buffer. This is OK as it is only for the V3 protocol.
let mut buff = Vec::with_capacity(bin.len() + 1);
buff.push(0x04);
buff.extend(bin);
tx.feed(Message::Binary(buff.into())).await
} else {
tx.feed(Message::Binary(bin)).await
}
}
Packet::Close => {
tx.send(Message::Close(None)).await.ok();
Expand All @@ -245,7 +243,7 @@ where
Packet::Noop => Ok(()),
_ => {
let packet: String = $item.try_into().unwrap();
tx.feed(Message::Text(Utf8Bytes::from(packet))).await
tx.feed(Message::Text(packet.into())).await
}
};
if let Err(_e) = res {
Expand Down Expand Up @@ -280,8 +278,7 @@ where
S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
let packet = Packet::Open(OpenPacket::new(TransportType::Websocket, sid, config));
let value: String = packet.try_into()?;
ws.send(Message::Text(Utf8Bytes::from(value))).await?;
ws.send(Message::Text(packet.into())).await?;
Ok(())
}

Expand Down Expand Up @@ -324,11 +321,10 @@ where
Some(Ok(Message::Text(d))) => d,
_ => Err(Error::Upgrade)?,
};
match Packet::try_from(unsafe { Str::from_bytes_unchecked(msg.into()) })? {
match Packet::try_from(msg)? {
Packet::PingUpgrade => {
// Respond with a PongUpgrade packet
let msg: String = Packet::PongUpgrade.try_into()?;
ws.send(Message::Text(Utf8Bytes::from(msg))).await?;
ws.send(Message::Text(Packet::PongUpgrade.into())).await?;
}
p => Err(Error::BadPacket(p))?,
};
Expand All @@ -350,7 +346,7 @@ where
Err(Error::Upgrade)?
}
};
match Packet::try_from(unsafe { Str::from_bytes_unchecked(msg.into()) })? {
match Packet::try_from(msg)? {
Packet::Upgrade => {
#[cfg(feature = "tracing")]
tracing::debug!("ws upgraded successful")
Expand Down

0 comments on commit 9118637

Please sign in to comment.