Skip to content

Commit

Permalink
feat(blocking::mpsc): add Receiver::recv(_ref)_timeout methods (#75)
Browse files Browse the repository at this point in the history
Added recv_timeout methods for
`blocking::mpsc::Receiver`/`StaticReceiver`. It requires a simple change
from `thread::park` to `thread::park_timeout` & an elapsed time check.
We need this functionality so that our threads can wake up at least once
every few seconds to check for control events

Co-authored-by: Eliza Weisman <[email protected]>
  • Loading branch information
utkarshgupta137 and hawkw authored Apr 26, 2023
1 parent a0e5740 commit b57ce88
Show file tree
Hide file tree
Showing 4 changed files with 277 additions and 7 deletions.
240 changes: 240 additions & 0 deletions src/mpsc/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::{
};
use core::{fmt, pin::Pin};
use errors::*;
use std::time::{Duration, Instant};

/// Returns a new synchronous multi-producer, single consumer (MPSC)
/// channel with the provided capacity.
Expand Down Expand Up @@ -574,6 +575,110 @@ feature! {
Some(recycling::take(&mut *val, self.recycle))
}

/// Receives the next message for this receiver, **by reference**, waiting for at most `timeout`.
///
/// If there are no messages in the channel's buffer, but the channel has
/// not yet been closed, this method will block until a message is sent,
/// the channel is closed, or the provided `timeout` has elapsed.
///
/// # Returns
///
/// - [`Ok`]`(`[`RecvRef`]`<T>)` if a message was received.
/// - [`Err`]`(`[`RecvTimeoutError::Timeout`]`)` if the timeout has elapsed.
/// - [`Err`]`(`[`RecvTimeoutError::Closed`]`)` if the channel has closed.
/// # Examples
///
/// ```
/// use thingbuf::mpsc::{blocking::StaticChannel, errors::RecvTimeoutError};
/// use std::{thread, fmt::Write, time::Duration};
///
/// static CHANNEL: StaticChannel<String, 100> = StaticChannel::new();
/// let (tx, rx) = CHANNEL.split();
///
/// thread::spawn(move || {
/// thread::sleep(Duration::from_millis(150));
/// let mut value = tx.send_ref().unwrap();
/// write!(value, "hello world!")
/// .expect("writing to a `String` should never fail");
/// });
///
/// assert_eq!(
/// Err(&RecvTimeoutError::Timeout),
/// rx.recv_ref_timeout(Duration::from_millis(100)).as_deref().map(String::as_str)
/// );
/// assert_eq!(
/// Ok("hello world!"),
/// rx.recv_ref_timeout(Duration::from_millis(100)).as_deref().map(String::as_str)
/// );
/// assert_eq!(
/// Err(&RecvTimeoutError::Closed),
/// rx.recv_ref_timeout(Duration::from_millis(100)).as_deref().map(String::as_str)
/// );
/// ```
#[cfg(not(all(test, loom)))]
pub fn recv_ref_timeout(&self, timeout: Duration) -> Result<RecvRef<'_, T>, RecvTimeoutError> {
recv_ref_timeout(self.core, self.slots, timeout)
}

/// Receives the next message for this receiver, **by value**, waiting for at most `timeout`.
///
/// If there are no messages in the channel's buffer, but the channel
/// has not yet been closed, this method will block until a message is
/// sent, the channel is closed, or the provided `timeout` has elapsed.
///
/// When a message is received, it is moved out of the channel by value,
/// and replaced with a new slot according to the configured [recycling
/// policy]. If all [`StaticSender`]s for this channel write to the
/// channel's slots in place by using the [`send_ref`] or
/// [`try_send_ref`] methods, consider using the [`recv_ref_timeout`]
/// method instead, to enable the reuse of heap allocations.
///
/// [`send_ref`]: StaticSender::send_ref
/// [`try_send_ref`]: StaticSender::try_send_ref
/// [recycling policy]: crate::recycling::Recycle
/// [`recv_ref_timeout`]: Self::recv_ref_timeout
///
/// # Returns
///
/// - [`Ok`]`(<T>)` if a message was received.
/// - [`Err`]`(`[`RecvTimeoutError::Timeout`]`)` if the timeout has elapsed.
/// - [`Err`]`(`[`RecvTimeoutError::Closed`]`)` if the channel has closed.
/// # Examples
///
/// ```
/// use thingbuf::mpsc::{blocking::StaticChannel, errors::RecvTimeoutError};
/// use std::{thread, time::Duration};
///
/// static CHANNEL: StaticChannel<i32, 100> = StaticChannel::new();
/// let (tx, rx) = CHANNEL.split();
///
/// thread::spawn(move || {
/// thread::sleep(Duration::from_millis(150));
/// tx.send(1).unwrap();
/// });
///
/// assert_eq!(
/// Err(RecvTimeoutError::Timeout),
/// rx.recv_timeout(Duration::from_millis(100))
/// );
/// assert_eq!(
/// Ok(1),
/// rx.recv_timeout(Duration::from_millis(100))
/// );
/// assert_eq!(
/// Err(RecvTimeoutError::Closed),
/// rx.recv_timeout(Duration::from_millis(100))
/// );
/// ```
#[cfg(not(all(test, loom)))]
pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError>
where
R: Recycle<T>,
{
let mut val = self.recv_ref_timeout(timeout)?;
Ok(recycling::take(&mut *val, self.recycle))
}

