Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: io client shared in socketdata #327

Merged
merged 32 commits into from
Jun 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
9553267
feat(socketio/extensions): use `RwLock<HashMap>` rather than `DashMap`
Totodore Apr 19, 2024
46c7a1b
chore(bench): add bencher ci
Totodore Apr 19, 2024
d7d6a3f
fix: socketioxide benches with `Bytes`
Totodore Apr 19, 2024
c307a73
chore(bench): fix ci name
Totodore Apr 19, 2024
2894317
chore(bench): add RUSTFLAG for testing
Totodore Apr 19, 2024
44d73ba
fix: engineioxide benches
Totodore Apr 19, 2024
191e3fa
chore(bench): remove matrix test
Totodore Apr 19, 2024
66aeef9
chore(bench): add groups
Totodore Apr 19, 2024
82fefb5
chore(bench): improve extensions bench
Totodore Apr 19, 2024
3652d0a
Merge branch 'bencher' into feat-extensions-rework
Totodore Apr 20, 2024
df530f1
Merge branch 'main' into feat-extensions-rework
Totodore Apr 20, 2024
164a7ae
feat(socketio/extract): refactor extract mod
Totodore Apr 20, 2024
f6008a5
feat(socketio/extract): add `(Maybe)(Http)Extension` extractors
Totodore Apr 20, 2024
ecff81a
docs(example): update examples with `Extension` extractor
Totodore Apr 20, 2024
5070dd0
test(socketio/extract): add tests for `Extension` and `MaybeExtension`
Totodore Apr 20, 2024
a744f7b
docs(example) fmt chat example
Totodore Apr 20, 2024
bf8daab
Merge branch 'main' into feat-extensions-rework
Totodore Apr 20, 2024
f7106db
Merge branch 'main' into feat-extensions-rework
Totodore Apr 21, 2024
50372f2
test(socketio): fix extractors test
Totodore Apr 21, 2024
76c72ca
doc(socketio): improve doc for socketioxide
Totodore Apr 21, 2024
df94f5c
test(socketio): increase timeout
Totodore Apr 21, 2024
5888b37
Merge branch 'main' into feat-extensions-rework
Totodore May 6, 2024
1805c10
Merge branch 'main' into feat-extensions-rework
Totodore May 10, 2024
7b22160
doc(socketio): improve doc
Totodore May 21, 2024
6887304
feat(io): store io client in socketdata so it is possible to retrieve…
Totodore Jun 3, 2024
334e32a
doc(socketio): improve doc
Totodore May 21, 2024
567ca16
Merge branch 'feat-extensions-rework' into feat-io-client-in-socketdata
Totodore Jun 4, 2024
9852110
Merge remote-tracking branch 'origin/main' into feat-io-client-in-soc…
Totodore Jun 4, 2024
d5eb45e
test: use new engineio handler API
Totodore Jun 4, 2024
25ab9d5
Merge branch 'main' into feat-io-client-in-socketdata
Totodore Jun 4, 2024
598c619
test: use new engineio handler API
Totodore Jun 4, 2024
9fa01fb
doc(socketio): add doc on using `SocketIo` as an extractor
Totodore Jun 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions e2e/engineioxide/engineioxide.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ struct MyHandler;
impl EngineIoHandler for MyHandler {
type Data = ();

fn on_connect(&self, socket: Arc<Socket<Self::Data>>) {
fn on_connect(self: Arc<Self>, socket: Arc<Socket<Self::Data>>) {
println!("socket connect {}", socket.id);
}
fn on_disconnect(&self, socket: Arc<Socket<Self::Data>>, reason: DisconnectReason) {
Expand Down Expand Up @@ -54,7 +54,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.max_payload(1e6 as u64)
.build();

let svc = EngineIoService::with_config(MyHandler, config);
let svc = EngineIoService::with_config(Arc::new(MyHandler), config);

let listener = TcpListener::bind("127.0.0.1:3000").await?;

Expand Down
4 changes: 2 additions & 2 deletions engineioxide/Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ struct SocketState {
impl EngineIoHandler for MyHandler {
type Data = SocketState;

fn on_connect(&self, socket: Arc<Socket<SocketState>>) {
fn on_connect(self: Arc<Self>, socket: Arc<Socket<SocketState>>) {
let cnt = self.user_cnt.fetch_add(1, Ordering::Relaxed) + 1;
socket.emit(cnt.to_string()).ok();
}
Expand All @@ -57,7 +57,7 @@ impl EngineIoHandler for MyHandler {
}

// Create a new engineio layer
let layer = EngineIoLayer::new(MyHandler::default());
let layer = EngineIoLayer::new(Arc::new(MyHandler::default()));

let app = axum::Router::<()>::new()
.route("/", get(|| async { "Hello, World!" }))
Expand Down
6 changes: 3 additions & 3 deletions engineioxide/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
//!
//! impl EngineIoHandler for MyHandler {
//! type Data = ();
//! fn on_connect(&self, socket: Arc<Socket<()>>) { }
//! fn on_connect(self: Arc<Self>, socket: Arc<Socket<()>>) { }
//! fn on_disconnect(&self, socket: Arc<Socket<()>>, reason: DisconnectReason) { }
//! fn on_message(&self, msg: Str, socket: Arc<Socket<()>>) { }
//! fn on_binary(&self, data: Bytes, socket: Arc<Socket<()>>) { }
Expand All @@ -27,7 +27,7 @@
//! .build();
//!
//! // Create an engine io service with a custom config
//! let svc = EngineIoService::with_config(MyHandler, config);
//! let svc = EngineIoService::with_config(Arc::new(MyHandler), config);
//! ```

use std::{borrow::Cow, time::Duration};
Expand Down Expand Up @@ -143,7 +143,7 @@ impl EngineIoConfigBuilder {
/// impl EngineIoHandler for MyHandler {
///
/// type Data = ();
/// fn on_connect(&self, socket: Arc<Socket<()>>) {
/// fn on_connect(self: Arc<Self>, socket: Arc<Socket<()>>) {
/// println!("socket connect {}", socket.id);
/// }
/// fn on_disconnect(&self, socket: Arc<Socket<()>>, reason: DisconnectReason) {
Expand Down
22 changes: 12 additions & 10 deletions engineioxide/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ pub struct EngineIo<H: EngineIoHandler> {
sockets: SocketMap<Socket<H::Data>>,

/// The handler for the engine.io server that will be called when events are received
pub handler: H,
pub handler: Arc<H>,

/// The config for the engine.io server
pub config: EngineIoConfig,
}

impl<H: EngineIoHandler> EngineIo<H> {
/// Create a new Engine.IO server with a [`EngineIoHandler`] and a [`EngineIoConfig`]
pub fn new(handler: H, config: EngineIoConfig) -> Self {
pub fn new(handler: Arc<H>, config: EngineIoConfig) -> Self {
Self {
sockets: RwLock::new(HashMap::new()),
config,
Expand Down Expand Up @@ -64,7 +64,7 @@ impl<H: EngineIoHandler> EngineIo<H> {
.write()
.unwrap()
.insert(socket.id, socket.clone());
self.handler.on_connect(socket.clone());
self.handler.clone().on_connect(socket.clone());
socket
}

Expand Down Expand Up @@ -107,7 +107,7 @@ mod tests {
impl EngineIoHandler for MockHandler {
type Data = ();

fn on_connect(&self, socket: Arc<Socket<Self::Data>>) {
fn on_connect(self: Arc<Self>, socket: Arc<Socket<Self::Data>>) {
println!("socket connect {}", socket.id);
}

Expand All @@ -126,10 +126,14 @@ mod tests {
}
}

fn create_engine() -> Arc<EngineIo<MockHandler>> {
let config = EngineIoConfig::default();
Arc::new(EngineIo::new(Arc::new(MockHandler), config))
}

#[tokio::test]
async fn create_session() {
let config = EngineIoConfig::default();
let engine = Arc::new(EngineIo::new(MockHandler, config));
let engine = create_engine();
let socket = engine.create_session(
ProtocolVersion::V4,
TransportType::Polling,
Expand All @@ -144,8 +148,7 @@ mod tests {

#[tokio::test]
async fn close_session() {
let config = EngineIoConfig::default();
let engine = Arc::new(EngineIo::new(MockHandler, config));
let engine = create_engine();
let socket = engine.create_session(
ProtocolVersion::V4,
TransportType::Polling,
Expand All @@ -160,8 +163,7 @@ mod tests {

#[tokio::test]
async fn get_socket() {
let config = EngineIoConfig::default();
let engine = Arc::new(EngineIo::new(MockHandler, config));
let engine = create_engine();
let socket = engine.create_session(
ProtocolVersion::V4,
TransportType::Polling,
Expand Down
26 changes: 3 additions & 23 deletions engineioxide/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
//! impl EngineIoHandler for MyHandler {
//! type Data = SocketState;
//!
//! fn on_connect(&self, socket: Arc<Socket<SocketState>>) {
//! fn on_connect(self: Arc<Self>, socket: Arc<Socket<SocketState>>) {
//! let cnt = self.user_cnt.fetch_add(1, Ordering::Relaxed) + 1;
//! socket.emit(cnt.to_string()).ok();
//! }
Expand All @@ -37,7 +37,7 @@
//! }
//!
//! // Create an engine io service with the given handler
//! let svc = EngineIoService::new(MyHandler::default());
//! let svc = EngineIoService::new(Arc::new(MyHandler::default()));
//! ```
use std::sync::Arc;

Expand All @@ -54,7 +54,7 @@ pub trait EngineIoHandler: std::fmt::Debug + Send + Sync + 'static {
type Data: Default + Send + Sync + 'static;

/// Called when a new socket is connected.
fn on_connect(&self, socket: Arc<Socket<Self::Data>>);
fn on_connect(self: Arc<Self>, socket: Arc<Socket<Self::Data>>);

/// Called when a socket is disconnected with a [`DisconnectReason`]
fn on_disconnect(&self, socket: Arc<Socket<Self::Data>>, reason: DisconnectReason);
Expand All @@ -65,23 +65,3 @@ pub trait EngineIoHandler: std::fmt::Debug + Send + Sync + 'static {
/// Called when a binary message is received from the client.
fn on_binary(&self, data: Bytes, socket: Arc<Socket<Self::Data>>);
}

impl<T: EngineIoHandler> EngineIoHandler for Arc<T> {
type Data = T::Data;

fn on_connect(&self, socket: Arc<Socket<Self::Data>>) {
(**self).on_connect(socket)
}

fn on_disconnect(&self, socket: Arc<Socket<Self::Data>>, reason: DisconnectReason) {
(**self).on_disconnect(socket, reason)
}

fn on_message(&self, msg: Str, socket: Arc<Socket<Self::Data>>) {
(**self).on_message(msg, socket)
}

fn on_binary(&self, data: Bytes, socket: Arc<Socket<Self::Data>>) {
(**self).on_binary(data, socket)
}
}
11 changes: 6 additions & 5 deletions engineioxide/src/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,20 @@
//!
//! impl EngineIoHandler for MyHandler {
//! type Data = ();
//! fn on_connect(&self, socket: Arc<Socket<()>>) { }
//! fn on_connect(self: Arc<Self>, socket: Arc<Socket<()>>) { }
//! fn on_disconnect(&self, socket: Arc<Socket<()>>, reason: DisconnectReason) { }
//! fn on_message(&self, msg: Str, socket: Arc<Socket<()>>) { }
//! fn on_binary(&self, data: Bytes, socket: Arc<Socket<()>>) { }
//! }
//! // Create a new engineio layer
//! let layer = EngineIoLayer::new(MyHandler);
//! let layer = EngineIoLayer::new(Arc::new(MyHandler));
//!
//! let app = axum::Router::<()>::new()
//! .route("/", get(|| async { "Hello, World!" }))
//! .layer(layer);
//! // Spawn the axum server
//! ```
use std::sync::Arc;
use tower::Layer;

use crate::{config::EngineIoConfig, handler::EngineIoHandler, service::EngineIoService};
Expand All @@ -34,13 +35,13 @@ use crate::{config::EngineIoConfig, handler::EngineIoHandler, service::EngineIoS
#[derive(Debug, Clone)]
pub struct EngineIoLayer<H: EngineIoHandler> {
config: EngineIoConfig,
handler: H,
handler: Arc<H>,
}

impl<H: EngineIoHandler> EngineIoLayer<H> {
/// Create a new [`EngineIoLayer`] with a given [`Handler`](crate::handler::EngineIoHandler)
/// and a default [`EngineIoConfig`]
pub fn new(handler: H) -> Self {
pub fn new(handler: Arc<H>) -> Self {
Self {
config: EngineIoConfig::default(),
handler,
Expand All @@ -49,7 +50,7 @@ impl<H: EngineIoHandler> EngineIoLayer<H> {

/// Create a new [`EngineIoLayer`] with a given [`Handler`](crate::handler::EngineIoHandler)
/// and a custom [`EngineIoConfig`]
pub fn from_config(handler: H, config: EngineIoConfig) -> Self {
pub fn from_config(handler: Arc<H>, config: EngineIoConfig) -> Self {
Self { config, handler }
}
}
Expand Down
12 changes: 6 additions & 6 deletions engineioxide/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@
//!
//! impl EngineIoHandler for MyHandler {
//! type Data = ();
//! fn on_connect(&self, socket: Arc<Socket<()>>) { }
//! fn on_connect(self: Arc<Self>, socket: Arc<Socket<()>>) { }
//! fn on_disconnect(&self, socket: Arc<Socket<()>>, reason: DisconnectReason) { }
//! fn on_message(&self, msg: Str, socket: Arc<Socket<()>>) { }
//! fn on_binary(&self, data: Bytes, socket: Arc<Socket<()>>) { }
//! }
//!
//! // Create a new engine.io service that will return a 404 not found response for other requests
//! let service = EngineIoService::new(MyHandler)
//! let service = EngineIoService::new(Arc::new(MyHandler))
//! .into_make_service(); // Create a MakeService from the EngineIoService to give it to hyper
//! ```

Expand Down Expand Up @@ -63,23 +63,23 @@ pub struct EngineIoService<H: EngineIoHandler, S = NotFoundService> {
impl<H: EngineIoHandler> EngineIoService<H, NotFoundService> {
/// Create a new [`EngineIoService`] with a [`NotFoundService`] as the inner service.
/// If the request is not an `EngineIo` request, it will always return a 404 response.
pub fn new(handler: H) -> Self {
pub fn new(handler: Arc<H>) -> Self {
EngineIoService::with_config(handler, EngineIoConfig::default())
}
/// Create a new [`EngineIoService`] with a custom config
pub fn with_config(handler: H, config: EngineIoConfig) -> Self {
pub fn with_config(handler: Arc<H>, config: EngineIoConfig) -> Self {
EngineIoService::with_config_inner(NotFoundService, handler, config)
}
}

impl<S: Clone, H: EngineIoHandler> EngineIoService<H, S> {
/// Create a new [`EngineIoService`] with a custom inner service.
pub fn with_inner(inner: S, handler: H) -> Self {
pub fn with_inner(inner: S, handler: Arc<H>) -> Self {
EngineIoService::with_config_inner(inner, handler, EngineIoConfig::default())
}

/// Create a new [`EngineIoService`] with a custom inner service and a custom config.
pub fn with_config_inner(inner: S, handler: H, config: EngineIoConfig) -> Self {
pub fn with_config_inner(inner: S, handler: Arc<H>, config: EngineIoConfig) -> Self {
EngineIoService {
inner,
engine: Arc::new(EngineIo::new(handler, config)),
Expand Down
4 changes: 2 additions & 2 deletions engineioxide/src/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
//! impl EngineIoHandler for MyHandler {
//! type Data = SocketState;
//!
//! fn on_connect(&self, socket: Arc<Socket<SocketState>>) {
//! fn on_connect(self: Arc<Self>, socket: Arc<Socket<SocketState>>) {
//! // Get the request made to initialize the connection
//! // and check that the authorization header is correct
//! let connected = socket.req_parts.headers.get("Authorization")
Expand All @@ -52,7 +52,7 @@
//! fn on_binary(&self, data: Bytes, socket: Arc<Socket<SocketState>>) { }
//! }
//!
//! let svc = EngineIoService::new(MyHandler::default());
//! let svc = EngineIoService::new(Arc::new(MyHandler::default()));
//! ```
use std::{
sync::{
Expand Down
2 changes: 1 addition & 1 deletion engineioxide/tests/disconnect_reason.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ struct MyHandler {
impl EngineIoHandler for MyHandler {
type Data = ();

fn on_connect(&self, socket: Arc<Socket<()>>) {
fn on_connect(self: Arc<Self>, socket: Arc<Socket<()>>) {
println!("socket connect {}", socket.id);
}
fn on_disconnect(&self, socket: Arc<Socket<()>>, reason: DisconnectReason) {
Expand Down
3 changes: 2 additions & 1 deletion engineioxide/tests/fixture.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{
collections::VecDeque,
net::{IpAddr, Ipv4Addr, SocketAddr},
sync::Arc,
time::Duration,
};

Expand Down Expand Up @@ -83,7 +84,7 @@ pub async fn create_server<H: EngineIoHandler>(handler: H, port: u16) {

let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port);

let svc = EngineIoService::with_config(handler, config);
let svc = EngineIoService::with_config(Arc::new(handler), config);

let listener = TcpListener::bind(&addr).await.unwrap();
tokio::spawn(async move {
Expand Down
Loading