From 70a9dabdfb8fd12e6bdeb8e07d33df0f8da9c02f Mon Sep 17 00:00:00 2001 From: 29 <791603901@qq.com> Date: Sat, 11 Jan 2025 15:25:22 +0800 Subject: [PATCH] =?UTF-8?q?=E2=9A=97=EF=B8=8F=20=E5=AE=9E=E7=8E=B0`async-r?= =?UTF-8?q?eq-res`?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tracing-surreal/src/async_req_res.rs | 89 ++++++++++++++++++++++++++++ tracing-surreal/src/lib.rs | 1 + 2 files changed, 90 insertions(+) create mode 100644 tracing-surreal/src/async_req_res.rs diff --git a/tracing-surreal/src/async_req_res.rs b/tracing-surreal/src/async_req_res.rs new file mode 100644 index 0000000..ee1a751 --- /dev/null +++ b/tracing-surreal/src/async_req_res.rs @@ -0,0 +1,89 @@ +use std::ops::{Deref, DerefMut}; +use thiserror::Error; +use tokio::sync::{ + mpsc::{error::SendError, unbounded_channel, UnboundedReceiver, UnboundedSender}, + oneshot, +}; + +#[derive(Debug)] +pub struct Request { + req: Q, + res_send: oneshot::Sender, +} + +impl Deref for Request { + type Target = Q; + + fn deref(&self) -> &Self::Target { + &self.req + } +} + +impl DerefMut for Request { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.req + } +} + +impl Request { + pub fn req(&self) -> Q + where + Q: Copy, + { + self.req + } + + pub fn req_cloned(&self) -> Q + where + Q: Clone, + { + self.req.clone() + } + + pub fn response(self, res: S) -> Result<(), S> { + self.res_send.send(res) + } +} + +#[derive(Error, Debug, Copy, Clone, Eq, PartialEq, Hash)] +pub enum RequestError { + #[error("send request error: Respondor closed or dropped")] + SendReqErr, + #[error("recv response error: Request handle dropped without sending response")] + RecvResErr, +} + +impl From> for RequestError { + fn from(_value: SendError) -> Self { + Self::SendReqErr + } +} + +#[derive(Debug, Clone)] +pub struct Requester(UnboundedSender>); + +impl Requester { + pub async fn request(&self, req: Q) -> Result { + let (res_send, res_recv) = oneshot::channel(); + self.0.send(Request { req, res_send })?; + res_recv.await.map_err(|_| RequestError::RecvResErr) + } +} + +#[derive(Debug)] +pub struct Respondor(UnboundedReceiver>); + +impl Respondor { + pub async fn next_requset(&mut self) -> Option> { + self.0.recv().await + } + + pub fn close(&mut self) { + self.0.close(); + } +} + +pub fn req_res() -> (Requester, Respondor) { + let (req_send, req_recv) = unbounded_channel(); + (Requester(req_send), Respondor(req_recv)) +} diff --git a/tracing-surreal/src/lib.rs b/tracing-surreal/src/lib.rs index 79696ca..bf372de 100644 --- a/tracing-surreal/src/lib.rs +++ b/tracing-surreal/src/lib.rs @@ -1,5 +1,6 @@ #![cfg_attr(nightly, feature(doc_auto_cfg))] +pub mod async_req_res; pub mod stop; pub mod tmp; pub mod tracing_msg;