Skip to content

Commit

Permalink
⚗️ Implement Graceful Shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
czy-29 committed Dec 11, 2024
1 parent 96f64e9 commit 2b35bcb
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 5 deletions.
18 changes: 16 additions & 2 deletions tracing-surreal/src/bin/host.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use est::AnyRes;
use std::time::Duration;
use surrealdb::{
engine::remote::ws::{Client, Ws},
opt::auth::Root,
Surreal,
};
use tokio::time::sleep;
use tracing_surreal::{stop::Stop, tmp::server::ServerBuilder};

async fn db() -> AnyRes<Surreal<Client>> {
Expand All @@ -21,8 +23,20 @@ async fn db() -> AnyRes<Surreal<Client>> {
#[tokio::main]
async fn main() -> AnyRes {
let stop = Stop::init(db().await?, "test").await?;
let server = ServerBuilder::from_stop_default(&stop).start().await?;
let mut server = ServerBuilder::from_stop_default(&stop).start().await?;
println!("{}", server.get_local_addr());
server.join().await??;

let grace_type = tokio::select! {
_ = sleep(Duration::from_secs_f64(5.0)) => {
println!("5s elapsed, initiating shutdown...");
server.graceful_shutdown().await??
}
res = server.get_routine_mut() => {
println!("routine exited");
res??
}
};

println!("{:?}", grace_type);
Ok(())
}
54 changes: 51 additions & 3 deletions tracing-surreal/src/tmp/server.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::stop::Stop;
use std::{
future::IntoFuture,
io,
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
time::Duration,
Expand All @@ -8,6 +9,8 @@ use surrealdb::Connection;
use thiserror::Error;
use tokio::{
net::{lookup_host, TcpListener, ToSocketAddrs},
signal::ctrl_c,
sync::oneshot,
task::{JoinError, JoinHandle},
};

Expand All @@ -31,6 +34,7 @@ pub struct ServerBuilder<C: Connection> {
recv_bincode: Option<bool>,
fuck_off_on_damage: bool,
send_format: SendFormat,
ctrlc_shutdown: bool,
handshake_timeout: Duration,
bind_addrs: Vec<SocketAddr>,
}
Expand All @@ -57,6 +61,7 @@ impl<C: Connection + Clone> ServerBuilder<C> {
recv_bincode: Some(true),
fuck_off_on_damage: false,
send_format: SendFormat::Bincode,
ctrlc_shutdown: true,
handshake_timeout: Duration::from_secs_f64(3.0),
bind_addrs: vec![SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8192).into()],
}
Expand Down Expand Up @@ -156,6 +161,13 @@ impl<C: Connection + Clone> ServerBuilder<C> {
}
}

pub fn disable_ctrlc_shutdown(self) -> Self {
Self {
ctrlc_shutdown: false,
..self
}
}

pub fn handshake_timeout(self, timeout: Duration) -> Self {
Self {
handshake_timeout: timeout,
Expand All @@ -178,6 +190,7 @@ impl<C: Connection + Clone> ServerBuilder<C> {
let listener = TcpListener::bind(self.bind_addrs.as_slice()).await?;
let builder = self;
let local_addr = listener.local_addr().unwrap();
let (shutdown_s, mut shutdown_r) = oneshot::channel();
let routine = tokio::spawn(async move {
builder.stop.print().await;
println!("{}", builder.pusher_path);
Expand All @@ -188,14 +201,28 @@ impl<C: Connection + Clone> ServerBuilder<C> {
println!("{:?}", builder.handshake_timeout);

loop {
let (stream, client) = listener.accept().await?;
let (stream, client) = tokio::select! {
res = ctrl_c(), if builder.ctrlc_shutdown => {
println!("Bye from ctrl_c");
return res.map(|_| GracefulType::CtrlC);
}
_ = &mut shutdown_r => {
println!("Bye from shutdown_r");
return Ok(GracefulType::Explicit);
}
res = listener.accept() => {
res?
}
};

println!("{:?}", stream);
println!("{}", client);
}
});

Ok(ServerHandle {
local_addr,
shutdown_s,
routine,
})
}
Expand All @@ -204,15 +231,36 @@ impl<C: Connection + Clone> ServerBuilder<C> {
#[derive(Debug)]
pub struct ServerHandle {
local_addr: SocketAddr,
routine: JoinHandle<io::Result<()>>,
shutdown_s: oneshot::Sender<()>,
routine: JoinHandle<io::Result<GracefulType>>,
}

#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub enum GracefulType {
CtrlC,
Explicit,
}

impl ServerHandle {
pub fn get_local_addr(&self) -> SocketAddr {
self.local_addr
}

pub async fn join(self) -> Result<io::Result<()>, JoinError> {
pub fn get_routine_mut(&mut self) -> &mut JoinHandle<io::Result<GracefulType>> {
&mut self.routine
}

pub async fn graceful_shutdown(self) -> Result<io::Result<GracefulType>, JoinError> {
self.shutdown_s.send(()).ok();
self.routine.await
}
}

impl IntoFuture for ServerHandle {
type Output = Result<io::Result<GracefulType>, JoinError>;
type IntoFuture = JoinHandle<io::Result<GracefulType>>;

fn into_future(self) -> Self::IntoFuture {
self.routine
}
}

0 comments on commit 2b35bcb

Please sign in to comment.