Skip to content

Commit

Permalink
Bump tokio tungstenite to 0.26 (#428)
Browse files Browse the repository at this point in the history
  • Loading branch information
Totodore authored Jan 13, 2025
1 parent 8df6c8f commit fe09c45
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 25 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ bytes = { version = "1.7", features = ["serde"] }
futures-core = "0.3"
futures-util = { version = "0.3", default-features = false, features = ["std"] }
tokio = "1.40"
tokio-tungstenite = "0.24"
tokio-tungstenite = "0.26"
serde = { version = "1.0", features = ["derive"] }
smallvec = { version = "1.13", features = ["union"] }
serde_json = "1.0"
Expand Down
23 changes: 17 additions & 6 deletions crates/engineioxide/src/packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,11 @@ impl Packet {
}

/// Serialize a [Packet] to a [String] according to the Engine.IO protocol
impl TryInto<String> for Packet {
type Error = Error;
fn try_into(self) -> Result<String, Self::Error> {
let len = self.get_size_hint(true);
impl From<Packet> for String {
fn from(packet: Packet) -> String {
let len = packet.get_size_hint(true);
let mut buffer = String::with_capacity(len);
match self {
match packet {
Packet::Open(open) => {
buffer.push('0');
buffer.push_str(&serde_json::to_string(&open).unwrap());
Expand All @@ -140,7 +139,12 @@ impl TryInto<String> for Packet {
general_purpose::STANDARD.encode_string(data, &mut buffer);
}
};
Ok(buffer)
buffer
}
}
impl From<Packet> for tokio_tungstenite::tungstenite::Utf8Bytes {
fn from(value: Packet) -> Self {
String::from(value).into()
}
}
/// Deserialize a [Packet] from a [String] according to the Engine.IO protocol
Expand Down Expand Up @@ -176,6 +180,13 @@ impl TryFrom<Str> for Packet {
Ok(res)
}
}
impl TryFrom<tokio_tungstenite::tungstenite::Utf8Bytes> for Packet {
type Error = Error;
fn try_from(value: tokio_tungstenite::tungstenite::Utf8Bytes) -> Result<Self, Self::Error> {
// SAFETY: The utf8 bytes are guaranteed to be valid utf8
Packet::try_from(unsafe { Str::from_bytes_unchecked(value.into()) })
}
}

impl TryFrom<String> for Packet {
type Error = Error;
Expand Down
8 changes: 4 additions & 4 deletions crates/engineioxide/src/transport/polling/payload/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ pub async fn v4_encoder(
try_recv_packet(&mut rx, data.len() + PUNCTUATION_LEN, max_payload, true)
{
for packet in packets {
let packet: String = packet.try_into()?;
let packet: String = packet.into();

if !data.is_empty() {
data.push(std::char::from_u32(PACKET_SEPARATOR_V4 as u32).unwrap());
Expand All @@ -101,7 +101,7 @@ pub async fn v4_encoder(
if data.is_empty() {
let packets = recv_packet(&mut rx).await?;
for packet in packets {
let packet: String = packet.try_into()?;
let packet: String = packet.into();
data.push_str(&packet);
}
}
Expand Down Expand Up @@ -133,7 +133,7 @@ pub fn v3_bin_packet_encoder(packet: Packet, data: &mut bytes::BytesMut) -> Resu
data.extend_from_slice(&bin); // raw data
}
packet => {
let packet: String = packet.try_into()?;
let packet: String = packet.into();
let len = itoa.format(packet.len());
let len_len = len.len(); // len is guaranteed to be ascii

Expand All @@ -156,7 +156,7 @@ pub fn v3_bin_packet_encoder(packet: Packet, data: &mut bytes::BytesMut) -> Resu
pub fn v3_string_packet_encoder(packet: Packet, data: &mut bytes::BytesMut) -> Result<(), Error> {
use crate::transport::polling::payload::STRING_PACKET_SEPARATOR_V3;
use bytes::BufMut;
let packet: String = packet.try_into()?;
let packet: String = packet.into();
let packet = format!(
"{}{}{}",
packet.chars().count(),
Expand Down
23 changes: 13 additions & 10 deletions crates/engineioxide/src/transport/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,9 @@ where
Message::Binary(mut data) => {
if socket.protocol == ProtocolVersion::V3 && !data.is_empty() {
// The first byte is the message type, which we don't need.
let _ = data.remove(0);
data = data.split_off(1);
}
engine.handler.on_binary(data.into(), socket.clone());
engine.handler.on_binary(data, socket.clone());
Ok(())
}
Message::Close(_) => break,
Expand Down Expand Up @@ -221,12 +221,16 @@ where
($item:ident) => {
let res = match $item {
Packet::Binary(bin) | Packet::BinaryV3(bin) => {
let mut bin: Vec<u8> = bin.into();
if socket.protocol == ProtocolVersion::V3 {
// v3 protocol requires packet type as the first byte
bin.insert(0, 0x04);
// v3 protocol requires packet type as the first byte.
// This requires a new buffer. This is OK as it is only for the V3 protocol.
let mut buff = Vec::with_capacity(bin.len() + 1);
buff.push(0x04);
buff.extend(bin);
tx.feed(Message::Binary(buff.into())).await
} else {
tx.feed(Message::Binary(bin)).await
}
tx.feed(Message::Binary(bin)).await
}
Packet::Close => {
tx.send(Message::Close(None)).await.ok();
Expand All @@ -239,7 +243,7 @@ where
Packet::Noop => Ok(()),
_ => {
let packet: String = $item.try_into().unwrap();
tx.feed(Message::Text(packet)).await
tx.feed(Message::Text(packet.into())).await
}
};
if let Err(_e) = res {
Expand Down Expand Up @@ -274,7 +278,7 @@ where
S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
let packet = Packet::Open(OpenPacket::new(TransportType::Websocket, sid, config));
ws.send(Message::Text(packet.try_into()?)).await?;
ws.send(Message::Text(packet.into())).await?;
Ok(())
}

Expand Down Expand Up @@ -320,8 +324,7 @@ where
match Packet::try_from(msg)? {
Packet::PingUpgrade => {
// Respond with a PongUpgrade packet
ws.send(Message::Text(Packet::PongUpgrade.try_into()?))
.await?;
ws.send(Message::Text(Packet::PongUpgrade.into())).await?;
}
p => Err(Error::BadPacket(p))?,
};
Expand Down
4 changes: 1 addition & 3 deletions crates/socketioxide/tests/disconnect_reason.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,9 +237,7 @@ pub async fn server_ns_close() {
});

let mut ws = create_ws_connection(12353).await;
ws.send(Message::Text("40/test,{}".to_string()))
.await
.unwrap();
ws.send(Message::Text("40/test,{}".into())).await.unwrap();
let data = tokio::time::timeout(Duration::from_millis(20), rx.recv())
.await
.expect("timeout waiting for DisconnectReason::ServerNSDisconnect")
Expand Down
2 changes: 1 addition & 1 deletion crates/socketioxide/tests/fixture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ pub async fn create_ws_connection(port: u16) -> WebSocketStream<MaybeTlsStream<T
.unwrap()
.0;

ws.send(Message::Text("40{}".to_string())).await.unwrap();
ws.send(Message::Text("40{}".into())).await.unwrap();

ws
}
Expand Down

0 comments on commit fe09c45

Please sign in to comment.