Skip to content

Commit

Permalink
⚗️ 继续消灭一些todo
Browse files Browse the repository at this point in the history
  • Loading branch information
czy-29 committed Jan 4, 2025
1 parent 6a6099a commit 3a3ef2b
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 7 deletions.
6 changes: 3 additions & 3 deletions tracing-surreal/src/tmp/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ fn err_resp(text: &str, status: StatusCode) -> ErrorResponse {
}

type RoutineOutput = Result<GracefulType, io::Error>;
type ServerOutput = Result<RoutineOutput, JoinError>;
type HandleOutput = Result<RoutineOutput, JoinError>;

#[derive(Debug)]
pub struct ServerHandle {
Expand All @@ -428,14 +428,14 @@ impl ServerHandle {
self.shutdown_trigger.cancel();
}

pub async fn graceful_shutdown(self) -> ServerOutput {
pub async fn graceful_shutdown(self) -> HandleOutput {
self.trigger_graceful_shutdown();
self.await
}
}

impl Future for ServerHandle {
type Output = ServerOutput;
type Output = HandleOutput;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut self.routine).poll(cx)
Expand Down
6 changes: 3 additions & 3 deletions tracing-surreal/src/tracing_msg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -422,13 +422,13 @@ pub enum GracefulType {
}

pub trait CloseTransport: Send {
fn close_transport(&mut self) -> impl Future<Output = ()> + Send {
fn close_transport(&self) -> impl Future<Output = ()> + Send {
async {}
}
}

#[trait_variant::make(Send)]
pub trait PushMsg {
type Error;
async fn push_msg(&mut self, msg: TracingMsg) -> Result<(), Self::Error>;
type Error: std::error::Error + Send + 'static;
async fn push_msg(&self, msg: TracingMsg) -> Result<(), Self::Error>;
}
46 changes: 45 additions & 1 deletion tracing-surreal/src/tracing_msg/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,33 @@ impl<S: tracing_core::Subscriber> tracing_subscriber::Layer<S> for MsgLayer {
}
}

pub struct MsgRoutine;
type RoutineOutput<T> = Result<GracefulType, <T as PushMsg>::Error>;
type HandleOutput<T> = Result<RoutineOutput<T>, JoinError>;

#[derive(Debug)]
pub struct MsgRoutine<T: PushMsg> {
shutdown_trigger: CancellationToken,
routine: JoinHandle<RoutineOutput<T>>,
}

impl<T: PushMsg> MsgRoutine<T> {
pub fn trigger_graceful_shutdown(&self) {
self.shutdown_trigger.cancel();
}

pub async fn graceful_shutdown(self) -> HandleOutput<T> {
self.trigger_graceful_shutdown();
self.await
}
}

impl<T: PushMsg> Future for MsgRoutine<T> {
type Output = HandleOutput<T>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut self.routine).poll(cx)
}
}

#[derive(Clone, Debug)]
pub struct MsgLayerBuiler<T: CloseTransport + PushMsg + Clone + Debug> {
Expand Down Expand Up @@ -118,6 +144,24 @@ impl<T: CloseTransport + PushMsg + Clone + Debug> MsgLayerBuiler<T> {
..self
}
}

pub fn build(self) -> (MsgLayer, MsgRoutine<T>) {
let builder = self;
let (send, recv) = unbounded_channel();
let shutdown_trigger = CancellationToken::new();
let shutdown_waiter = shutdown_trigger.clone();
let routine = tokio::spawn(async move {
// todo
Ok(GracefulType::Explicit)
});
let msg_layer = MsgLayer(send);
let msg_routine = MsgRoutine {
shutdown_trigger,
routine,
};

(msg_layer, msg_routine)
}
}

pub trait TracingLayerDefault {
Expand Down

0 comments on commit 3a3ef2b

Please sign in to comment.