From dc6b8c3d43ce3c8ed5878d39e1f76779a857bbe7 Mon Sep 17 00:00:00 2001 From: Utkarsh Gupta Date: Thu, 27 Apr 2023 12:17:13 +0530 Subject: [PATCH 1/5] feat(blocking::mpsc): add `Sender::send(_ref)_timeout` methods --- README.md | 6 +- src/mpsc/blocking.rs | 298 +++++++++++++++++++++++++++++++++++++++++++ src/mpsc/errors.rs | 86 +++++++++++++ 3 files changed, 386 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 3168db8..ed140b5 100644 --- a/README.md +++ b/README.md @@ -73,10 +73,8 @@ some cases where you might be better off considering other options: prefer a channel implementation that only allocates memory for messages as it's needed (such as [`tokio::sync::mpsc`]). -- **You need a blocking channel with `send_timeout`** or **a blocking channel - with a `select` operation**. I'm probably not going to implement these things. - The blocking channel isn't particularly important to me compared to the async - channel, and I _probably_ won't add a bunch of additional APIs to it. +- **You need a blocking channel with a `select` operation**. + I'm probably not going to implement it. I _may_ accept a PR if you raise it. If you need a synchronous channel with this kind of functionality, [`crossbeam-channel`] is probably a good choice. diff --git a/src/mpsc/blocking.rs b/src/mpsc/blocking.rs index 4abffc3..d46f732 100644 --- a/src/mpsc/blocking.rs +++ b/src/mpsc/blocking.rs @@ -351,6 +351,124 @@ feature! { } } + /// Reserves a slot in the channel to mutate in place, blocking until + /// there is a free slot to write to, waiting for at most `timeout`. + /// + /// This is similar to the [`send`] method, but, rather than taking a + /// message by value to write to the channel, this method reserves a + /// writable slot in the channel, and returns a [`SendRef`] that allows + /// mutating the slot in place. If the [`StaticReceiver`] end of the + /// channel uses the [`StaticReceiver::recv_ref`] method for receiving + /// from the channel, this allows allocations for channel messages to be + /// reused in place. + /// + /// # Errors + /// + /// - [`Err`]`(`[`SendTimeoutError::Timeout`]`)` if the timeout has elapsed. + /// - [`Err`]`(`[`SendTimeoutError::Closed`]`)` if the channel has closed. + /// + /// # Examples + /// + /// Sending formatted strings by writing them directly to channel slots, + /// in place: + /// + /// ``` + /// use thingbuf::mpsc::{blocking::StaticChannel, errors::SendTimeoutError}; + /// use std::{fmt::Write, time::Duration, thread}; + /// + /// static CHANNEL: StaticChannel = StaticChannel::new(); + /// let (tx, rx) = CHANNEL.split(); + /// + /// thread::spawn(move || { + /// thread::sleep(Duration::from_millis(500)); + /// let msg = rx.recv_ref().unwrap(); + /// println!("{}", msg); + /// thread::sleep(Duration::from_millis(500)); + /// }); + /// + /// thread::spawn(move || { + /// let mut value = tx.send_ref_timeout(Duration::from_millis(200)).unwrap(); + /// write!(value, "hello").expect("writing to a `String` should never fail"); + /// thread::sleep(Duration::from_millis(400)); + /// + /// let mut value = tx.send_ref_timeout(Duration::from_millis(200)).unwrap(); + /// write!(value, "world").expect("writing to a `String` should never fail"); + /// thread::sleep(Duration::from_millis(400)); + /// + /// assert_eq!( + /// Err(&SendTimeoutError::Timeout(())), + /// tx.send_ref_timeout(Duration::from_millis(200)).as_deref().map(String::as_str) + /// ); + /// }); + /// ``` + /// + /// [`send`]: Self::send + #[cfg(not(all(test, loom)))] + pub fn send_ref_timeout(&self, timeout: Duration) -> Result, SendTimeoutError> { + send_ref_timeout(self.core, self.slots, self.recycle, timeout) + } + + /// Sends a message by value, blocking until there is a free slot to + /// write to, waiting for at most `timeout`. + /// + /// This method takes the message by value, and replaces any previous + /// value in the slot. This means that the channel will *not* function + /// as an object pool while sending messages with `send`. This method is + /// most appropriate when messages don't own reusable heap allocations, + /// or when the [`StaticReceiver`] end of the channel must receive messages + /// by moving them out of the channel by value (using the + /// [`StaticReceiver::recv`] method). When messages in the channel own + /// reusable heap allocations (such as `String`s or `Vec`s), and the + /// [`StaticReceiver`] doesn't need to receive them by value, consider using + /// [`send_ref`] instead, to enable allocation reuse. + /// + /// # Errors + /// + /// - [`Err`]`(`[`SendTimeoutError::Timeout`]`)` if the timeout has elapsed. + /// - [`Err`]`(`[`SendTimeoutError::Closed`]`)` if the channel has closed. + /// + /// # Examples + /// + /// ``` + /// use thingbuf::mpsc::{blocking::StaticChannel, errors::SendTimeoutError}; + /// use std::{time::Duration, thread}; + /// + /// static CHANNEL: StaticChannel = StaticChannel::new(); + /// let (tx, rx) = CHANNEL.split(); + /// + /// thread::spawn(move || { + /// thread::sleep(Duration::from_millis(500)); + /// let msg = rx.recv().unwrap(); + /// println!("{}", msg); + /// thread::sleep(Duration::from_millis(500)); + /// }); + /// + /// thread::spawn(move || { + /// tx.send_timeout(1, Duration::from_millis(200)).unwrap(); + /// thread::sleep(Duration::from_millis(400)); + /// + /// tx.send_timeout(2, Duration::from_millis(200)).unwrap(); + /// thread::sleep(Duration::from_millis(400)); + /// + /// assert_eq!( + /// Err(SendTimeoutError::Timeout(3)), + /// tx.send_timeout(3, Duration::from_millis(200)) + /// ); + /// }); + /// ``` + /// + /// [`send_ref`]: Self::send_ref + #[cfg(not(all(test, loom)))] + pub fn send_timeout(&self, val: T, timeout: Duration) -> Result<(), SendTimeoutError> { + match self.send_ref_timeout(timeout) { + Err(e) => Err(e.with_value(val)), + Ok(mut slot) => { + *slot = val; + Ok(()) + } + } + } + /// Attempts to reserve a slot in the channel to mutate in place, /// without blocking until capacity is available. /// @@ -586,6 +704,7 @@ feature! { /// - [`Ok`]`(`[`RecvRef`]`)` if a message was received. /// - [`Err`]`(`[`RecvTimeoutError::Timeout`]`)` if the timeout has elapsed. /// - [`Err`]`(`[`RecvTimeoutError::Closed`]`)` if the channel has closed. + /// /// # Examples /// /// ``` @@ -643,6 +762,7 @@ feature! { /// - [`Ok`]`()` if a message was received. /// - [`Err`]`(`[`RecvTimeoutError::Timeout`]`)` if the timeout has elapsed. /// - [`Err`]`(`[`RecvTimeoutError::Closed`]`)` if the channel has closed. + /// /// # Examples /// /// ``` @@ -927,6 +1047,126 @@ where } } + /// Reserves a slot in the channel to mutate in place, blocking until + /// there is a free slot to write to, waiting for at most `timeout`. + /// + /// This is similar to the [`send`] method, but, rather than taking a + /// message by value to write to the channel, this method reserves a + /// writable slot in the channel, and returns a [`SendRef`] that allows + /// mutating the slot in place. If the [`Receiver`] end of the channel + /// uses the [`Receiver::recv_ref`] method for receiving from the channel, + /// this allows allocations for channel messages to be reused in place. + /// + /// # Errors + /// + /// - [`Err`]`(`[`SendTimeoutError::Timeout`]`)` if the timeout has elapsed. + /// - [`Err`]`(`[`SendTimeoutError::Closed`]`)` if the channel has closed. + /// + /// # Examples + /// + /// Sending formatted strings by writing them directly to channel slots, + /// in place: + /// + /// ``` + /// use thingbuf::mpsc::{blocking, errors::SendTimeoutError}; + /// use std::{fmt::Write, time::Duration, thread}; + /// + /// let (tx, rx) = blocking::channel::(1); + /// + /// thread::spawn(move || { + /// thread::sleep(Duration::from_millis(500)); + /// let msg = rx.recv_ref().unwrap(); + /// println!("{}", msg); + /// thread::sleep(Duration::from_millis(500)); + /// }); + /// + /// thread::spawn(move || { + /// let mut value = tx.send_ref_timeout(Duration::from_millis(200)).unwrap(); + /// write!(value, "hello").expect("writing to a `String` should never fail"); + /// thread::sleep(Duration::from_millis(400)); + /// + /// let mut value = tx.send_ref_timeout(Duration::from_millis(200)).unwrap(); + /// write!(value, "world").expect("writing to a `String` should never fail"); + /// thread::sleep(Duration::from_millis(400)); + /// + /// assert_eq!( + /// Err(&SendTimeoutError::Timeout(())), + /// tx.send_ref_timeout(Duration::from_millis(200)).as_deref().map(String::as_str) + /// ); + /// }); + /// ``` + /// + /// [`send`]: Self::send + #[cfg(not(all(test, loom)))] + pub fn send_ref_timeout(&self, timeout: Duration) -> Result, SendTimeoutError> { + send_ref_timeout( + &self.inner.core, + self.inner.slots.as_ref(), + &self.inner.recycle, + timeout, + ) + } + + /// Sends a message by value, blocking until there is a free slot to + /// write to. + /// + /// This method takes the message by value, and replaces any previous + /// value in the slot. This means that the channel will *not* function + /// as an object pool while sending messages with `send`. This method is + /// most appropriate when messages don't own reusable heap allocations, + /// or when the [`Receiver`] end of the channel must receive messages by + /// moving them out of the channel by value (using the + /// [`Receiver::recv`] method). When messages in the channel own + /// reusable heap allocations (such as `String`s or `Vec`s), and the + /// [`Receiver`] doesn't need to receive them by value, consider using + /// [`send_ref`] instead, to enable allocation reuse. + /// + /// # Errors + /// + /// If the [`Receiver`] end of the channel has been dropped, this + /// returns a [`Closed`] error containing the sent value. + /// + /// # Examples + /// + /// ``` + /// use thingbuf::mpsc::{blocking, errors::SendTimeoutError}; + /// use std::{time::Duration, thread}; + /// + /// let (tx, rx) = blocking::channel(1); + /// + /// thread::spawn(move || { + /// thread::sleep(Duration::from_millis(500)); + /// let msg = rx.recv().unwrap(); + /// println!("{}", msg); + /// thread::sleep(Duration::from_millis(500)); + /// }); + /// + /// thread::spawn(move || { + /// tx.send_timeout(1, Duration::from_millis(200)).unwrap(); + /// thread::sleep(Duration::from_millis(400)); + /// + /// tx.send_timeout(2, Duration::from_millis(200)).unwrap(); + /// thread::sleep(Duration::from_millis(400)); + /// + /// assert_eq!( + /// Err(SendTimeoutError::Timeout(3)), + /// tx.send_timeout(3, Duration::from_millis(200)) + /// ); + /// }); + /// ``` + /// + /// [`send_ref`]: Self::send_ref + #[cfg(not(all(test, loom)))] + pub fn send_timeout(&self, val: T, timeout: Duration) -> Result<(), SendTimeoutError> { + match self.send_ref_timeout(timeout) { + Err(e) => Err(e.with_value(val)), + Ok(mut slot) => { + *slot = val; + Ok(()) + } + } + } + /// Attempts to reserve a slot in the channel to mutate in place, /// without blocking until capacity is available. /// @@ -1149,6 +1389,7 @@ impl Receiver { /// - [`Ok`]`(`[`RecvRef`]`)` if a message was received. /// - [`Err`]`(`[`RecvTimeoutError::Timeout`]`)` if the timeout has elapsed. /// - [`Err`]`(`[`RecvTimeoutError::Closed`]`)` if the channel has closed. + /// /// # Examples /// /// ``` @@ -1454,3 +1695,60 @@ fn send_ref<'a, T, R: Recycle>( } } } + +#[cfg(not(all(test, loom)))] +#[inline] +fn send_ref_timeout<'a, T, R: Recycle>( + core: &'a ChannelCore, + slots: &'a [Slot], + recycle: &'a R, + timeout: Duration, +) -> Result, SendTimeoutError> { + // fast path: avoid getting the thread and constructing the node if the + // slot is immediately ready. + match core.try_send_ref(slots, recycle) { + Ok(slot) => return Ok(SendRef(slot)), + Err(TrySendError::Closed(_)) => return Err(SendTimeoutError::Closed(())), + _ => {} + } + + let mut waiter = queue::Waiter::new(); + let mut unqueued = true; + let thread = thread::current(); + let mut boff = Backoff::new(); + let beginning_park = Instant::now(); + loop { + let node = unsafe { + // Safety: in this case, it's totally safe to pin the waiter, as + // it is owned uniquely by this function, and it cannot possibly + // be moved while this thread is parked. + Pin::new_unchecked(&mut waiter) + }; + + let wait = if unqueued { + test_dbg!(core.tx_wait.start_wait(node, &thread)) + } else { + test_dbg!(core.tx_wait.continue_wait(node, &thread)) + }; + + match wait { + WaitResult::Closed => return Err(SendTimeoutError::Closed(())), + WaitResult::Notified => { + boff.spin_yield(); + match core.try_send_ref(slots.as_ref(), recycle) { + Ok(slot) => return Ok(SendRef(slot)), + Err(TrySendError::Closed(_)) => return Err(SendTimeoutError::Closed(())), + _ => {} + } + } + WaitResult::Wait => { + unqueued = false; + thread::park_timeout(timeout); + let elapsed = beginning_park.elapsed(); + if elapsed >= timeout { + return Err(SendTimeoutError::Timeout(())); + } + } + } + } +} diff --git a/src/mpsc/errors.rs b/src/mpsc/errors.rs index c5d2139..d891b38 100644 --- a/src/mpsc/errors.rs +++ b/src/mpsc/errors.rs @@ -1,6 +1,26 @@ //! Errors returned by channels. use core::fmt; +/// Error returned by the [`Sender::try_send`] or [`Sender::try_send_ref`] (and +/// [`StaticSender::try_send`]/[`StaticSender::try_send_ref`]) methods. +/// +/// [`Sender::try_send`]: super::Sender::try_send +/// [`Sender::try_send_ref`]: super::Sender::try_send_ref +/// [`StaticSender::try_send`]: super::StaticSender::try_send +/// [`StaticSender::try_send_ref`]: super::StaticSender::try_send_ref +#[non_exhaustive] +#[derive(PartialEq, Eq)] +pub enum SendTimeoutError { + /// The data could not be sent on the channel because the channel is + /// currently full and sending would require waiting for capacity. + Timeout(T), + /// The data could not be sent because the [`Receiver`] half of the channel + /// has been dropped. + /// + /// [`Receiver`]: super::Receiver + Closed(T), +} + /// Error returned by the [`Sender::try_send`] or [`Sender::try_send_ref`] (and /// [`StaticSender::try_send`]/[`StaticSender::try_send_ref`]) methods. /// @@ -88,6 +108,72 @@ impl fmt::Display for Closed { #[cfg(feature = "std")] impl std::error::Error for Closed {} +// === impl SendTimeoutError === + +impl SendTimeoutError { + #[cfg(feature = "std")] + pub(crate) fn with_value(self, value: T) -> SendTimeoutError { + match self { + Self::Timeout(()) => SendTimeoutError::Timeout(value), + Self::Closed(()) => SendTimeoutError::Closed(value), + } + } +} + +impl SendTimeoutError { + /// Returns `true` if this error was returned because the channel is still + /// full after the timeout has elapsed. + pub fn is_timeout(&self) -> bool { + matches!(self, Self::Timeout(_)) + } + + /// Returns `true` if this error was returned because the channel has closed + /// (e.g. the [`Receiver`] end has been dropped). + /// + /// If this returns `true`, no future [`try_send`] or [`send`] operation on + /// this channel will succeed. + /// + /// [`Receiver`]: super::Receiver + /// [`try_send`]: super::Sender::try_send + /// [`send`]: super::Sender::send + /// [`Receiver`]: super::Receiver + pub fn is_closed(&self) -> bool { + matches!(self, Self::Timeout(_)) + } + + /// Unwraps the inner `T` value held by this error. + /// + /// This method allows recovering the original message when sending to a + /// channel has failed. + pub fn into_inner(self) -> T { + match self { + Self::Timeout(val) => val, + Self::Closed(val) => val, + } + } +} + +impl fmt::Debug for SendTimeoutError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str(match self { + Self::Timeout(_) => "SendTimeoutError ::Timeout(..)", + Self::Closed(_) => "SendTimeoutError ::Closed(..)", + }) + } +} + +impl fmt::Display for SendTimeoutError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str(match self { + Self::Timeout(_) => "timed out waiting for channel capacity", + Self::Closed(_) => "channel closed", + }) + } +} + +#[cfg(feature = "std")] +impl std::error::Error for SendTimeoutError {} + // === impl TrySendError === impl TrySendError { From 02f27a0f54bbd2fa4fb9e599ec17dd25a6ca65c6 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Fri, 28 Apr 2023 09:47:34 -0700 Subject: [PATCH 2/5] Apply docs suggestions from code review --- src/mpsc/blocking.rs | 25 +++++++++++++------------ src/mpsc/errors.rs | 16 ++++++++-------- 2 files changed, 21 insertions(+), 20 deletions(-) diff --git a/src/mpsc/blocking.rs b/src/mpsc/blocking.rs index d46f732..696835e 100644 --- a/src/mpsc/blocking.rs +++ b/src/mpsc/blocking.rs @@ -354,7 +354,7 @@ feature! { /// Reserves a slot in the channel to mutate in place, blocking until /// there is a free slot to write to, waiting for at most `timeout`. /// - /// This is similar to the [`send`] method, but, rather than taking a + /// This is similar to the [`send_timeout`] method, but, rather than taking a /// message by value to write to the channel, this method reserves a /// writable slot in the channel, and returns a [`SendRef`] that allows /// mutating the slot in place. If the [`StaticReceiver`] end of the @@ -402,7 +402,7 @@ feature! { /// }); /// ``` /// - /// [`send`]: Self::send + /// [`send_timeout`]: Self::send_timeout #[cfg(not(all(test, loom)))] pub fn send_ref_timeout(&self, timeout: Duration) -> Result, SendTimeoutError> { send_ref_timeout(self.core, self.slots, self.recycle, timeout) @@ -413,14 +413,14 @@ feature! { /// /// This method takes the message by value, and replaces any previous /// value in the slot. This means that the channel will *not* function - /// as an object pool while sending messages with `send`. This method is + /// as an object pool while sending messages with `send_timeout`. This method is /// most appropriate when messages don't own reusable heap allocations, /// or when the [`StaticReceiver`] end of the channel must receive messages /// by moving them out of the channel by value (using the /// [`StaticReceiver::recv`] method). When messages in the channel own /// reusable heap allocations (such as `String`s or `Vec`s), and the /// [`StaticReceiver`] doesn't need to receive them by value, consider using - /// [`send_ref`] instead, to enable allocation reuse. + /// [`send_ref_timeout`] instead, to enable allocation reuse. /// /// # Errors /// @@ -457,7 +457,7 @@ feature! { /// }); /// ``` /// - /// [`send_ref`]: Self::send_ref + /// [`send_ref_timeout`]: Self::send_ref_timeout #[cfg(not(all(test, loom)))] pub fn send_timeout(&self, val: T, timeout: Duration) -> Result<(), SendTimeoutError> { match self.send_ref_timeout(timeout) { @@ -1050,7 +1050,7 @@ where /// Reserves a slot in the channel to mutate in place, blocking until /// there is a free slot to write to, waiting for at most `timeout`. /// - /// This is similar to the [`send`] method, but, rather than taking a + /// This is similar to the [`send_timeout`] method, but, rather than taking a /// message by value to write to the channel, this method reserves a /// writable slot in the channel, and returns a [`SendRef`] that allows /// mutating the slot in place. If the [`Receiver`] end of the channel @@ -1096,7 +1096,7 @@ where /// }); /// ``` /// - /// [`send`]: Self::send + /// [`send_timeout`]: Self::send_timeout #[cfg(not(all(test, loom)))] pub fn send_ref_timeout(&self, timeout: Duration) -> Result, SendTimeoutError> { send_ref_timeout( @@ -1108,23 +1108,24 @@ where } /// Sends a message by value, blocking until there is a free slot to - /// write to. + /// write to, for at most `timeout`. /// /// This method takes the message by value, and replaces any previous /// value in the slot. This means that the channel will *not* function - /// as an object pool while sending messages with `send`. This method is + /// as an object pool while sending messages with `send_timeout`. This method is /// most appropriate when messages don't own reusable heap allocations, /// or when the [`Receiver`] end of the channel must receive messages by /// moving them out of the channel by value (using the /// [`Receiver::recv`] method). When messages in the channel own /// reusable heap allocations (such as `String`s or `Vec`s), and the /// [`Receiver`] doesn't need to receive them by value, consider using - /// [`send_ref`] instead, to enable allocation reuse. + /// [`send_ref_timeout`] instead, to enable allocation reuse. + /// /// /// # Errors /// - /// If the [`Receiver`] end of the channel has been dropped, this - /// returns a [`Closed`] error containing the sent value. + /// - [`Err`]`(`[`SendTimeoutError::Timeout`]`)` if the timeout has elapsed. + /// - [`Err`]`(`[`SendTimeoutError::Closed`]`)` if the channel has closed. /// /// # Examples /// diff --git a/src/mpsc/errors.rs b/src/mpsc/errors.rs index d891b38..c4d4c25 100644 --- a/src/mpsc/errors.rs +++ b/src/mpsc/errors.rs @@ -1,13 +1,13 @@ //! Errors returned by channels. use core::fmt; -/// Error returned by the [`Sender::try_send`] or [`Sender::try_send_ref`] (and -/// [`StaticSender::try_send`]/[`StaticSender::try_send_ref`]) methods. +/// Error returned by the [`Sender::send_timeout`] or [`Sender::send_ref_timeout`] +/// (and [`StaticSender::send_timeout`]/[`StaticSender::send_ref_timeout`]) methods. /// -/// [`Sender::try_send`]: super::Sender::try_send -/// [`Sender::try_send_ref`]: super::Sender::try_send_ref -/// [`StaticSender::try_send`]: super::StaticSender::try_send -/// [`StaticSender::try_send_ref`]: super::StaticSender::try_send_ref +/// [`Sender::send_timeout`]: super::Sender::send_timeout +/// [`Sender::send_ref_timeout`]: super::Sender::send_ref_timeout +/// [`StaticSender::send_timeout`]: super::StaticSender::send_timeout +/// [`StaticSender::send_ref_timeout`]: super::StaticSender::send_ref_timeout #[non_exhaustive] #[derive(PartialEq, Eq)] pub enum SendTimeoutError { @@ -156,8 +156,8 @@ impl SendTimeoutError { impl fmt::Debug for SendTimeoutError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.write_str(match self { - Self::Timeout(_) => "SendTimeoutError ::Timeout(..)", - Self::Closed(_) => "SendTimeoutError ::Closed(..)", + Self::Timeout(_) => "SendTimeoutError::Timeout(..)", + Self::Closed(_) => "SendTimeoutError::Closed(..)", }) } } From 6f7110a627f96feae11da47d2323f2a6102a3339 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Fri, 28 Apr 2023 10:06:48 -0700 Subject: [PATCH 3/5] docs suggestions --- src/mpsc/blocking.rs | 2 +- src/mpsc/errors.rs | 19 ++++++++++--------- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/src/mpsc/blocking.rs b/src/mpsc/blocking.rs index 696835e..cb6f037 100644 --- a/src/mpsc/blocking.rs +++ b/src/mpsc/blocking.rs @@ -1156,7 +1156,7 @@ where /// }); /// ``` /// - /// [`send_ref`]: Self::send_ref + /// [`send_ref_timeout`]: Self::send_ref_timeout #[cfg(not(all(test, loom)))] pub fn send_timeout(&self, val: T, timeout: Duration) -> Result<(), SendTimeoutError> { match self.send_ref_timeout(timeout) { diff --git a/src/mpsc/errors.rs b/src/mpsc/errors.rs index c4d4c25..e98458a 100644 --- a/src/mpsc/errors.rs +++ b/src/mpsc/errors.rs @@ -2,12 +2,13 @@ use core::fmt; /// Error returned by the [`Sender::send_timeout`] or [`Sender::send_ref_timeout`] -/// (and [`StaticSender::send_timeout`]/[`StaticSender::send_ref_timeout`]) methods. +/// (and [`StaticSender::send_timeout`]/[`StaticSender::send_ref_timeout`]) methods +/// (blocking only). /// -/// [`Sender::send_timeout`]: super::Sender::send_timeout -/// [`Sender::send_ref_timeout`]: super::Sender::send_ref_timeout -/// [`StaticSender::send_timeout`]: super::StaticSender::send_timeout -/// [`StaticSender::send_ref_timeout`]: super::StaticSender::send_ref_timeout +/// [`Sender::send_timeout`]: super::blocking::Sender::send_timeout +/// [`Sender::send_ref_timeout`]: super::blocking::Sender::send_ref_timeout +/// [`StaticSender::send_timeout`]: super::blocking::StaticSender::send_timeout +/// [`StaticSender::send_ref_timeout`]: super::blocking::StaticSender::send_ref_timeout #[non_exhaustive] #[derive(PartialEq, Eq)] pub enum SendTimeoutError { @@ -133,10 +134,10 @@ impl SendTimeoutError { /// If this returns `true`, no future [`try_send`] or [`send`] operation on /// this channel will succeed. /// - /// [`Receiver`]: super::Receiver - /// [`try_send`]: super::Sender::try_send - /// [`send`]: super::Sender::send - /// [`Receiver`]: super::Receiver + /// [`Receiver`]: super::blocking::Receiver + /// [`try_send`]: super::blocking::Sender::try_send + /// [`send`]: super::blocking::Sender::send + /// [`Receiver`]: super::blocking::Receiver pub fn is_closed(&self) -> bool { matches!(self, Self::Timeout(_)) } From 322d1a3b7396abf3de3eb444c0c5bbe27d509f7f Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Fri, 28 Apr 2023 10:08:31 -0700 Subject: [PATCH 4/5] feature flag the whole error type --- src/mpsc/errors.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/mpsc/errors.rs b/src/mpsc/errors.rs index e98458a..af29e5a 100644 --- a/src/mpsc/errors.rs +++ b/src/mpsc/errors.rs @@ -1,7 +1,7 @@ //! Errors returned by channels. use core::fmt; -/// Error returned by the [`Sender::send_timeout`] or [`Sender::send_ref_timeout`] +/// Error returned by the [`Sender::send_timeout`] or [`Sender::send_ref_timeout`] /// (and [`StaticSender::send_timeout`]/[`StaticSender::send_ref_timeout`]) methods /// (blocking only). /// @@ -9,6 +9,7 @@ use core::fmt; /// [`Sender::send_ref_timeout`]: super::blocking::Sender::send_ref_timeout /// [`StaticSender::send_timeout`]: super::blocking::StaticSender::send_timeout /// [`StaticSender::send_ref_timeout`]: super::blocking::StaticSender::send_ref_timeout +#[cfg(feature = "std")] #[non_exhaustive] #[derive(PartialEq, Eq)] pub enum SendTimeoutError { @@ -111,17 +112,15 @@ impl std::error::Error for Closed {} // === impl SendTimeoutError === +#[cfg(feature = "std")] impl SendTimeoutError { - #[cfg(feature = "std")] pub(crate) fn with_value(self, value: T) -> SendTimeoutError { match self { Self::Timeout(()) => SendTimeoutError::Timeout(value), Self::Closed(()) => SendTimeoutError::Closed(value), } } -} -impl SendTimeoutError { /// Returns `true` if this error was returned because the channel is still /// full after the timeout has elapsed. pub fn is_timeout(&self) -> bool { @@ -154,6 +153,7 @@ impl SendTimeoutError { } } +#[cfg(feature = "std")] impl fmt::Debug for SendTimeoutError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.write_str(match self { @@ -163,6 +163,7 @@ impl fmt::Debug for SendTimeoutError { } } +#[cfg(feature = "std")] impl fmt::Display for SendTimeoutError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.write_str(match self { From 538fbe931855a776a24e74b677c9afb3f9a3f832 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Fri, 28 Apr 2023 10:19:45 -0700 Subject: [PATCH 5/5] Update src/mpsc/errors.rs --- src/mpsc/errors.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/mpsc/errors.rs b/src/mpsc/errors.rs index af29e5a..a41df34 100644 --- a/src/mpsc/errors.rs +++ b/src/mpsc/errors.rs @@ -120,7 +120,10 @@ impl SendTimeoutError { Self::Closed(()) => SendTimeoutError::Closed(value), } } +} +#[cfg(feature = "std")] +impl SendTimeoutError { /// Returns `true` if this error was returned because the channel is still /// full after the timeout has elapsed. pub fn is_timeout(&self) -> bool {