Skip to content

Commit

Permalink
⚗️ Update experimental code
Browse files Browse the repository at this point in the history
  • Loading branch information
czy-29 committed Dec 13, 2024
1 parent 4e0e3c1 commit 29a5493
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 29 deletions.
12 changes: 12 additions & 0 deletions tracing-surreal/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions tracing-surreal/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ futures = "0.3.31"
rmp-serde = "1.3.0"
serde = { version = "1.0.216", features = ["derive"] }
serde_json = "1.0.133"
serde_qs = "0.13.0"
surrealdb = "2.1.3"
thiserror = "2.0.6"
tokio = { version = "1.42.0", features = ["full"] }
Expand Down
172 changes: 143 additions & 29 deletions tracing-surreal/src/tmp/server.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::stop::Stop;
use est::task::CloseAndWait;
use std::{
collections::HashMap,
future::Future,
io,
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
Expand All @@ -13,12 +14,26 @@ use thiserror::Error;
use tokio::{
net::{lookup_host, TcpListener, ToSocketAddrs},
signal::ctrl_c,
sync::oneshot,
task::{JoinError, JoinHandle},
time::timeout,
};
use tokio_tungstenite::{accept_hdr_async, tungstenite::handshake::server::Request};
use tokio_tungstenite::{
accept_hdr_async,
tungstenite::handshake::server::{ErrorResponse, Request, Response},
};
use tokio_util::{sync::CancellationToken, task::TaskTracker};

#[derive(Debug, Clone)]
struct AuthArgs {
pusher_path: String,
pusher_token: Option<String>,
observer_path: String,
observer_token: Option<String>,
director_path: String,
director_token: Option<String>,
}

#[derive(Debug, Copy, Clone)]
enum SendFormat {
Json,
Expand All @@ -29,12 +44,7 @@ enum SendFormat {
#[derive(Clone, Debug)]
pub struct ServerBuilder<C: Connection> {
stop: Stop<C>,
pusher_path: String,
pusher_token: Option<String>,
observer_path: String,
observer_token: Option<String>,
director_path: String,
director_token: Option<String>,
auth_args: AuthArgs,
recv_json: bool,
recv_bincode: Option<bool>,
fuck_off_on_damage: bool,
Expand Down Expand Up @@ -63,12 +73,14 @@ impl<C: Connection + Clone> ServerBuilder<C> {
pub fn from_stop_default(stop: &Stop<C>) -> Self {
Self {
stop: stop.clone(),
pusher_path: "/pusher".into(),
pusher_token: None,
observer_path: "/observer".into(),
observer_token: None,
director_path: "/director".into(),
director_token: None,
auth_args: AuthArgs {
pusher_path: "/pusher".into(),
pusher_token: None,
observer_path: "/observer".into(),
observer_token: None,
director_path: "/director".into(),
director_token: None,
},
recv_json: true,
recv_bincode: Some(true),
fuck_off_on_damage: false,
Expand All @@ -82,52 +94,73 @@ impl<C: Connection + Clone> ServerBuilder<C> {

pub fn pusher_path(self, path: &str) -> Self {
Self {
pusher_path: path.into(),
auth_args: AuthArgs {
pusher_path: path.into(),
..self.auth_args
},
..self
}
}

pub fn pusher_token(self, token: &str) -> Self {
Self {
pusher_token: Some(token.into()),
auth_args: AuthArgs {
pusher_token: Some(token.into()),
..self.auth_args
},
..self
}
}

pub fn observer_path(self, path: &str) -> Self {
Self {
observer_path: path.into(),
auth_args: AuthArgs {
observer_path: path.into(),
..self.auth_args
},
..self
}
}

pub fn observer_token(self, token: &str) -> Self {
Self {
observer_token: Some(token.into()),
auth_args: AuthArgs {
observer_token: Some(token.into()),
..self.auth_args
},
..self
}
}

pub fn director_path(self, path: &str) -> Self {
Self {
director_path: path.into(),
auth_args: AuthArgs {
director_path: path.into(),
..self.auth_args
},
..self
}
}

pub fn director_token(self, token: &str) -> Self {
Self {
director_token: Some(token.into()),
auth_args: AuthArgs {
director_token: Some(token.into()),
..self.auth_args
},
..self
}
}

pub fn default_token(self, token: &str) -> Self {
let optb = Some(token.into());
Self {
pusher_token: self.pusher_token.or(optb.clone()),
observer_token: self.observer_token.or(optb.clone()),
director_token: self.director_token.or(optb),
auth_args: AuthArgs {
pusher_token: self.auth_args.pusher_token.or(optb.clone()),
observer_token: self.auth_args.observer_token.or(optb.clone()),
director_token: self.auth_args.director_token.or(optb),
..self.auth_args
},
..self
}
}
Expand Down Expand Up @@ -214,12 +247,10 @@ impl<C: Connection + Clone> ServerBuilder<C> {
let shutdown_waiter = shutdown_trigger.clone();
let routine = tokio::spawn(async move {
builder.stop.print().await;
println!("{}", builder.pusher_path);
println!("{}", builder.observer_path);
println!("{}", builder.director_path);
println!("{}", builder.fuck_off_on_damage);
println!("{:?}", builder.send_format);
println!("{:?}", builder.tmp_handshake_timeout);
// log safe builder info into db

let tracker = TaskTracker::new();

Expand Down Expand Up @@ -249,17 +280,42 @@ impl<C: Connection + Clone> ServerBuilder<C> {
println!("{}", client);

let builder = builder.clone();
let auth_args = builder.auth_args.clone();
let shutdown_waiter = shutdown_waiter.clone();
let (send, recv) = oneshot::channel();
tracker.spawn(async move {
let stream = tokio::select! {
let (stream, role) = tokio::select! {
_ = shutdown_waiter.cancelled() => {
println!("shutdown_waiter.cancelled()");
return;
}
res = timeout(
builder.ws_handshake_timeout,
accept_hdr_async(stream, move |_req: &Request, res| {
Ok(res)
accept_hdr_async(stream, move |req: &Request, resp| {
let uri = req.uri();
let path = uri.path();
let query: Option<
Result<HashMap<String, String>, serde_qs::Error>,
> = uri.query().map(serde_qs::from_str);

println!("path: {}", path);
println!("query: {:?}", query);

if path == auth_args.pusher_path {
return token_auth(query, auth_args.pusher_token, send, Role::Pusher, resp);
}

if path == auth_args.observer_path {
return token_auth(query, auth_args.observer_token, send, Role::Observer, resp);
}

if path == auth_args.director_path {
return token_auth(query, auth_args.director_token, send, Role::Director, resp);
}

let text = "invalid path!";
println!("{}", text);
Err(err_resp(text))
}),
) => match res {
Err(err) => {
Expand All @@ -270,11 +326,12 @@ impl<C: Connection + Clone> ServerBuilder<C> {
println!("inner_err: {}", err);
return;
}
Ok(Ok(stream)) => stream,
Ok(Ok(stream)) => (stream, recv.await.unwrap()),
}
};

println!("inner_stream: {:?}", stream);
println!("inner_role: {:?}", role);
});
}
});
Expand All @@ -287,6 +344,63 @@ impl<C: Connection + Clone> ServerBuilder<C> {
}
}

#[derive(Debug, Clone, Copy)]
enum Role {
Pusher,
Observer,
Director,
}

fn token_auth(
query: Option<Result<HashMap<String, String>, serde_qs::Error>>,
token_need: Option<String>,
send: oneshot::Sender<Role>,
role: Role,
resp: Response,
) -> Result<Response, ErrorResponse> {
if let Some(token_need) = token_need {
match query {
None => {
let text = "need query!";
println!("{}", text);
return Err(err_resp(text));
}
Some(Err(err)) => {
let text = format!("query err: {}", err);
println!("{}", text);
return Err(err_resp(&text));
}
Some(Ok(map)) => {
println!("{:?}", map);

match map.get("token") {
None => {
let text = "need token!";
println!("{}", text);
return Err(err_resp(text));
}
Some(token_req) => {
println!("token_req: {}", token_req);

if *token_req != token_need {
let text = "wrong token!";
println!("{}", text);
return Err(err_resp(text));
}
}
}
}
}
}

send.send(role).ok();
return Ok(resp);
}

fn err_resp(text: &str) -> ErrorResponse {
ErrorResponse::new(Some(text.into()))
}

type RoutineOutput = io::Result<GracefulType>;
type ServerOutput = Result<RoutineOutput, JoinError>;

Expand Down

0 comments on commit 29a5493

Please sign in to comment.