/// Attempts to receive the next message for this receiver by reference
/// without blocking.
///
Expand Down Expand Up @@ -1033,6 +1138,109 @@ impl<T, R> Receiver<T, R> {
Some(recycling::take(&mut *val, &self.inner.recycle))
}

/// Receives the next message for this receiver, **by reference**, waiting for at most `timeout`.
///
/// If there are no messages in the channel's buffer, but the channel has
/// not yet been closed, this method will block until a message is sent,
/// the channel is closed, or the provided `timeout` has elapsed.
///
/// # Returns
///
/// - [`Ok`]`(`[`RecvRef`]`<T>)` if a message was received.
/// - [`Err`]`(`[`RecvTimeoutError::Timeout`]`)` if the timeout has elapsed.
/// - [`Err`]`(`[`RecvTimeoutError::Closed`]`)` if the channel has closed.
/// # Examples
///
/// ```
/// use thingbuf::mpsc::{blocking, errors::RecvTimeoutError};
/// use std::{thread, fmt::Write, time::Duration};
///
/// let (tx, rx) = blocking::channel::<String>(100);
///
/// thread::spawn(move || {
/// thread::sleep(Duration::from_millis(150));
/// let mut value = tx.send_ref().unwrap();
/// write!(value, "hello world!")
/// .expect("writing to a `String` should never fail");
/// });
///
/// assert_eq!(
/// Err(&RecvTimeoutError::Timeout),
/// rx.recv_ref_timeout(Duration::from_millis(100)).as_deref().map(String::as_str)
/// );
/// assert_eq!(
/// Ok("hello world!"),
/// rx.recv_ref_timeout(Duration::from_millis(100)).as_deref().map(String::as_str)
/// );
/// assert_eq!(
/// Err(&RecvTimeoutError::Closed),
/// rx.recv_ref_timeout(Duration::from_millis(100)).as_deref().map(String::as_str)
/// );
/// ```
#[cfg(not(all(test, loom)))]
pub fn recv_ref_timeout(&self, timeout: Duration) -> Result<RecvRef<'_, T>, RecvTimeoutError> {
recv_ref_timeout(&self.inner.core, self.inner.slots.as_ref(), timeout)
}

/// Receives the next message for this receiver, **by value**, waiting for at most `timeout`.
///
/// If there are no messages in the channel's buffer, but the channel
/// has not yet been closed, this method will block until a message is
/// sent, the channel is closed, or the provided `timeout` has elapsed.
///
/// When a message is received, it is moved out of the channel by value,
/// and replaced with a new slot according to the configured [recycling
/// policy]. If all [`Sender`]s for this channel write to the
/// channel's slots in place by using the [`send_ref`] or
/// [`try_send_ref`] methods, consider using the [`recv_ref_timeout`]
/// method instead, to enable the reuse of heap allocations.
///
/// [`send_ref`]: Sender::send_ref
/// [`try_send_ref`]: Sender::try_send_ref
/// [recycling policy]: crate::recycling::Recycle
/// [`recv_ref_timeout`]: Self::recv_ref_timeout
///
/// # Returns
///
/// - [`Ok`]`(<T>)` if a message was received.
/// - [`Err`]`(`[`RecvTimeoutError::Timeout`]`)` if the timeout has elapsed.
/// - [`Err`]`(`[`RecvTimeoutError::Closed`]`)` if the channel has closed.
///
/// # Examples
///
/// ```
/// use thingbuf::mpsc::{blocking, errors::RecvTimeoutError};
/// use std::{thread, fmt::Write, time::Duration};
///
/// let (tx, rx) = blocking::channel(100);
///
/// thread::spawn(move || {
/// thread::sleep(Duration::from_millis(150));
/// tx.send(1).unwrap();
/// });
///
/// assert_eq!(
/// Err(RecvTimeoutError::Timeout),
/// rx.recv_timeout(Duration::from_millis(100))
/// );
/// assert_eq!(
/// Ok(1),
/// rx.recv_timeout(Duration::from_millis(100))
/// );
/// assert_eq!(
/// Err(RecvTimeoutError::Closed),
/// rx.recv_timeout(Duration::from_millis(100))
/// );
/// ```
#[cfg(not(all(test, loom)))]
pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError>
where
R: Recycle<T>,
{
let mut val = self.recv_ref_timeout(timeout)?;
Ok(recycling::take(&mut *val, &self.inner.recycle))
}

