Skip to content

Commit

Permalink
feat(mpsc): add Receiver::try_recv method (#60)
Browse files Browse the repository at this point in the history
* feat(mpsc): add `Receiver::try_recv` method

* refac: replace return value of `pop_ref` to `Result<Ref<'slots, T>, TryRecvError>`

* docs: fix doc
  • Loading branch information
name1e5s authored Apr 23, 2022
1 parent 97f023d commit e04661f
Show file tree
Hide file tree
Showing 5 changed files with 321 additions and 9 deletions.
12 changes: 6 additions & 6 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use crate::{
atomic::{AtomicUsize, Ordering::*},
cell::{MutPtr, UnsafeCell},
},
mpsc::errors::TrySendError,
mpsc::errors::{TryRecvError, TrySendError},
util::{Backoff, CachePadded},
};

Expand Down Expand Up @@ -281,7 +281,7 @@ impl Core {
}

#[inline(always)]
fn pop_ref<'slots, T>(&self, slots: &'slots [Slot<T>]) -> Result<Ref<'slots, T>, TrySendError> {
fn pop_ref<'slots, T>(&self, slots: &'slots [Slot<T>]) -> Result<Ref<'slots, T>, TryRecvError> {
test_println!("pop_ref");
let mut backoff = Backoff::new();
let mut head = self.head.load(Relaxed);
Expand Down Expand Up @@ -347,15 +347,15 @@ impl Core {

if test_dbg!(tail & !self.closed == head) {
return if test_dbg!(tail & self.closed != 0) {
Err(TrySendError::Closed(()))
Err(TryRecvError::Closed)
} else {
test_println!("--> channel full!");
Err(TrySendError::Full(()))
test_println!("--> channel empty!");
Err(TryRecvError::Empty)
};
}

if test_dbg!(backoff.done_spinning()) {
return Err(TrySendError::Full(()));
return Err(TryRecvError::Empty);
}

backoff.spin();
Expand Down
20 changes: 17 additions & 3 deletions src/mpsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,14 +248,14 @@
//! [blocking sender]: blocking::Sender
use crate::{
loom::{atomic::AtomicUsize, hint},
recycling::Recycle,
recycling::{take, Recycle},
wait::{Notify, WaitCell, WaitQueue, WaitResult},
Core, Ref, Slot,
};
use core::{fmt, task::Poll};

pub mod errors;
use self::errors::TrySendError;
use self::errors::{TryRecvError, TrySendError};

#[derive(Debug)]
struct ChannelCore<N> {
Expand Down Expand Up @@ -356,6 +356,20 @@ where
}
}

fn try_recv_ref<'a, T>(&'a self, slots: &'a [Slot<T>]) -> Result<Ref<'a, T>, TryRecvError> {
self.core.pop_ref(slots)
}

fn try_recv<T, R>(&self, slots: &[Slot<T>], recycle: &R) -> Result<T, TryRecvError>
where
R: Recycle<T>,
{
match self.try_recv_ref(slots) {
Ok(mut slot) => Ok(take(&mut *slot, recycle)),
Err(e) => Err(e),
}
}

/// Performs one iteration of the `recv_ref` loop.
///
/// The loop itself has to be written in the actual `send` method's
Expand All @@ -371,7 +385,7 @@ where
// If we got a value, return it!
match self.core.pop_ref(slots) {
Ok(slot) => return Poll::Ready(Some(slot)),
Err(TrySendError::Closed(_)) => return Poll::Ready(None),
Err(TryRecvError::Closed) => return Poll::Ready(None),
_ => {}
}
};
Expand Down
136 changes: 136 additions & 0 deletions src/mpsc/async_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,74 @@ feature! {
}
}

/// Attempts to receive the next message for this receiver by reference
/// without waiting for a new message when the channel is empty.
///
/// This method differs from [`recv_ref`] by returning immediately if the
/// channel is empty or closed.
///
/// # Errors
///
/// This method returns an error when the channel is closed or there are
/// no remaining messages in the channel's buffer.
///
/// # Examples
///
/// ```
/// use thingbuf::mpsc::{channel, errors::TryRecvError};
///
/// let (tx, rx) = channel(100);
/// assert!(matches!(rx.try_recv_ref(), Err(TryRecvError::Empty)));
///
/// tx.try_send(1).unwrap();
/// drop(tx);
///
/// assert_eq!(*rx.try_recv_ref().unwrap(), 1);
/// assert!(matches!(rx.try_recv_ref(), Err(TryRecvError::Closed)));
/// ```
///
/// [`recv_ref`]: Self::recv_ref
pub fn try_recv_ref(&self) -> Result<Ref<'_, T>, TryRecvError>
where
R: Recycle<T>,
{
self.inner.core.try_recv_ref(self.inner.slots.as_ref())
}

