Skip to content

Commit

Permalink
⚗️ 更新实验代码
Browse files Browse the repository at this point in the history
  • Loading branch information
czy-29 committed Jan 4, 2025
1 parent cf40f15 commit c901dc6
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 20 deletions.
8 changes: 4 additions & 4 deletions tracing-surreal/src/tmp/server.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
stop::Stop,
tracing_msg::{ClientRole, GracefulType},
tracing_msg::{ClientRole, GraceType},
};
use est::task::CloseAndWait;
use std::{
Expand Down Expand Up @@ -224,12 +224,12 @@ impl<C: Connection + Clone> ServerBuilder<C> {
println!("Bye from ctrl_c");
shutdown_waiter.cancel();
tracker.close_and_wait().await;
return res.map(|_| GracefulType::CtrlC);
return res.map(|_| GraceType::CtrlC);
}
_ = shutdown_waiter.cancelled() => {
println!("Bye from shutdown_waiter");
tracker.close_and_wait().await;
return Ok(GracefulType::Explicit);
return Ok(GraceType::Explicit);
}
res = listener.accept() => {
if let Err(err) = &res {
Expand Down Expand Up @@ -409,7 +409,7 @@ fn err_resp(text: &str, status: StatusCode) -> ErrorResponse {
resp
}

type RoutineOutput = Result<GracefulType, io::Error>;
type RoutineOutput = Result<GraceType, io::Error>;
type HandleOutput = Result<RoutineOutput, JoinError>;

#[derive(Debug)]
Expand Down
70 changes: 63 additions & 7 deletions tracing-surreal/src/tracing_msg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use derive_more::Display;
use est::{task::TaskId, thread::ThreadId};
use indexmap::{map::Entry, IndexMap};
use serde::{Deserialize, Serialize};
use std::{future::Future, num::NonZeroU64, ops::Deref, thread};
use std::{num::NonZeroU64, ops::Deref, thread};
use tokio::task;

pub(crate) mod handshake;
Expand Down Expand Up @@ -415,20 +415,76 @@ impl From<MsgBody> for TracingMsg {
}
}

#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub enum GracefulType {
#[derive(Serialize, Deserialize, Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub enum GraceType {
CtrlC,
Explicit,
}

pub trait CloseTransport: Send + 'static {
fn close_transport(&self) -> impl Future<Output = ()> + Send {
async {}
#[derive(Serialize, Deserialize, Debug, Copy, Clone, Eq, PartialEq, Hash)]
#[non_exhaustive]
pub enum CloseOk {
Grace(GraceType),
Other,
}

impl From<GraceType> for CloseOk {
fn from(value: GraceType) -> Self {
Self::Grace(value)
}
}

#[derive(Serialize, Deserialize, Debug, Copy, Clone, Eq, PartialEq, Hash)]
#[non_exhaustive]
pub enum CloseErrKind {
Io,
LayerDropped,
PushMsgErr,
Other,
}

#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, Hash)]
pub struct CloseErr {
pub kind: CloseErrKind,
pub display: String,
}

impl CloseErr {
pub fn new(kind: CloseErrKind, display: impl std::fmt::Display) -> Self {
Self {
kind,
display: display.to_string(),
}
}

pub fn other(err: impl std::error::Error) -> Self {
Self {
kind: CloseErrKind::Other,
display: err.to_string(),
}
}
}

#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, Hash)]
pub struct CloseMsg(pub Result<CloseOk, CloseErr>);

impl CloseMsg {
pub fn ok(value: CloseOk) -> Self {
Self(Ok(value))
}

pub fn err(err: CloseErr) -> Self {
Self(Err(err))
}
}

#[trait_variant::make(Send)]
pub trait CloseTransport: 'static {
async fn close_transport(&mut self, msg: Option<CloseMsg>);
}

#[trait_variant::make(Send)]
pub trait PushMsg: 'static {
type Error: std::error::Error + Send + 'static;
async fn push_msg(&self, msg: TracingMsg) -> Result<(), Self::Error>;
async fn push_msg(&mut self, msg: TracingMsg) -> Result<(), Self::Error>;
}
87 changes: 78 additions & 9 deletions tracing-surreal/src/tracing_msg/layer.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use super::{CloseTransport, GracefulType, MsgBody, PushMsg, TracingMsg};
use super::{
CloseErr, CloseErrKind, CloseMsg, CloseTransport, GraceType, MsgBody, PushMsg, TracingMsg,
};
use std::{
fmt::Debug,
future::Future,
Expand All @@ -8,7 +10,7 @@ use std::{
use thiserror::Error;
use tokio::{
signal::ctrl_c,
sync::mpsc::{unbounded_channel, UnboundedSender},
sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
task::{JoinError, JoinHandle},
};
use tokio_util::sync::CancellationToken;
Expand Down Expand Up @@ -89,7 +91,7 @@ impl<S: tracing_core::Subscriber> tracing_subscriber::Layer<S> for MsgLayer {
}

#[derive(Error, Debug)]
pub enum RoutineError<T: PushMsg> {
pub enum LayerError<T: PushMsg> {
#[error("io error: `{0}`")]
Io(#[from] std::io::Error),
#[error("MsgLayer dropped")]
Expand All @@ -98,9 +100,30 @@ pub enum RoutineError<T: PushMsg> {
PushMsgErr(T::Error),
}

type RoutineOutput<T> = Result<GracefulType, RoutineError<T>>;
impl<T: PushMsg> From<&LayerError<T>> for CloseErr {
fn from(err: &LayerError<T>) -> Self {
let kind = match err {
LayerError::Io(_) => CloseErrKind::Io,
LayerError::LayerDropped => CloseErrKind::LayerDropped,
LayerError::PushMsgErr(_) => CloseErrKind::PushMsgErr,
};

Self::new(kind, err)
}
}

type RoutineOutput<T> = Result<GraceType, LayerError<T>>;
type HandleOutput<T> = Result<RoutineOutput<T>, JoinError>;

impl<T: PushMsg> From<&RoutineOutput<T>> for CloseMsg {
fn from(res: &RoutineOutput<T>) -> Self {
match res {
Ok(ok) => Self::ok((*ok).into()),
Err(err) => Self::err(err.into()),
}
}
}

#[derive(Debug)]
pub struct MsgRoutine<T: PushMsg> {
shutdown_trigger: CancellationToken,
Expand Down Expand Up @@ -149,21 +172,67 @@ impl<T: CloseTransport + PushMsg + Clone + Debug> MsgLayerBuiler<T> {
}
}

pub fn continue_on_error(self) -> Self {
pub fn discard_push_error(self) -> Self {
Self {
abort_on_error: false,
..self
}
}

async fn close(&mut self, output: RoutineOutput<T>) -> RoutineOutput<T> {
if self.close_on_shutdown {
self.transport.close_transport(Some((&output).into())).await;
}

output
}

async fn push_msg(&mut self, msg: TracingMsg) -> Result<(), LayerError<T>> {
if let Err(err) = self.transport.push_msg(msg).await {
if self.abort_on_error {
self.close(Err(LayerError::PushMsgErr(err))).await?;
}
}

Ok(())
}

async fn flush_close(
mut self,
mut recv: UnboundedReceiver<TracingMsg>,
output: RoutineOutput<T>,
) -> RoutineOutput<T> {
while let Ok(msg) = recv.try_recv() {
self.push_msg(msg).await?;
}

self.close(output).await
}

pub fn build(self) -> (MsgLayer, MsgRoutine<T>) {
let builder = self;
let (send, recv) = unbounded_channel();
let mut builder = self;
let (send, mut recv) = unbounded_channel();
let shutdown_trigger = CancellationToken::new();
let shutdown_waiter = shutdown_trigger.clone();
let routine = tokio::spawn(async move {
// todo
Ok(GracefulType::Explicit)
loop {
tokio::select! {
res = ctrl_c(), if builder.ctrlc_shutdown => {
return builder
.flush_close(recv, res.map(|_| GraceType::CtrlC).map_err(From::from))
.await;
}
_ = shutdown_waiter.cancelled() => {
return builder.flush_close(recv, Ok(GraceType::Explicit)).await;
}
msg = recv.recv() => match msg {
None => {
return builder.close(Err(LayerError::LayerDropped)).await;
}
Some(msg) => builder.push_msg(msg).await?,
}
};
}
});
let msg_layer = MsgLayer(send);
let msg_routine = MsgRoutine {
Expand Down

0 comments on commit c901dc6

Please sign in to comment.