diff --git a/src/mpsc/blocking.rs b/src/mpsc/blocking.rs index b46f47d..e2ebdff 100644 --- a/src/mpsc/blocking.rs +++ b/src/mpsc/blocking.rs @@ -16,6 +16,7 @@ use crate::{ }; use core::{fmt, pin::Pin}; use errors::*; +use std::time::{Duration, Instant}; /// Returns a new synchronous multi-producer, single consumer (MPSC) /// channel with the provided capacity. @@ -574,6 +575,110 @@ feature! { Some(recycling::take(&mut *val, self.recycle)) } + /// Receives the next message for this receiver, **by reference**, waiting for at most `timeout`. + /// + /// If there are no messages in the channel's buffer, but the channel has + /// not yet been closed, this method will block until a message is sent, + /// the channel is closed, or the provided `timeout` has elapsed. + /// + /// # Returns + /// + /// - [`Ok`]`(`[`RecvRef`]`)` if a message was received. + /// - [`Err`]`(`[`RecvTimeoutError::Timeout`]`)` if the timeout has elapsed. + /// - [`Err`]`(`[`RecvTimeoutError::Closed`]`)` if the channel has closed. + /// # Examples + /// + /// ``` + /// use thingbuf::mpsc::{blocking::StaticChannel, errors::RecvTimeoutError}; + /// use std::{thread, fmt::Write, time::Duration}; + /// + /// static CHANNEL: StaticChannel = StaticChannel::new(); + /// let (tx, rx) = CHANNEL.split(); + /// + /// thread::spawn(move || { + /// thread::sleep(Duration::from_millis(150)); + /// let mut value = tx.send_ref().unwrap(); + /// write!(value, "hello world!") + /// .expect("writing to a `String` should never fail"); + /// }); + /// + /// assert_eq!( + /// Err(&RecvTimeoutError::Timeout), + /// rx.recv_ref_timeout(Duration::from_millis(100)).as_deref().map(String::as_str) + /// ); + /// assert_eq!( + /// Ok("hello world!"), + /// rx.recv_ref_timeout(Duration::from_millis(100)).as_deref().map(String::as_str) + /// ); + /// assert_eq!( + /// Err(&RecvTimeoutError::Closed), + /// rx.recv_ref_timeout(Duration::from_millis(100)).as_deref().map(String::as_str) + /// ); + /// ``` + #[cfg(not(all(test, loom)))] + pub fn recv_ref_timeout(&self, timeout: Duration) -> Result, RecvTimeoutError> { + recv_ref_timeout(self.core, self.slots, timeout) + } + + /// Receives the next message for this receiver, **by value**, waiting for at most `timeout`. + /// + /// If there are no messages in the channel's buffer, but the channel + /// has not yet been closed, this method will block until a message is + /// sent, the channel is closed, or the provided `timeout` has elapsed. + /// + /// When a message is received, it is moved out of the channel by value, + /// and replaced with a new slot according to the configured [recycling + /// policy]. If all [`StaticSender`]s for this channel write to the + /// channel's slots in place by using the [`send_ref`] or + /// [`try_send_ref`] methods, consider using the [`recv_ref_timeout`] + /// method instead, to enable the reuse of heap allocations. + /// + /// [`send_ref`]: StaticSender::send_ref + /// [`try_send_ref`]: StaticSender::try_send_ref + /// [recycling policy]: crate::recycling::Recycle + /// [`recv_ref_timeout`]: Self::recv_ref_timeout + /// + /// # Returns + /// + /// - [`Ok`]`()` if a message was received. + /// - [`Err`]`(`[`RecvTimeoutError::Timeout`]`)` if the timeout has elapsed. + /// - [`Err`]`(`[`RecvTimeoutError::Closed`]`)` if the channel has closed. + /// # Examples + /// + /// ``` + /// use thingbuf::mpsc::{blocking::StaticChannel, errors::RecvTimeoutError}; + /// use std::{thread, time::Duration}; + /// + /// static CHANNEL: StaticChannel = StaticChannel::new(); + /// let (tx, rx) = CHANNEL.split(); + /// + /// thread::spawn(move || { + /// thread::sleep(Duration::from_millis(150)); + /// tx.send(1).unwrap(); + /// }); + /// + /// assert_eq!( + /// Err(RecvTimeoutError::Timeout), + /// rx.recv_timeout(Duration::from_millis(100)) + /// ); + /// assert_eq!( + /// Ok(1), + /// rx.recv_timeout(Duration::from_millis(100)) + /// ); + /// assert_eq!( + /// Err(RecvTimeoutError::Closed), + /// rx.recv_timeout(Duration::from_millis(100)) + /// ); + /// ``` + #[cfg(not(all(test, loom)))] + pub fn recv_timeout(&self, timeout: Duration) -> Result + where + R: Recycle, + { + let mut val = self.recv_ref_timeout(timeout)?; + Ok(recycling::take(&mut *val, self.recycle)) + } + /// Attempts to receive the next message for this receiver by reference /// without blocking. /// @@ -1033,6 +1138,109 @@ impl Receiver { Some(recycling::take(&mut *val, &self.inner.recycle)) } + /// Receives the next message for this receiver, **by reference**, waiting for at most `timeout`. + /// + /// If there are no messages in the channel's buffer, but the channel has + /// not yet been closed, this method will block until a message is sent, + /// the channel is closed, or the provided `timeout` has elapsed. + /// + /// # Returns + /// + /// - [`Ok`]`(`[`RecvRef`]`)` if a message was received. + /// - [`Err`]`(`[`RecvTimeoutError::Timeout`]`)` if the timeout has elapsed. + /// - [`Err`]`(`[`RecvTimeoutError::Closed`]`)` if the channel has closed. + /// # Examples + /// + /// ``` + /// use thingbuf::mpsc::{blocking, errors::RecvTimeoutError}; + /// use std::{thread, fmt::Write, time::Duration}; + /// + /// let (tx, rx) = blocking::channel::(100); + /// + /// thread::spawn(move || { + /// thread::sleep(Duration::from_millis(150)); + /// let mut value = tx.send_ref().unwrap(); + /// write!(value, "hello world!") + /// .expect("writing to a `String` should never fail"); + /// }); + /// + /// assert_eq!( + /// Err(&RecvTimeoutError::Timeout), + /// rx.recv_ref_timeout(Duration::from_millis(100)).as_deref().map(String::as_str) + /// ); + /// assert_eq!( + /// Ok("hello world!"), + /// rx.recv_ref_timeout(Duration::from_millis(100)).as_deref().map(String::as_str) + /// ); + /// assert_eq!( + /// Err(&RecvTimeoutError::Closed), + /// rx.recv_ref_timeout(Duration::from_millis(100)).as_deref().map(String::as_str) + /// ); + /// ``` + #[cfg(not(all(test, loom)))] + pub fn recv_ref_timeout(&self, timeout: Duration) -> Result, RecvTimeoutError> { + recv_ref_timeout(&self.inner.core, self.inner.slots.as_ref(), timeout) + } + + /// Receives the next message for this receiver, **by value**, waiting for at most `timeout`. + /// + /// If there are no messages in the channel's buffer, but the channel + /// has not yet been closed, this method will block until a message is + /// sent, the channel is closed, or the provided `timeout` has elapsed. + /// + /// When a message is received, it is moved out of the channel by value, + /// and replaced with a new slot according to the configured [recycling + /// policy]. If all [`Sender`]s for this channel write to the + /// channel's slots in place by using the [`send_ref`] or + /// [`try_send_ref`] methods, consider using the [`recv_ref_timeout`] + /// method instead, to enable the reuse of heap allocations. + /// + /// [`send_ref`]: Sender::send_ref + /// [`try_send_ref`]: Sender::try_send_ref + /// [recycling policy]: crate::recycling::Recycle + /// [`recv_ref_timeout`]: Self::recv_ref_timeout + /// + /// # Returns + /// + /// - [`Ok`]`()` if a message was received. + /// - [`Err`]`(`[`RecvTimeoutError::Timeout`]`)` if the timeout has elapsed. + /// - [`Err`]`(`[`RecvTimeoutError::Closed`]`)` if the channel has closed. + /// + /// # Examples + /// + /// ``` + /// use thingbuf::mpsc::{blocking, errors::RecvTimeoutError}; + /// use std::{thread, fmt::Write, time::Duration}; + /// + /// let (tx, rx) = blocking::channel(100); + /// + /// thread::spawn(move || { + /// thread::sleep(Duration::from_millis(150)); + /// tx.send(1).unwrap(); + /// }); + /// + /// assert_eq!( + /// Err(RecvTimeoutError::Timeout), + /// rx.recv_timeout(Duration::from_millis(100)) + /// ); + /// assert_eq!( + /// Ok(1), + /// rx.recv_timeout(Duration::from_millis(100)) + /// ); + /// assert_eq!( + /// Err(RecvTimeoutError::Closed), + /// rx.recv_timeout(Duration::from_millis(100)) + /// ); + /// ``` + #[cfg(not(all(test, loom)))] + pub fn recv_timeout(&self, timeout: Duration) -> Result + where + R: Recycle, + { + let mut val = self.recv_ref_timeout(timeout)?; + Ok(recycling::take(&mut *val, &self.inner.recycle)) + } + /// Attempts to receive the next message for this receiver by reference /// without blocking. /// @@ -1165,6 +1373,38 @@ fn recv_ref<'a, T>(core: &'a ChannelCore, slots: &'a [Slot]) -> Optio } } +#[cfg(not(all(test, loom)))] +#[inline] +fn recv_ref_timeout<'a, T>( + core: &'a ChannelCore, + slots: &'a [Slot], + timeout: Duration, +) -> Result, RecvTimeoutError> { + let beginning_park = Instant::now(); + loop { + match core.poll_recv_ref(slots, thread::current) { + Poll::Ready(r) => { + return r + .map(|slot| { + RecvRef(RecvRefInner { + _notify: super::NotifyTx(&core.tx_wait), + slot, + }) + }) + .ok_or(RecvTimeoutError::Closed); + } + Poll::Pending => { + test_println!("park_timeout ({:?})", thread::current()); + thread::park_timeout(timeout); + let elapsed = beginning_park.elapsed(); + if elapsed >= timeout { + return Err(RecvTimeoutError::Timeout); + } + } + } + } +} + #[inline] fn send_ref<'a, T, R: Recycle>( core: &'a ChannelCore, diff --git a/src/mpsc/errors.rs b/src/mpsc/errors.rs index 4aca5da..c5d2139 100644 --- a/src/mpsc/errors.rs +++ b/src/mpsc/errors.rs @@ -21,10 +21,25 @@ pub enum TrySendError { Closed(T), } -/// Error returned by the [`Receiver::recv`] and [`Receiver::recv_ref`] methods. +/// Error returned by the [`Receiver::recv_timeout`] and [`Receiver::recv_ref_timeout`] methods +/// (blocking only). /// -/// [`Receiver::recv`]: super::Receiver::recv -/// [`Receiver::recv_ref`]: super::Receiver::recv_ref +/// [`Receiver::recv_timeout`]: super::blocking::Receiver::recv_timeout +/// [`Receiver::recv_ref_timeout`]: super::blocking::Receiver::recv_ref_timeout +#[cfg(feature = "std")] +#[non_exhaustive] +#[derive(Debug, PartialEq, Eq)] +pub enum RecvTimeoutError { + /// The timeout elapsed before data could be received. + Timeout, + /// The channel is closed. + Closed, +} + +/// Error returned by the [`Receiver::try_recv`] and [`Receiver::try_recv_ref`] methods. +/// +/// [`Receiver::try_recv`]: super::Receiver::try_recv +/// [`Receiver::try_recv_ref`]: super::Receiver::try_recv_ref #[non_exhaustive] #[derive(Debug, PartialEq, Eq)] pub enum TryRecvError { @@ -138,6 +153,21 @@ impl fmt::Display for TrySendError { #[cfg(feature = "std")] impl std::error::Error for TrySendError {} +// === impl RecvTimeoutError === + +#[cfg(feature = "std")] +impl std::error::Error for RecvTimeoutError {} + +#[cfg(feature = "std")] +impl fmt::Display for RecvTimeoutError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str(match self { + Self::Timeout => "timed out waiting on channel", + Self::Closed => "channel closed", + }) + } +} + // == impl TryRecvError == #[cfg(feature = "std")] diff --git a/src/thingbuf.rs b/src/thingbuf.rs index b38e3db..82f61be 100644 --- a/src/thingbuf.rs +++ b/src/thingbuf.rs @@ -387,7 +387,7 @@ where /// [`push`]: Self::push_ref pub fn push_ref(&self) -> Result, Full> { self.core - .push_ref(&*self.slots, &self.recycle) + .push_ref(&self.slots, &self.recycle) .map_err(|e| match e { crate::mpsc::errors::TrySendError::Full(()) => Full(()), _ => unreachable!(), @@ -459,7 +459,7 @@ where /// [`push`]: Self::push /// [`pop`]: Self::pop pub fn pop_ref(&self) -> Option> { - self.core.pop_ref(&*self.slots).ok() + self.core.pop_ref(&self.slots).ok() } /// Dequeue the first element in the queue *by value*, moving it out of the diff --git a/src/wait/queue.rs b/src/wait/queue.rs index dd302ba..f820a70 100644 --- a/src/wait/queue.rs +++ b/src/wait/queue.rs @@ -626,7 +626,7 @@ impl List { } fn is_empty(&self) -> bool { - self.head == None && self.tail == None + self.head.is_none() && self.tail.is_none() } } @@ -658,7 +658,7 @@ mod tests { self.0.store(true, Ordering::SeqCst); } - fn same(&self, &Self(ref other): &Self) -> bool { + fn same(&self, Self(other): &Self) -> bool { Arc::ptr_eq(&self.0, other) } }