From c0191af4d7b8507400bec6fc3bc6155997be07f1 Mon Sep 17 00:00:00 2001 From: Felix Obenhuber Date: Tue, 14 Feb 2023 17:40:38 +0100 Subject: [PATCH] Implement Stream for mpsc::Receiver. Add PollSender. Implement `futures::future::Stream` for `mpsc::Receiver`. Add `mpsc::PollSender` that wrapps a mpsc::Sender and implements `Sink`. --- Cargo.toml | 2 + src/mpsc/async_impl.rs | 172 +++++++++++++++++++++++++++++++++++++++++ src/mpsc/errors.rs | 5 +- 3 files changed, 177 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 7baf650..45dc131 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,10 +25,12 @@ std = ["alloc", "parking_lot"] alloc = [] default = ["std"] static = [] +stream = ["futures"] [dependencies] pin-project = "1" parking_lot = { version = "0.12", optional = true, default-features = false } +futures = { version = "0.3", default_features = false, optional = true } [dev-dependencies] tokio = { version = "1.14.0", features = ["rt", "rt-multi-thread", "macros", "sync"] } diff --git a/src/mpsc/async_impl.rs b/src/mpsc/async_impl.rs index 9f3431b..a9716ee 100644 --- a/src/mpsc/async_impl.rs +++ b/src/mpsc/async_impl.rs @@ -281,6 +281,165 @@ feature! { } } + feature! { + #![feature = "stream"] + use futures::sink::Sink; + use std::mem; + + /// PollSender internal state. + enum PollSenderState<'a, T, R> { + Idle, + Acquiring(Pin>>), + ReadyToSend(SendRef<'a, T>), + Closed, + } + + /// A wrapper around mpsc::Sender that can be polled. + pub struct PollSender<'a, T, R> { + sender: Option>, + state: PollSenderState<'a, T, R>, + } + + impl<'a, T, R> PollSender<'a, T, R> where R: 'a + Recycle, T: 'a { + /// Creates a new `PollSender`. + pub fn new(sender: Sender) -> PollSender<'a, T, R> { + PollSender { + sender: Some(sender), + state: PollSenderState::Idle, + } + } + + pub fn poll_reserve(&'a mut self, cx: &mut Context<'_>) -> Poll> { + loop { + let (result, next_state) = match mem::replace(&mut self.state, PollSenderState::Closed) { + PollSenderState::Idle => { + match self.sender.as_ref() { + Some(sender) => { + let send_ref = Box::pin(SendRefFuture { + core: &sender.inner.core, + slots: sender.inner.slots.as_ref(), + recycle: &sender.inner.recycle, + state: State::Start, + waiter: queue::Waiter::new(), + }); + // Start trying to acquire a permit to reserve a slot for our send, and + // immediately loop back around to poll it the first time. + (None, PollSenderState::Acquiring(send_ref)) + } + None => (Some(Poll::Ready(Err(Closed(())))), PollSenderState::Closed) + } + } + PollSenderState::Acquiring(mut f) => match f.as_mut().poll(cx) { + // Channel has capacity. + Poll::Ready(Ok(send_ref)) => { + (Some(Poll::Ready(Ok(()))), PollSenderState::ReadyToSend(send_ref)) + } + // Channel is closed. + Poll::Ready(Err(e)) => (Some(Poll::Ready(Err(e))), PollSenderState::Closed), + // Channel doesn't have capacity yet, so we need to wait. + Poll::Pending => (Some(Poll::Pending), PollSenderState::Acquiring(f)), + }, + // We're closed, either by choice or because the underlying sender was closed. + s @ PollSenderState::Closed => (Some(Poll::Ready(Err(Closed(())))), s), + // We're already ready to send an item. + s @ PollSenderState::ReadyToSend(_) => (Some(Poll::Ready(Ok(()))), s), + }; + + self.state = next_state; + if let Some(result) = result { + return result; + } + } + } + + /// Sends an item to the channel. + /// + /// Before calling `send_item`, `poll_reserve` must be called with a successful return + /// value of `Poll::Ready(Ok(()))`. + /// + /// # Errors + /// + /// If the channel is closed, an error will be returned. This is a permanent state. + /// + /// # Panics + /// + /// If `poll_reserve` was not successfully called prior to calling `send_item`, then this method + /// will panic. + pub fn send_item(&mut self, value: T) -> Result<(), Closed> { + let (result, next_state) = match mem::replace(&mut self.state, PollSenderState::Closed) { + PollSenderState::Idle | PollSenderState::Acquiring(_) => { + panic!("`send_item` called without first calling `poll_reserve`") + } + // We have a permit to send our item, so go ahead, which gets us our sender back. + PollSenderState::ReadyToSend(mut send_ref) => { + *send_ref = value; + match &self.sender { + Some(_) => (Ok(()), PollSenderState::::Idle), + None => (Ok(()), PollSenderState::Closed), // Closed in between. + } + }, + // We're closed, either by choice or because the underlying sender was closed. + PollSenderState::Closed => (Err(Closed(value)), PollSenderState::Closed), + }; + + // Handle deferred closing if `close` was called between `poll_reserve` and `send_item`. + self.state = if self.sender.is_some() { + next_state + } else { + PollSenderState::Closed + }; + + result + } + + /// Checks whether this sender is been closed. + /// + /// The underlying channel that this sender was wrapping may still be open. + pub fn is_closed(&'a self) -> bool { + matches!(self.state, PollSenderState::Closed) || self.sender.is_none() + } + + /// Gets a reference to the `Sender` of the underlying channel. + /// + /// If `PollSender` has been closed, `None` is returned. The underlying channel that this sender + /// was wrapping may still be open. + pub fn get_ref(&self) -> Option<&Sender> { + self.sender.as_ref() + } + + /// Closes this sender. + /// + /// No more messages will be able to be sent from this sender, but the underlying channel will + /// remain open until all senders have dropped, or until the [`Receiver`] closes the channel. + pub fn close(&mut self) { + // Mark ourselves officially closed by dropping our main sender. + self.sender = None; + self.state = PollSenderState::Closed; + } + } + + impl<'a, T, R> Sink for PollSender<'a, T, R> where T: 'a + Default, R: 'a + Recycle { + type Error = Closed; + + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::into_inner(self).poll_reserve(cx).map_err(|_| Closed(T::default())) + } + + fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> { + Pin::into_inner(self).send_item(item) + } + + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Pin::into_inner(self).close(); + Poll::Ready(Ok(())) + } + } + } + // === impl Receiver === impl Receiver { @@ -589,6 +748,19 @@ feature! { } } + feature! { + #![feature = "stream"] + use futures::stream::Stream; + + impl Stream for Receiver { + type Item = T; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.poll_recv(cx) + } + } + } + impl fmt::Debug for Inner { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Inner") diff --git a/src/mpsc/errors.rs b/src/mpsc/errors.rs index 4aca5da..97336c8 100644 --- a/src/mpsc/errors.rs +++ b/src/mpsc/errors.rs @@ -35,13 +35,14 @@ pub enum TryRecvError { } /// Error returned by [`Sender::send`] or [`Sender::send_ref`] (and -/// [`StaticSender::send`]/[`StaticSender::send_ref`]), if the -/// [`Receiver`] half of the channel has been dropped. +/// [`StaticSender::send`]/[`StaticSender::send_ref`]/[`PollSender::Send`]), +/// if the [`Receiver`] half of the channel has been dropped. /// /// [`Sender::send`]: super::Sender::send /// [`Sender::send_ref`]: super::Sender::send_ref /// [`StaticSender::send`]: super::StaticSender::send /// [`StaticSender::send_ref`]: super::StaticSender::send_ref +/// [`PollSender::send`]: super::PollSender::send /// [`Receiver`]: super::Receiver #[derive(PartialEq, Eq)] pub struct Closed(pub(crate) T);