Skip to content

Commit

Permalink
feat: prepare for actix-web support, missing core stream functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
gbaranski committed Mar 5, 2023
1 parent 012066f commit a2b8be8
Show file tree
Hide file tree
Showing 5 changed files with 258 additions and 448 deletions.
5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ cfg-if = "1.0.0"

axum_crate = { package = "axum", version = "0.6.1", features = ["ws"], optional = true }
tokio-tungstenite = { version = "0.18.0", optional = true }
actix-web_crate = { package = "actix-web", version = "4.3.1", optional = true }
actix-http = { version = "3.3.1", optional = true }

[features]
default = ["client", "server"]
Expand All @@ -33,6 +35,7 @@ client = ["tokio-tungstenite"]
server = []
tungstenite = ["server", "tokio-tungstenite"]
axum = ["server", "axum_crate"]
actix-web = ["actix-web_crate", "actix-http"]

[dev-dependencies]
tokio = { version = "1.17.0", features = ["full"] }
Expand All @@ -46,7 +49,7 @@ all-features = true
rustdoc-args = ["--cfg", "docsrs"]

[workspace]
members = ["examples/chat-client", "examples/chat-server", "examples/chat-server-axum", "examples/echo-server", "examples/simple-client", "examples/counter-server"]
members = ["examples/chat-client", "examples/chat-server", "examples/chat-server-axum", "examples/chat-server-actix-web", "examples/echo-server", "examples/simple-client", "examples/counter-server"]

[[test]]
name = "axum"
Expand Down
15 changes: 15 additions & 0 deletions examples/chat-server-actix-web/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[package]
name = "ezsockets-chat-actix-web"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
actix-web = "4.3.1"
async-trait = "0.1.52"
axum = "0.6.1"
ezsockets = { path = "../../", features = ["actix-web"] }
tokio = { version = "1.17.0", features = ["full"] }
tracing = "0.1.32"
tracing-subscriber = "0.3.9"
145 changes: 145 additions & 0 deletions examples/chat-server-actix-web/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
use async_trait::async_trait;
use axum::extract::Extension;
use axum::response::IntoResponse;
use axum::routing::get;
use axum::Router;
use ezsockets::axum::Upgrade;
use ezsockets::Error;
use ezsockets::Server;
use ezsockets::Socket;
use std::collections::HashMap;
use std::io::BufRead;
use std::net::SocketAddr;

type SessionID = u16;
type Session = ezsockets::Session<SessionID, ()>;

#[derive(Debug)]
enum ChatMessage {
Send { from: SessionID, text: String },
}

struct ChatServer {
sessions: HashMap<SessionID, Session>,
handle: Server<Self>,
}

#[async_trait]
impl ezsockets::ServerExt for ChatServer {
type Session = ChatSession;
type Params = ChatMessage;

async fn accept(
&mut self,
socket: Socket,
_address: SocketAddr,
_args: <Self::Session as ezsockets::SessionExt>::Args,
) -> Result<Session, Error> {
let id = (0..).find(|i| !self.sessions.contains_key(i)).unwrap_or(0);
let session = Session::create(
|_| ChatSession {
id,
server: self.handle.clone(),
},
id,
socket,
);
self.sessions.insert(id, session.clone());
Ok(session)
}

async fn disconnected(
&mut self,
id: <Self::Session as ezsockets::SessionExt>::ID,
) -> Result<(), Error> {
assert!(self.sessions.remove(&id).is_some());
Ok(())
}

async fn call(&mut self, params: Self::Params) -> Result<(), Error> {
match params {
ChatMessage::Send { text, from } => {
let sessions = self.sessions.iter().filter(|(id, _)| from != **id);
let text = format!("from {from}: {text}");
for (id, handle) in sessions {
tracing::info!("sending {text} to {id}");
handle.text(text.clone());
}
}
};
Ok(())
}
}

struct ChatSession {
id: SessionID,
server: Server<ChatServer>,
}

#[async_trait]
impl ezsockets::SessionExt for ChatSession {
type ID = SessionID;
type Args = ();
type Params = ();

fn id(&self) -> &Self::ID {
&self.id
}
async fn text(&mut self, text: String) -> Result<(), Error> {
tracing::info!("received: {text}");
self.server.call(ChatMessage::Send {
from: self.id,
text,
});
Ok(())
}

async fn binary(&mut self, _bytes: Vec<u8>) -> Result<(), Error> {
unimplemented!()
}

async fn call(&mut self, params: Self::Params) -> Result<(), Error> {
let () = params;
Ok(())
}
}

#[tokio::main]
async fn main() {
tracing_subscriber::fmt::init();
let (server, _) = Server::create(|handle| ChatServer {
sessions: HashMap::new(),
handle,
});

let app = Router::new()
.route("/websocket", get(websocket_handler))
.layer(Extension(server.clone()));

let address = SocketAddr::from(([127, 0, 0, 1], 8080));

tokio::spawn(async move {
tracing::debug!("listening on {}", address);
axum::Server::bind(&address)
.serve(app.into_make_service_with_connect_info::<SocketAddr>())
.await
.unwrap();
});

let stdin = std::io::stdin();
let lines = stdin.lock().lines();
for line in lines {
let line = line.unwrap();
server.call(ChatMessage::Send {
text: line,
from: SessionID::MAX, // reserve some ID for the server
});
}
}

async fn websocket_handler(
Extension(server): Extension<Server<ChatServer>>,
ezsocket: Upgrade,
) -> impl IntoResponse {
ezsocket.on_upgrade(server, ())
}
Loading

0 comments on commit a2b8be8

Please sign in to comment.