/// Attempts to receive the next message for this receiver by reference
/// without waiting for a new message when the channel is empty.
///
/// This method differs from [`recv`] by returning immediately if the
/// channel is empty or closed.
///
/// # Errors
///
/// This method returns an error when the channel is closed or there are
/// no remaining messages in the channel's buffer.
///
/// # Examples
///
/// ```
/// use thingbuf::mpsc::{channel, errors::TryRecvError};
///
/// let (tx, rx) = channel(100);
/// assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
///
/// tx.try_send(1).unwrap();
/// drop(tx);
///
/// assert_eq!(rx.try_recv().unwrap(), 1);
/// assert_eq!(rx.try_recv(), Err(TryRecvError::Closed));
/// ```
///
/// [`recv`]: Self::recv
pub fn try_recv(&self) -> Result<T, TryRecvError>
where
R: Recycle<T>,
{
self.inner.core.try_recv(self.inner.slots.as_ref(), &self.inner.recycle)
}

/// Attempts to receive a message *by reference* from this channel,
/// registering the current task for wakeup if the a message is not yet
/// available, and returning `None` if the channel has closed and all
Expand Down Expand Up @@ -1059,6 +1127,74 @@ feature! {
}
}

/// Attempts to receive the next message for this receiver by reference
/// without waiting for a new message when the channel is empty.
///
/// This method differs from [`recv_ref`] by returning immediately if the
/// channel is empty or closed.
///
/// # Errors
///
/// This method returns an error when the channel is closed or there are
/// no remaining messages in the channel's buffer.
///
/// # Examples
///
/// ```
/// use thingbuf::mpsc::{channel, errors::TryRecvError};
///
/// let (tx, rx) = channel(100);
/// assert!(matches!(rx.try_recv_ref(), Err(TryRecvError::Empty)));
///
/// tx.try_send(1).unwrap();
/// drop(tx);
///
/// assert_eq!(*rx.try_recv_ref().unwrap(), 1);
/// assert!(matches!(rx.try_recv_ref(), Err(TryRecvError::Closed)));
/// ```
///
/// [`recv_ref`]: Self::recv_ref
pub fn try_recv_ref(&self) -> Result<Ref<'_, T>, TryRecvError>
where
R: Recycle<T>,
{
self.core.try_recv_ref(self.slots.as_ref())
}

/// Attempts to receive the next message for this receiver by reference
/// without waiting for a new message when the channel is empty.
///
/// This method differs from [`recv`] by returning immediately if the
/// channel is empty or closed.
///
/// # Errors
///
/// This method returns an error when the channel is closed or there are
/// no remaining messages in the channel's buffer.
///
/// # Examples
///
/// ```
/// use thingbuf::mpsc::{channel, errors::TryRecvError};
///
/// let (tx, rx) = channel(100);
/// assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
///
/// tx.try_send(1).unwrap();
/// drop(tx);
///
/// assert_eq!(rx.try_recv().unwrap(), 1);
/// assert_eq!(rx.try_recv(), Err(TryRecvError::Closed));
/// ```
///
/// [`recv`]: Self::recv
pub fn try_recv(&self) -> Result<T, TryRecvError>
where
R: Recycle<T>,
{
self.core.try_recv(self.slots.as_ref(), self.recycle)
}

/// Attempts to receive a message *by reference* from this channel,
/// registering the current task for wakeup if the a message is not yet
/// available, and returning `None` if the channel has closed and all
Expand Down
135 changes: 135 additions & 0 deletions src/mpsc/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,74 @@ feature! {
Some(recycling::take(&mut *val, self.recycle))
}

