From 302e2a2ca0003a4dcec2fed99bdada059466bac2 Mon Sep 17 00:00:00 2001 From: theodore Date: Mon, 24 Jun 2024 14:28:55 +0200 Subject: [PATCH] feat: clonable handler --- e2e/engineioxide/engineioxide.rs | 2 +- engineioxide/src/engine.rs | 2 +- engineioxide/src/handler.rs | 2 +- socketioxide/src/client.rs | 48 ++++++++--------- socketioxide/src/errors.rs | 2 + socketioxide/src/extract/mod.rs | 3 +- socketioxide/src/handler/connect.rs | 31 +++++++++-- socketioxide/src/io.rs | 41 ++++++--------- socketioxide/src/lib.rs | 10 ++-- socketioxide/src/ns.rs | 8 +-- socketioxide/src/operators.rs | 2 +- socketioxide/src/socket.rs | 1 + socketioxide/tests/acknowledgement.rs | 6 +-- socketioxide/tests/concurrent_emit.rs | 3 +- socketioxide/tests/connect.rs | 70 +++++-------------------- socketioxide/tests/disconnect_reason.rs | 9 ++-- socketioxide/tests/extractors.rs | 19 +++---- 17 files changed, 113 insertions(+), 146 deletions(-) diff --git a/e2e/engineioxide/engineioxide.rs b/e2e/engineioxide/engineioxide.rs index 1c3176ca..7dc50f56 100644 --- a/e2e/engineioxide/engineioxide.rs +++ b/e2e/engineioxide/engineioxide.rs @@ -29,7 +29,7 @@ impl EngineIoHandler for MyHandler { println!("socket disconnect {}: {:?}", socket.id, reason); } - fn on_message(self: &Arc, msg: Str, socket: Arc>) { + fn on_message(&self, msg: Str, socket: Arc>) { println!("Ping pong message {:?}", msg); socket.emit(msg).ok(); } diff --git a/engineioxide/src/engine.rs b/engineioxide/src/engine.rs index fb3cd4de..f1ba50e7 100644 --- a/engineioxide/src/engine.rs +++ b/engineioxide/src/engine.rs @@ -115,7 +115,7 @@ mod tests { println!("socket disconnect {} {:?}", socket.id, reason); } - fn on_message(self: &Arc, msg: Str, socket: Arc>) { + fn on_message(&self, msg: Str, socket: Arc>) { println!("Ping pong message {:?}", msg); socket.emit(msg).ok(); } diff --git a/engineioxide/src/handler.rs b/engineioxide/src/handler.rs index 79c71f7f..fbce60d5 100644 --- a/engineioxide/src/handler.rs +++ b/engineioxide/src/handler.rs @@ -60,7 +60,7 @@ pub trait EngineIoHandler: std::fmt::Debug + Send + Sync + 'static { fn on_disconnect(&self, socket: Arc>, reason: DisconnectReason); /// Called when a message is received from the client. - fn on_message(self: &Arc, msg: Str, socket: Arc>); + fn on_message(&self, msg: Str, socket: Arc>); /// Called when a binary message is received from the client. fn on_binary(&self, data: Bytes, socket: Arc>); diff --git a/socketioxide/src/client.rs b/socketioxide/src/client.rs index 54e25a62..af4d6274 100644 --- a/socketioxide/src/client.rs +++ b/socketioxide/src/client.rs @@ -9,7 +9,7 @@ use engineioxide::Str; use futures_util::{FutureExt, TryFutureExt}; use engineioxide::sid::Sid; -use matchit::Router; +use matchit::{Match, Router}; use tokio::sync::oneshot; use crate::adapter::Adapter; @@ -54,7 +54,7 @@ impl Client { /// Called when a socket connects to a new namespace fn sock_connect( - self: Arc, + &self, auth: Option, ns_path: Str, esocket: Arc>>, @@ -62,27 +62,23 @@ impl Client { #[cfg(feature = "tracing")] tracing::debug!("auth: {:?}", auth); let protocol: ProtocolVersion = esocket.protocol.into(); - let esocket_clone = esocket.clone(); - let connect = move |ns: Arc>| async move { - if ns - .connect(esocket_clone.id, esocket_clone.clone(), auth) - .await - .is_ok() - { - // cancel the connect timeout task for v5 - if let Some(tx) = esocket_clone.data.connect_recv_tx.lock().unwrap().take() { - tx.send(()).ok(); + let connect = + move |ns: Arc>, esocket: Arc>>| async move { + if ns.connect(esocket.id, esocket.clone(), auth).await.is_ok() { + // cancel the connect timeout task for v5 + if let Some(tx) = esocket.data.connect_recv_tx.lock().unwrap().take() { + tx.send(()).ok(); + } } - } - }; + }; if let Some(ns) = self.get_ns(&ns_path) { - tokio::spawn(connect(ns)); - } else if let Ok(res) = self.router.read().unwrap().at(&ns_path) { + tokio::spawn(connect(ns, esocket)); + } else if let Ok(Match { value: ns_ctr, .. }) = self.router.read().unwrap().at(&ns_path) { let path: Cow<'static, str> = Cow::Owned(ns_path.clone().into()); - let ns = res.value.get_new_ns(ns_path); //TODO: check memory leak here + let ns = ns_ctr.get_new_ns(ns_path); //TODO: check memory leak here self.ns.write().unwrap().insert(path, ns.clone()); - tokio::spawn(connect(ns)); + tokio::spawn(connect(ns, esocket)); } else if protocol == ProtocolVersion::V4 && ns_path == "/" { #[cfg(feature = "tracing")] tracing::error!( @@ -129,7 +125,7 @@ impl Client { /// Adds a new namespace handler pub fn add_ns(&self, path: Cow<'static, str>, callback: C) where - C: ConnectHandler + Clone, + C: ConnectHandler, T: Send + Sync + 'static, { #[cfg(feature = "tracing")] @@ -140,7 +136,7 @@ impl Client { pub fn add_dyn_ns(&self, path: String, callback: C) -> Result<(), matchit::InsertError> where - C: ConnectHandler + Clone, + C: ConnectHandler, T: Send + Sync + 'static, { #[cfg(feature = "tracing")] @@ -176,8 +172,8 @@ impl Client { tracing::debug!("closing all namespaces"); let ns = { std::mem::take(&mut *self.ns.write().unwrap()) }; futures_util::future::join_all( - ns.iter() - .map(|(_, ns)| ns.close(DisconnectReason::ClosingServer)), + ns.values() + .map(|ns| ns.close(DisconnectReason::ClosingServer)), ) .await; #[cfg(feature = "tracing")] @@ -239,8 +235,8 @@ impl EngineIoHandler for Client { .ns .read() .unwrap() - .iter() - .filter_map(|(_, ns)| ns.get_socket(socket.id).ok()) + .values() + .filter_map(|ns| ns.get_socket(socket.id).ok()) .collect(); let _res: Result, _> = socks @@ -259,7 +255,7 @@ impl EngineIoHandler for Client { } } - fn on_message(self: &Arc, msg: Str, socket: Arc>>) { + fn on_message(&self, msg: Str, socket: Arc>>) { #[cfg(feature = "tracing")] tracing::debug!("Received message: {:?}", msg); let packet = match Packet::try_from(msg) { @@ -276,7 +272,7 @@ impl EngineIoHandler for Client { let res: Result<(), Error> = match packet.inner { PacketData::Connect(auth) => { - self.clone().sock_connect(auth, packet.ns, socket.clone()); + self.sock_connect(auth, packet.ns, socket.clone()); Ok(()) } PacketData::BinaryEvent(_, _, _) | PacketData::BinaryAck(_, _) => { diff --git a/socketioxide/src/errors.rs b/socketioxide/src/errors.rs index 63e446f8..8984c543 100644 --- a/socketioxide/src/errors.rs +++ b/socketioxide/src/errors.rs @@ -2,6 +2,8 @@ use engineioxide::{sid::Sid, socket::DisconnectReason as EIoDisconnectReason}; use std::fmt::{Debug, Display}; use tokio::{sync::mpsc::error::TrySendError, time::error::Elapsed}; +pub use matchit::InsertError as NsInsertError; + /// Error type for socketio #[derive(thiserror::Error, Debug)] pub enum Error { diff --git a/socketioxide/src/extract/mod.rs b/socketioxide/src/extract/mod.rs index 8a24ca5e..6a90e6da 100644 --- a/socketioxide/src/extract/mod.rs +++ b/socketioxide/src/extract/mod.rs @@ -22,7 +22,6 @@ //! * [`HttpExtension`]: extracts an http extension of the given type coming from the request. //! (Similar to axum's [`extract::Extension`](https://docs.rs/axum/latest/axum/struct.Extension.html) //! * [`MaybeHttpExtension`]: extracts an http extension of the given type if it exists or [`None`] otherwise. -//! * [`NsParam`]: extracts and deserialize the namespace path parameters. Works only for the [`ConnectHandler`] and [`ConnectMiddleware`]. //! //! ### You can also implement your own Extractor with the [`FromConnectParts`], [`FromMessageParts`] and [`FromDisconnectParts`] traits //! When implementing these traits, if you clone the [`Arc`](crate::socket::Socket) make sure that it is dropped at least when the socket is disconnected. @@ -59,7 +58,7 @@ //! //! impl FromConnectParts for UserId { //! type Error = Infallible; -//! fn from_connect_parts(s: &Arc>, _: &Option, _: &NsParamBuff<'_>) -> Result { +//! fn from_connect_parts(s: &Arc>, _: &Option) -> Result { //! // In a real app it would be better to parse the query params with a crate like `url` //! let uri = &s.req_parts().uri; //! let uid = uri diff --git a/socketioxide/src/handler/connect.rs b/socketioxide/src/handler/connect.rs index 2be32d24..97051004 100644 --- a/socketioxide/src/handler/connect.rs +++ b/socketioxide/src/handler/connect.rs @@ -158,7 +158,7 @@ pub trait FromConnectParts: Sized { /// /// * See the [`connect`](super::connect) module doc for more details on connect middlewares. /// * See the [`extract`](crate::extract) module doc for more details on available extractors. -pub trait ConnectMiddleware: Send + Sync + 'static { +pub trait ConnectMiddleware: Sized + Clone + Send + Sync + 'static { /// Call the middleware with the given arguments. fn call<'a>( &'a self, @@ -177,7 +177,7 @@ pub trait ConnectMiddleware: Send + Sync + 'static { /// /// * See the [`connect`](super::connect) module doc for more details on connect handler. /// * See the [`extract`](crate::extract) module doc for more details on available extractors. -pub trait ConnectHandler: Send + Sync + 'static { +pub trait ConnectHandler: Sized + Clone + Send + Sync + 'static { /// Call the handler with the given arguments. fn call(&self, s: Arc>, auth: Option); @@ -236,7 +236,6 @@ pub trait ConnectHandler: Send + Sync + 'static { /// ``` fn with(self, middleware: M) -> impl ConnectHandler where - Self: Sized, M: ConnectMiddleware + Send + Sync + 'static, T: Send + Sync + 'static, T1: Send + Sync + 'static, @@ -344,6 +343,32 @@ where self.middleware.call(s, auth).await } } +impl Clone for LayeredConnectHandler +where + H: Clone, + N: Clone, +{ + fn clone(&self) -> Self { + Self { + handler: self.handler.clone(), + middleware: self.middleware.clone(), + phantom: self.phantom, + } + } +} +impl Clone for ConnectMiddlewareLayer +where + M: Clone, + N: Clone, +{ + fn clone(&self) -> Self { + Self { + middleware: self.middleware.clone(), + next: self.next.clone(), + phantom: self.phantom, + } + } +} impl ConnectMiddleware for ConnectMiddlewareLayer where diff --git a/socketioxide/src/io.rs b/socketioxide/src/io.rs index 890e1987..89cc10e7 100644 --- a/socketioxide/src/io.rs +++ b/socketioxide/src/io.rs @@ -5,7 +5,7 @@ use engineioxide::{ config::{EngineIoConfig, EngineIoConfigBuilder}, service::NotFoundService, sid::Sid, - Str, TransportType, + TransportType, }; use crate::{ @@ -360,35 +360,28 @@ impl SocketIo { /// }); /// /// ``` - /// - /// #### Example with dynamic namespace: - /// ``` - /// # use socketioxide::{SocketIo, extract::{NsParam, SocketRef}}; - /// #[derive(Debug, serde::Deserialize)] - /// struct Params { - /// id: String, - /// user_id: String - /// } - /// - /// let (_svc, io) = SocketIo::new_svc(); - /// io.ns("/{id}/user/{user_id}", |s: SocketRef, NsParam(params): NsParam| { - /// println!("new socket with params: {:?}", params); - /// }).unwrap(); - /// - /// // You can specify any type that implements the `serde::Deserialize` trait. - /// io.ns("/{id}/admin/{role}", |s: SocketRef, NsParam(params): NsParam<(usize, String)>| { - /// println!("new socket with params: {:?}", params); - /// }).unwrap(); - /// ``` #[inline] pub fn ns(&self, path: impl Into>, callback: C) where - C: ConnectHandler + Clone, + C: ConnectHandler, T: Send + Sync + 'static, { self.0.add_ns(path.into(), callback) } + #[inline] + pub fn dyn_ns( + &self, + path: impl Into, + callback: C, + ) -> Result<(), crate::NsInsertError> + where + C: ConnectHandler, + T: Send + Sync + 'static, + { + self.0.add_dyn_ns(path.into(), callback) + } + /// Deletes the namespace with the given path. /// /// This will disconnect all sockets connected to this @@ -433,9 +426,9 @@ impl SocketIo { /// /// ## Example with a dynamic namespace /// ``` - /// # use socketioxide::{SocketIo, extract::{SocketRef, NsParam}}; + /// # use socketioxide::{SocketIo, extract::{SocketRef}}; /// let (_, io) = SocketIo::new_svc(); - /// io.ns("/{id}/{user_id}", |socket: SocketRef, NsParam(params): NsParam<(String, String)>| { + /// io.ns("/{id}/{user_id}", |socket: SocketRef| { /// println!("Socket connected on {} namespace with params {:?}", socket.ns(), params); /// }); /// diff --git a/socketioxide/src/lib.rs b/socketioxide/src/lib.rs index b0d03cc9..6b4e1a79 100644 --- a/socketioxide/src/lib.rs +++ b/socketioxide/src/lib.rs @@ -197,7 +197,7 @@ //! //! Path parameters must be wrapped in curly braces `{}`: //! ``` -//! # use socketioxide::{SocketIo, extract::{NsParam, SocketRef}}; +//! # use socketioxide::{SocketIo, extract::SocketRef}; //! #[derive(Debug, serde::Deserialize)] //! struct Params { //! id: String, @@ -207,12 +207,12 @@ //! let (_svc, io) = SocketIo::new_svc(); //! io.ns("/{id}/user/{user_id}", |s: SocketRef, NsParam(params): NsParam| { //! println!("new socket with params: {:?}", params); -//! }).unwrap(); +//! }); //! //! // You can specify any type that implements the `serde::Deserialize` trait. //! io.ns("/{id}/admin/{role}", |s: SocketRef, NsParam(params): NsParam<(usize, String)>| { //! println!("new socket with params: {:?}", params); -//! }).unwrap(); +//! }); //! ``` //! //! You can check the [`matchit`] crate for more details on the path parameters format. @@ -332,7 +332,9 @@ pub mod service; pub mod socket; pub use engineioxide::TransportType; -pub use errors::{AckError, AdapterError, BroadcastError, DisconnectError, SendError, SocketError}; +pub use errors::{ + AckError, AdapterError, BroadcastError, DisconnectError, NsInsertError, SendError, SocketError, +}; pub use io::{SocketIo, SocketIoBuilder, SocketIoConfig}; mod client; diff --git a/socketioxide/src/ns.rs b/socketioxide/src/ns.rs index 6238af0c..871850c5 100644 --- a/socketioxide/src/ns.rs +++ b/socketioxide/src/ns.rs @@ -13,6 +13,9 @@ use crate::{ use crate::{client::SocketData, errors::AdapterError}; use engineioxide::{sid::Sid, Str}; +/// A [`Namespace`] constructor used for dynamic namespaces +/// A namespace constructor only hold a common handler that will be cloned +/// to the instantiated namespaces. pub struct NamespaceCtr { handler: BoxedConnectHandler, } @@ -27,7 +30,7 @@ pub struct Namespace { impl NamespaceCtr { pub fn new(handler: C) -> Self where - C: ConnectHandler + Clone + Send + Sync + 'static, + C: ConnectHandler + Send + Sync + 'static, T: Send + Sync + 'static, { Self { @@ -47,7 +50,7 @@ impl NamespaceCtr { impl Namespace { pub fn new(path: Str, handler: C) -> Arc where - C: ConnectHandler + Clone + Send + Sync + 'static, + C: ConnectHandler + Send + Sync + 'static, T: Send + Sync + 'static, { Arc::new_cyclic(|ns| Self { @@ -70,7 +73,6 @@ impl Namespace { esocket: Arc>>, auth: Option, ) -> Result<(), ConnectFail> { - // deep-clone to avoid packet memory leak let socket: Arc> = Socket::new(sid, self.clone(), esocket.clone()).into(); if let Err(e) = self.handler.call_middleware(socket.clone(), &auth).await { diff --git a/socketioxide/src/operators.rs b/socketioxide/src/operators.rs index 1705d04a..5b13f3b4 100644 --- a/socketioxide/src/operators.rs +++ b/socketioxide/src/operators.rs @@ -509,7 +509,7 @@ impl BroadcastOperators { }, } } - pub(crate) fn from_sock(ns: Arc>, sid: Sid, ns_path: Str) -> Self { + pub(crate) fn from_sock(ns: Arc>, sid: Sid) -> Self { Self { binary: vec![], timeout: None, diff --git a/socketioxide/src/socket.rs b/socketioxide/src/socket.rs index 8dd27f88..1c4fc3e6 100644 --- a/socketioxide/src/socket.rs +++ b/socketioxide/src/socket.rs @@ -844,6 +844,7 @@ impl Socket { #[cfg(test)] mod test { use super::*; + use engineioxide::Str; #[tokio::test] async fn send_with_ack_error() { diff --git a/socketioxide/tests/acknowledgement.rs b/socketioxide/tests/acknowledgement.rs index 685ff9dd..d8f4b24a 100644 --- a/socketioxide/tests/acknowledgement.rs +++ b/socketioxide/tests/acknowledgement.rs @@ -25,8 +25,7 @@ pub async fn emit_with_ack() { let res = assert_ok!(res).await; let ack = assert_ok!(res); assert_ok!(tx.try_send(ack.data)); - }) - .unwrap(); + }); let (stx, mut srx) = io.new_dummy_sock("/", ()).await; assert_some!(srx.recv().await); // NS connect packet @@ -87,8 +86,7 @@ pub async fn broadcast_with_ack() { async move {} }) .await; - }) - .unwrap(); + }); // Spawn 5 clients and make them echo the ack for _ in 0..5 { diff --git a/socketioxide/tests/concurrent_emit.rs b/socketioxide/tests/concurrent_emit.rs index 1270ac3e..48df8494 100644 --- a/socketioxide/tests/concurrent_emit.rs +++ b/socketioxide/tests/concurrent_emit.rs @@ -26,8 +26,7 @@ pub async fn emit() { } }); } - }) - .unwrap(); + }); let (_stx, mut srx) = io.new_dummy_sock("/", ()).await; assert_some!(srx.recv().await); diff --git a/socketioxide/tests/connect.rs b/socketioxide/tests/connect.rs index 79928dd7..0041a4b3 100644 --- a/socketioxide/tests/connect.rs +++ b/socketioxide/tests/connect.rs @@ -3,10 +3,7 @@ mod utils; use bytes::Bytes; use engineioxide::Packet::*; use socketioxide::{ - extract::{NsParam, SocketRef}, - handler::ConnectHandler, - packet::Packet, - SendError, SocketError, SocketIo, + extract::SocketRef, handler::ConnectHandler, packet::Packet, SendError, SocketError, SocketIo, }; use tokio::sync::mpsc; @@ -15,7 +12,7 @@ fn create_msg( event: &str, data: impl Into, ) -> engineioxide::Packet { - let packet: String = Packet::event(ns, event, data.into()).into(); + let packet: String = Packet::event(ns.into(), event, data.into()).into(); Message(packet.into()) } async fn timeout_rcv(srx: &mut tokio::sync::mpsc::Receiver) -> T { @@ -63,8 +60,7 @@ pub async fn connect_middleware() { io.ns( "/", { || {} }.with(handler(3)).with(handler(2)).with(handler(1)), - ) - .unwrap(); + ); let (_, mut srx) = io.new_dummy_sock("/", ()).await; assert_eq!(rx.recv().await.unwrap(), 1); @@ -108,8 +104,7 @@ pub async fn connect_middleware_error() { .with(handler(3, false)) .with(handler(2, true)) .with(handler(1, false)), - ) - .unwrap(); + ); let (_, mut srx) = io.new_dummy_sock("/", ()).await; @@ -121,52 +116,16 @@ pub async fn connect_middleware_error() { } #[tokio::test] -async fn ns_connect_with_params() { +async fn ns_dyn_connect() { let (_svc, io) = SocketIo::new_svc(); - let (tx, mut rx) = tokio::sync::mpsc::channel::(1); + let (tx, mut rx) = tokio::sync::mpsc::channel::(1); - io.ns("/admin/{id}/board", move |NsParam(id): NsParam| { - tx.try_send(id).unwrap(); + io.dyn_ns("/admin/{id}/board", move |s: SocketRef| { + tx.try_send(s.ns().to_string()).unwrap(); }) .unwrap(); let (_stx, mut _srx) = io.new_dummy_sock("/admin/132/board", ()).await; - assert_eq!(timeout_rcv(&mut rx).await, 132); -} -#[tokio::test] -async fn ns_connect_with_param_errors() { - let (_svc, io) = SocketIo::new_svc(); - let (tx, mut rx) = tokio::sync::mpsc::channel::(1); - - io.ns("/admin/{id}/board", move |NsParam(id): NsParam| { - tx.try_send(id).unwrap(); - }) - .unwrap(); - let (_stx, mut _srx) = io.new_dummy_sock("/admin/azudnazd/board", ()).await; - let elapsed = tokio::time::timeout(std::time::Duration::from_millis(200), rx.recv()).await; - assert!(elapsed.is_err() || elapsed.unwrap().is_none()); -} -#[tokio::test] -async fn ns_connect_with_params_share_rooms() { - let (_svc, io) = SocketIo::new_svc(); - let (tx, mut rx) = tokio::sync::mpsc::channel::(2); - - io.ns( - "/admin/{id}/board", - move |s: SocketRef, NsParam(id): NsParam| { - s.join(id.clone()).unwrap(); - tx.try_send(id).unwrap(); - }, - ) - .unwrap(); - let (_stx, mut _srx) = io.new_dummy_sock("/admin/1/board", ()).await; - assert_eq!(timeout_rcv(&mut rx).await, "1"); - let (_stx1, mut _srx1) = io.new_dummy_sock("/admin/2/board", ()).await; - assert_eq!(timeout_rcv(&mut rx).await, "2"); - let rooms = assert_ok!(io.of("/admin/2/board").unwrap().rooms()); - assert_eq!(rooms.len(), 2); - for room in rooms { - assert!(matches!(room.as_ref(), "1" | "2")); - } + assert_eq!(timeout_rcv(&mut rx).await, "/admin/132/board"); } #[tokio::test] @@ -177,8 +136,7 @@ async fn remove_ns_from_connect_handler() { io.ns("/test1", move |io: SocketIo| { tx.try_send(()).unwrap(); io.delete_ns("/test1"); - }) - .unwrap(); + }); let (stx, mut srx) = io.new_dummy_sock("/test1", ()).await; timeout_rcv(&mut srx).await; @@ -201,7 +159,7 @@ async fn remove_ns_from_middleware() { Ok::<(), std::convert::Infallible>(()) }; fn handler() {} - io.ns("/test1", handler.with(middleware)).unwrap(); + io.ns("/test1", handler.with(middleware)); let (stx, mut srx) = io.new_dummy_sock("/test1", ()).await; timeout_rcv(&mut srx).await; @@ -223,8 +181,7 @@ async fn remove_ns_from_event_handler() { io.delete_ns("/test1"); tx.try_send(()).unwrap(); }); - }) - .unwrap(); + }); let (stx, mut srx) = io.new_dummy_sock("/test1", ()).await; timeout_rcv(&mut srx).await; @@ -247,8 +204,7 @@ async fn remove_ns_from_disconnect_handler() { io.delete_ns("/test2"); tx.try_send("disconnect").unwrap(); }) - }) - .unwrap(); + }); let (stx, mut srx) = io.new_dummy_sock("/test2", ()).await; assert_eq!(timeout_rcv(&mut rx).await, "connect"); diff --git a/socketioxide/tests/disconnect_reason.rs b/socketioxide/tests/disconnect_reason.rs index adcf65e9..08f05d79 100644 --- a/socketioxide/tests/disconnect_reason.rs +++ b/socketioxide/tests/disconnect_reason.rs @@ -30,8 +30,7 @@ fn attach_handler(io: &SocketIo, chan_size: usize) -> mpsc::Receiver(srx: &mut tokio::sync::mpsc::Receiv } fn create_msg(ns: &'static str, event: &str, data: impl Into) -> EioPacket { - let packet: String = Packet::event(ns, event, data.into()).into(); + let packet: String = Packet::event(ns.into(), event, data.into()).into(); EioPacket::Message(packet.into()) } @@ -40,8 +40,7 @@ pub async fn state_extractor() { socket.on("test", |socket: SocketRef, State(state): State| { assert_ok!(socket.emit("state", state)); }); - }) - .unwrap(); + }); let res_packet = create_msg("/", "state", state); // Connect packet @@ -68,8 +67,7 @@ pub async fn data_extractor() { socket.on("test", move |Data(data): Data| { assert_ok!(tx.try_send(data)); }); - }) - .unwrap(); + }); io.new_dummy_sock("/", ()).await; assert!(matches!( @@ -105,8 +103,7 @@ pub async fn try_data_extractor() { s.on("test", move |TryData(data): TryData| { assert_ok!(tx.try_send(data)); }); - }) - .unwrap(); + }); // Non deserializable data io.new_dummy_sock("/", ()).await; @@ -146,9 +143,9 @@ pub async fn extension_extractor() { } // Namespace without errors (the extension is set) - io.ns("/", ns_root.with(set_ext)).unwrap(); + io.ns("/", ns_root.with(set_ext)); // Namespace with errors (the extension is not set) - io.ns("/test", ns_root).unwrap(); + io.ns("/test", ns_root); // Extract extensions from the socket let (tx, mut rx) = io.new_dummy_sock("/", ()).await; @@ -188,9 +185,9 @@ pub async fn maybe_extension_extractor() { } // Namespace without errors (the extension is set) - io.ns("/", ns_root.with(set_ext)).unwrap(); + io.ns("/", ns_root.with(set_ext)); // Namespace with errors (the extension is not set) - io.ns("/test", ns_root).unwrap(); + io.ns("/test", ns_root); // Extract extensions from the socket let (tx, mut rx) = io.new_dummy_sock("/", ()).await;