/// Attempts to receive the next message for this receiver by reference
/// without blocking.
///
Expand Down Expand Up @@ -1165,6 +1373,38 @@ fn recv_ref<'a, T>(core: &'a ChannelCore<Thread>, slots: &'a [Slot<T>]) -> Optio
}
}

#[cfg(not(all(test, loom)))]
#[inline]
fn recv_ref_timeout<'a, T>(
core: &'a ChannelCore<Thread>,
slots: &'a [Slot<T>],
timeout: Duration,
) -> Result<RecvRef<'a, T>, RecvTimeoutError> {
let beginning_park = Instant::now();
loop {
match core.poll_recv_ref(slots, thread::current) {
Poll::Ready(r) => {
return r
.map(|slot| {
RecvRef(RecvRefInner {
_notify: super::NotifyTx(&core.tx_wait),
slot,
})
})
.ok_or(RecvTimeoutError::Closed);
}
Poll::Pending => {
test_println!("park_timeout ({:?})", thread::current());
thread::park_timeout(timeout);
let elapsed = beginning_park.elapsed();
if elapsed >= timeout {
return Err(RecvTimeoutError::Timeout);
}
}
}
}
}

#[inline]
fn send_ref<'a, T, R: Recycle<T>>(
core: &'a ChannelCore<Thread>,
Expand Down
36 changes: 33 additions & 3 deletions src/mpsc/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,25 @@ pub enum TrySendError<T = ()> {
Closed(T),
}

/// Error returned by the [`Receiver::recv`] and [`Receiver::recv_ref`] methods.
/// Error returned by the [`Receiver::recv_timeout`] and [`Receiver::recv_ref_timeout`] methods
/// (blocking only).
///
/// [`Receiver::recv`]: super::Receiver::recv
/// [`Receiver::recv_ref`]: super::Receiver::recv_ref
/// [`Receiver::recv_timeout`]: super::blocking::Receiver::recv_timeout
/// [`Receiver::recv_ref_timeout`]: super::blocking::Receiver::recv_ref_timeout
#[cfg(feature = "std")]
#[non_exhaustive]
#[derive(Debug, PartialEq, Eq)]
pub enum RecvTimeoutError {
/// The timeout elapsed before data could be received.
Timeout,
/// The channel is closed.
Closed,
}

/// Error returned by the [`Receiver::try_recv`] and [`Receiver::try_recv_ref`] methods.
///
/// [`Receiver::try_recv`]: super::Receiver::try_recv
/// [`Receiver::try_recv_ref`]: super::Receiver::try_recv_ref
#[non_exhaustive]
#[derive(Debug, PartialEq, Eq)]
pub enum TryRecvError {
Expand Down Expand Up @@ -138,6 +153,21 @@ impl<T> fmt::Display for TrySendError<T> {
#[cfg(feature = "std")]
impl<T> std::error::Error for TrySendError<T> {}

// === impl RecvTimeoutError ===

#[cfg(feature = "std")]
impl std::error::Error for RecvTimeoutError {}

#[cfg(feature = "std")]
impl fmt::Display for RecvTimeoutError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(match self {
Self::Timeout => "timed out waiting on channel",
Self::Closed => "channel closed",
})
}
}

// == impl TryRecvError ==

#[cfg(feature = "std")]
Expand Down
4 changes: 2 additions & 2 deletions src/thingbuf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ where
/// [`push`]: Self::push_ref
pub fn push_ref(&self) -> Result<Ref<'_, T>, Full> {
self.core
.push_ref(&*self.slots, &self.recycle)
.push_ref(&self.slots, &self.recycle)
.map_err(|e| match e {
crate::mpsc::errors::TrySendError::Full(()) => Full(()),
_ => unreachable!(),
Expand Down Expand Up @@ -459,7 +459,7 @@ where
/// [`push`]: Self::push
/// [`pop`]: Self::pop
pub fn pop_ref(&self) -> Option<Ref<'_, T>> {
self.core.pop_ref(&*self.slots).ok()
self.core.pop_ref(&self.slots).ok()
}

/// Dequeue the first element in the queue *by value*, moving it out of the
Expand Down
4 changes: 2 additions & 2 deletions src/wait/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,7 @@ impl<T> List<T> {
}

fn is_empty(&self) -> bool {
self.head == None && self.tail == None
self.head.is_none() && self.tail.is_none()
}
}

Expand Down Expand Up @@ -658,7 +658,7 @@ mod tests {
self.0.store(true, Ordering::SeqCst);
}

fn same(&self, &Self(ref other): &Self) -> bool {
fn same(&self, Self(other): &Self) -> bool {
Arc::ptr_eq(&self.0, other)
}
}
Expand Down

0 comments on commit b57ce88

Please sign in to comment.