/// Attempts to receive the next message for this receiver by reference
/// without blocking.
///
/// This method differs from [`recv_ref`] by returning immediately if the
/// channel is empty or closed.
///
/// # Errors
///
/// This method returns an error when the channel is closed or there are
/// no remaining messages in the channel's buffer.
///
/// # Examples
///
/// ```
/// use thingbuf::mpsc::{blocking, errors::TryRecvError};
///
/// let (tx, rx) = blocking::channel(100);
/// assert!(matches!(rx.try_recv_ref(), Err(TryRecvError::Empty)));
///
/// tx.send(1).unwrap();
/// drop(tx);
///
/// assert_eq!(*rx.try_recv_ref().unwrap(), 1);
/// assert!(matches!(rx.try_recv_ref(), Err(TryRecvError::Closed)));
/// ```
///
/// [`recv_ref`]: Self::recv_ref
pub fn try_recv_ref(&self) -> Result<Ref<'_, T>, TryRecvError>
where
R: Recycle<T>,
{
self.core.try_recv_ref(self.slots.as_ref())
}

/// Attempts to receive the next message for this receiver by value
/// without blocking.
///
/// This method differs from [`recv`] by returning immediately if the
/// channel is empty or closed.
///
/// # Errors
///
/// This method returns an error when the channel is closed or there are
/// no remaining messages in the channel's buffer.
///
/// # Examples
///
/// ```
/// use thingbuf::mpsc::{blocking, errors::TryRecvError};
///
/// let (tx, rx) = blocking::channel(100);
/// assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
///
/// tx.send(1).unwrap();
/// drop(tx);
///
/// assert_eq!(rx.try_recv().unwrap(), 1);
/// assert_eq!(rx.try_recv(), Err(TryRecvError::Closed));
/// ```
///
/// [`recv`]: Self::recv
pub fn try_recv(&self) -> Result<T, TryRecvError>
where
R: Recycle<T>,
{
self.core.try_recv(self.slots.as_ref(), self.recycle)
}

/// Returns `true` if the channel has closed (all corresponding
/// [`StaticSender`]s have been dropped).
///
Expand Down Expand Up @@ -966,6 +1034,73 @@ impl<T, R> Receiver<T, R> {
Some(recycling::take(&mut *val, &self.inner.recycle))
}

/// Attempts to receive the next message for this receiver by reference
/// without blocking.
///
/// This method differs from [`recv_ref`] by returning immediately if the
/// channel is empty or closed.
///
/// # Errors
///
/// This method returns an error when the channel is closed or there are
/// no remaining messages in the channel's buffer.
///
/// # Examples
///
/// ```
/// use thingbuf::mpsc::{blocking, errors::TryRecvError};
///
/// let (tx, rx) = blocking::channel(100);
/// assert!(matches!(rx.try_recv_ref(), Err(TryRecvError::Empty)));
///
/// tx.send(1).unwrap();
/// drop(tx);
///
/// assert_eq!(*rx.try_recv_ref().unwrap(), 1);
/// assert!(matches!(rx.try_recv_ref(), Err(TryRecvError::Closed)));
/// ```
///
/// [`recv_ref`]: Self::recv_ref
pub fn try_recv_ref(&self) -> Result<Ref<'_, T>, TryRecvError> {
self.inner.core.try_recv_ref(self.inner.slots.as_ref())
}

/// Attempts to receive the next message for this receiver by value
/// without blocking.
///
/// This method differs from [`recv`] by returning immediately if the
/// channel is empty or closed.
///
/// # Errors
///
/// This method returns an error when the channel is closed or there are
/// no remaining messages in the channel's buffer.
///
/// # Examples
///
/// ```
/// use thingbuf::mpsc::{blocking, errors::TryRecvError};
///
/// let (tx, rx) = blocking::channel(100);
/// assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
///
/// tx.send(1).unwrap();
/// drop(tx);
///
/// assert_eq!(rx.try_recv().unwrap(), 1);
/// assert_eq!(rx.try_recv(), Err(TryRecvError::Closed));
/// ```
///
/// [`recv`]: Self::recv
pub fn try_recv(&self) -> Result<T, TryRecvError>
where
R: Recycle<T>,
{
self.inner
.core
.try_recv(self.inner.slots.as_ref(), &self.inner.recycle)
}

/// Returns `true` if the channel has closed (all corresponding
/// [`Sender`]s have been dropped).
///
Expand Down
Loading

0 comments on commit e04661f

Please sign in to comment.