From e04661fc72b08c4e2517a9f0bfb16aa7d2fed683 Mon Sep 17 00:00:00 2001 From: Name1e5s <836401406@qq.com> Date: Sun, 24 Apr 2022 00:09:38 +0800 Subject: [PATCH] feat(mpsc): add `Receiver::try_recv` method (#60) * feat(mpsc): add `Receiver::try_recv` method * refac: replace return value of `pop_ref` to `Result, TryRecvError>` * docs: fix doc --- src/lib.rs | 12 ++-- src/mpsc.rs | 20 +++++- src/mpsc/async_impl.rs | 136 +++++++++++++++++++++++++++++++++++++++++ src/mpsc/blocking.rs | 135 ++++++++++++++++++++++++++++++++++++++++ src/mpsc/errors.rs | 27 ++++++++ 5 files changed, 321 insertions(+), 9 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 4163c12..f570beb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -39,7 +39,7 @@ use crate::{ atomic::{AtomicUsize, Ordering::*}, cell::{MutPtr, UnsafeCell}, }, - mpsc::errors::TrySendError, + mpsc::errors::{TryRecvError, TrySendError}, util::{Backoff, CachePadded}, }; @@ -281,7 +281,7 @@ impl Core { } #[inline(always)] - fn pop_ref<'slots, T>(&self, slots: &'slots [Slot]) -> Result, TrySendError> { + fn pop_ref<'slots, T>(&self, slots: &'slots [Slot]) -> Result, TryRecvError> { test_println!("pop_ref"); let mut backoff = Backoff::new(); let mut head = self.head.load(Relaxed); @@ -347,15 +347,15 @@ impl Core { if test_dbg!(tail & !self.closed == head) { return if test_dbg!(tail & self.closed != 0) { - Err(TrySendError::Closed(())) + Err(TryRecvError::Closed) } else { - test_println!("--> channel full!"); - Err(TrySendError::Full(())) + test_println!("--> channel empty!"); + Err(TryRecvError::Empty) }; } if test_dbg!(backoff.done_spinning()) { - return Err(TrySendError::Full(())); + return Err(TryRecvError::Empty); } backoff.spin(); diff --git a/src/mpsc.rs b/src/mpsc.rs index 5491af4..5825b48 100644 --- a/src/mpsc.rs +++ b/src/mpsc.rs @@ -248,14 +248,14 @@ //! [blocking sender]: blocking::Sender use crate::{ loom::{atomic::AtomicUsize, hint}, - recycling::Recycle, + recycling::{take, Recycle}, wait::{Notify, WaitCell, WaitQueue, WaitResult}, Core, Ref, Slot, }; use core::{fmt, task::Poll}; pub mod errors; -use self::errors::TrySendError; +use self::errors::{TryRecvError, TrySendError}; #[derive(Debug)] struct ChannelCore { @@ -356,6 +356,20 @@ where } } + fn try_recv_ref<'a, T>(&'a self, slots: &'a [Slot]) -> Result, TryRecvError> { + self.core.pop_ref(slots) + } + + fn try_recv(&self, slots: &[Slot], recycle: &R) -> Result + where + R: Recycle, + { + match self.try_recv_ref(slots) { + Ok(mut slot) => Ok(take(&mut *slot, recycle)), + Err(e) => Err(e), + } + } + /// Performs one iteration of the `recv_ref` loop. /// /// The loop itself has to be written in the actual `send` method's @@ -371,7 +385,7 @@ where // If we got a value, return it! match self.core.pop_ref(slots) { Ok(slot) => return Poll::Ready(Some(slot)), - Err(TrySendError::Closed(_)) => return Poll::Ready(None), + Err(TryRecvError::Closed) => return Poll::Ready(None), _ => {} } }; diff --git a/src/mpsc/async_impl.rs b/src/mpsc/async_impl.rs index 13277d9..1daba8a 100644 --- a/src/mpsc/async_impl.rs +++ b/src/mpsc/async_impl.rs @@ -422,6 +422,74 @@ feature! { } } + /// Attempts to receive the next message for this receiver by reference + /// without waiting for a new message when the channel is empty. + /// + /// This method differs from [`recv_ref`] by returning immediately if the + /// channel is empty or closed. + /// + /// # Errors + /// + /// This method returns an error when the channel is closed or there are + /// no remaining messages in the channel's buffer. + /// + /// # Examples + /// + /// ``` + /// use thingbuf::mpsc::{channel, errors::TryRecvError}; + /// + /// let (tx, rx) = channel(100); + /// assert!(matches!(rx.try_recv_ref(), Err(TryRecvError::Empty))); + /// + /// tx.try_send(1).unwrap(); + /// drop(tx); + /// + /// assert_eq!(*rx.try_recv_ref().unwrap(), 1); + /// assert!(matches!(rx.try_recv_ref(), Err(TryRecvError::Closed))); + /// ``` + /// + /// [`recv_ref`]: Self::recv_ref + pub fn try_recv_ref(&self) -> Result, TryRecvError> + where + R: Recycle, + { + self.inner.core.try_recv_ref(self.inner.slots.as_ref()) + } + + /// Attempts to receive the next message for this receiver by reference + /// without waiting for a new message when the channel is empty. + /// + /// This method differs from [`recv`] by returning immediately if the + /// channel is empty or closed. + /// + /// # Errors + /// + /// This method returns an error when the channel is closed or there are + /// no remaining messages in the channel's buffer. + /// + /// # Examples + /// + /// ``` + /// use thingbuf::mpsc::{channel, errors::TryRecvError}; + /// + /// let (tx, rx) = channel(100); + /// assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); + /// + /// tx.try_send(1).unwrap(); + /// drop(tx); + /// + /// assert_eq!(rx.try_recv().unwrap(), 1); + /// assert_eq!(rx.try_recv(), Err(TryRecvError::Closed)); + /// ``` + /// + /// [`recv`]: Self::recv + pub fn try_recv(&self) -> Result + where + R: Recycle, + { + self.inner.core.try_recv(self.inner.slots.as_ref(), &self.inner.recycle) + } + /// Attempts to receive a message *by reference* from this channel, /// registering the current task for wakeup if the a message is not yet /// available, and returning `None` if the channel has closed and all @@ -1059,6 +1127,74 @@ feature! { } } + /// Attempts to receive the next message for this receiver by reference + /// without waiting for a new message when the channel is empty. + /// + /// This method differs from [`recv_ref`] by returning immediately if the + /// channel is empty or closed. + /// + /// # Errors + /// + /// This method returns an error when the channel is closed or there are + /// no remaining messages in the channel's buffer. + /// + /// # Examples + /// + /// ``` + /// use thingbuf::mpsc::{channel, errors::TryRecvError}; + /// + /// let (tx, rx) = channel(100); + /// assert!(matches!(rx.try_recv_ref(), Err(TryRecvError::Empty))); + /// + /// tx.try_send(1).unwrap(); + /// drop(tx); + /// + /// assert_eq!(*rx.try_recv_ref().unwrap(), 1); + /// assert!(matches!(rx.try_recv_ref(), Err(TryRecvError::Closed))); + /// ``` + /// + /// [`recv_ref`]: Self::recv_ref + pub fn try_recv_ref(&self) -> Result, TryRecvError> + where + R: Recycle, + { + self.core.try_recv_ref(self.slots.as_ref()) + } + + /// Attempts to receive the next message for this receiver by reference + /// without waiting for a new message when the channel is empty. + /// + /// This method differs from [`recv`] by returning immediately if the + /// channel is empty or closed. + /// + /// # Errors + /// + /// This method returns an error when the channel is closed or there are + /// no remaining messages in the channel's buffer. + /// + /// # Examples + /// + /// ``` + /// use thingbuf::mpsc::{channel, errors::TryRecvError}; + /// + /// let (tx, rx) = channel(100); + /// assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); + /// + /// tx.try_send(1).unwrap(); + /// drop(tx); + /// + /// assert_eq!(rx.try_recv().unwrap(), 1); + /// assert_eq!(rx.try_recv(), Err(TryRecvError::Closed)); + /// ``` + /// + /// [`recv`]: Self::recv + pub fn try_recv(&self) -> Result + where + R: Recycle, + { + self.core.try_recv(self.slots.as_ref(), self.recycle) + } + /// Attempts to receive a message *by reference* from this channel, /// registering the current task for wakeup if the a message is not yet /// available, and returning `None` if the channel has closed and all diff --git a/src/mpsc/blocking.rs b/src/mpsc/blocking.rs index fa55b72..7b1b1a4 100644 --- a/src/mpsc/blocking.rs +++ b/src/mpsc/blocking.rs @@ -575,6 +575,74 @@ feature! { Some(recycling::take(&mut *val, self.recycle)) } + /// Attempts to receive the next message for this receiver by reference + /// without blocking. + /// + /// This method differs from [`recv_ref`] by returning immediately if the + /// channel is empty or closed. + /// + /// # Errors + /// + /// This method returns an error when the channel is closed or there are + /// no remaining messages in the channel's buffer. + /// + /// # Examples + /// + /// ``` + /// use thingbuf::mpsc::{blocking, errors::TryRecvError}; + /// + /// let (tx, rx) = blocking::channel(100); + /// assert!(matches!(rx.try_recv_ref(), Err(TryRecvError::Empty))); + /// + /// tx.send(1).unwrap(); + /// drop(tx); + /// + /// assert_eq!(*rx.try_recv_ref().unwrap(), 1); + /// assert!(matches!(rx.try_recv_ref(), Err(TryRecvError::Closed))); + /// ``` + /// + /// [`recv_ref`]: Self::recv_ref + pub fn try_recv_ref(&self) -> Result, TryRecvError> + where + R: Recycle, + { + self.core.try_recv_ref(self.slots.as_ref()) + } + + /// Attempts to receive the next message for this receiver by value + /// without blocking. + /// + /// This method differs from [`recv`] by returning immediately if the + /// channel is empty or closed. + /// + /// # Errors + /// + /// This method returns an error when the channel is closed or there are + /// no remaining messages in the channel's buffer. + /// + /// # Examples + /// + /// ``` + /// use thingbuf::mpsc::{blocking, errors::TryRecvError}; + /// + /// let (tx, rx) = blocking::channel(100); + /// assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); + /// + /// tx.send(1).unwrap(); + /// drop(tx); + /// + /// assert_eq!(rx.try_recv().unwrap(), 1); + /// assert_eq!(rx.try_recv(), Err(TryRecvError::Closed)); + /// ``` + /// + /// [`recv`]: Self::recv + pub fn try_recv(&self) -> Result + where + R: Recycle, + { + self.core.try_recv(self.slots.as_ref(), self.recycle) + } + /// Returns `true` if the channel has closed (all corresponding /// [`StaticSender`]s have been dropped). /// @@ -966,6 +1034,73 @@ impl Receiver { Some(recycling::take(&mut *val, &self.inner.recycle)) } + /// Attempts to receive the next message for this receiver by reference + /// without blocking. + /// + /// This method differs from [`recv_ref`] by returning immediately if the + /// channel is empty or closed. + /// + /// # Errors + /// + /// This method returns an error when the channel is closed or there are + /// no remaining messages in the channel's buffer. + /// + /// # Examples + /// + /// ``` + /// use thingbuf::mpsc::{blocking, errors::TryRecvError}; + /// + /// let (tx, rx) = blocking::channel(100); + /// assert!(matches!(rx.try_recv_ref(), Err(TryRecvError::Empty))); + /// + /// tx.send(1).unwrap(); + /// drop(tx); + /// + /// assert_eq!(*rx.try_recv_ref().unwrap(), 1); + /// assert!(matches!(rx.try_recv_ref(), Err(TryRecvError::Closed))); + /// ``` + /// + /// [`recv_ref`]: Self::recv_ref + pub fn try_recv_ref(&self) -> Result, TryRecvError> { + self.inner.core.try_recv_ref(self.inner.slots.as_ref()) + } + + /// Attempts to receive the next message for this receiver by value + /// without blocking. + /// + /// This method differs from [`recv`] by returning immediately if the + /// channel is empty or closed. + /// + /// # Errors + /// + /// This method returns an error when the channel is closed or there are + /// no remaining messages in the channel's buffer. + /// + /// # Examples + /// + /// ``` + /// use thingbuf::mpsc::{blocking, errors::TryRecvError}; + /// + /// let (tx, rx) = blocking::channel(100); + /// assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); + /// + /// tx.send(1).unwrap(); + /// drop(tx); + /// + /// assert_eq!(rx.try_recv().unwrap(), 1); + /// assert_eq!(rx.try_recv(), Err(TryRecvError::Closed)); + /// ``` + /// + /// [`recv`]: Self::recv + pub fn try_recv(&self) -> Result + where + R: Recycle, + { + self.inner + .core + .try_recv(self.inner.slots.as_ref(), &self.inner.recycle) + } + /// Returns `true` if the channel has closed (all corresponding /// [`Sender`]s have been dropped). /// diff --git a/src/mpsc/errors.rs b/src/mpsc/errors.rs index e044c75..4aca5da 100644 --- a/src/mpsc/errors.rs +++ b/src/mpsc/errors.rs @@ -21,6 +21,19 @@ pub enum TrySendError { Closed(T), } +/// Error returned by the [`Receiver::recv`] and [`Receiver::recv_ref`] methods. +/// +/// [`Receiver::recv`]: super::Receiver::recv +/// [`Receiver::recv_ref`]: super::Receiver::recv_ref +#[non_exhaustive] +#[derive(Debug, PartialEq, Eq)] +pub enum TryRecvError { + /// The channel is empty. + Empty, + /// The channel is closed. + Closed, +} + /// Error returned by [`Sender::send`] or [`Sender::send_ref`] (and /// [`StaticSender::send`]/[`StaticSender::send_ref`]), if the /// [`Receiver`] half of the channel has been dropped. @@ -124,3 +137,17 @@ impl fmt::Display for TrySendError { #[cfg(feature = "std")] impl std::error::Error for TrySendError {} + +// == impl TryRecvError == + +#[cfg(feature = "std")] +impl std::error::Error for TryRecvError {} + +impl fmt::Display for TryRecvError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str(match self { + Self::Empty => "channel is empty", + Self::Closed => "channel closed", + }) + } +}