Skip to content

Commit

Permalink
feat(mpsc): stick errors in their own module
Browse files Browse the repository at this point in the history
Signed-off-by: Eliza Weisman <[email protected]>
  • Loading branch information
hawkw committed Mar 15, 2022
1 parent d5ac083 commit 3137b85
Show file tree
Hide file tree
Showing 9 changed files with 141 additions and 114 deletions.
4 changes: 2 additions & 2 deletions bench/benches/async_spsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ aaaaaaaaaaaaaa";
group.bench_with_input(BenchmarkId::new("ThingBuf", size), &size, |b, &i| {
let rt = runtime::Builder::new_current_thread().build().unwrap();
b.to_async(rt).iter(|| async {
use thingbuf::mpsc::{self, TrySendError};
use thingbuf::mpsc::{self, errors::TrySendError};

let (tx, rx) = mpsc::channel::<String>(100);
task::spawn(async move {
Expand Down Expand Up @@ -265,7 +265,7 @@ fn bench_spsc_try_send_integer(c: &mut Criterion) {
group.bench_with_input(BenchmarkId::new("ThingBuf", size), &size, |b, &i| {
let rt = runtime::Builder::new_current_thread().build().unwrap();
b.to_async(rt).iter(|| async {
use thingbuf::mpsc::{self, TrySendError};
use thingbuf::mpsc::{self, errors::TrySendError};
let (tx, rx) = mpsc::channel(100);
task::spawn(async move {
let mut i = 0;
Expand Down
2 changes: 1 addition & 1 deletion bench/benches/sync_spsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ aaaaaaaaaaaaaa";
group.throughput(Throughput::Elements(size));
group.bench_with_input(BenchmarkId::new("ThingBuf", size), &size, |b, &i| {
b.iter(|| {
use thingbuf::mpsc::{blocking, TrySendError};
use thingbuf::mpsc::{blocking, errors::TrySendError};
let (tx, rx) = blocking::channel::<String>(100);
let producer = thread::spawn(move || loop {
match tx.try_send_ref() {
Expand Down
15 changes: 8 additions & 7 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use crate::{
atomic::{AtomicUsize, Ordering::*},
cell::{MutPtr, UnsafeCell},
},
mpsc::errors::TrySendError,
util::{Backoff, CachePadded},
};

Expand Down Expand Up @@ -172,7 +173,7 @@ impl Core {
&self,
slots: &'slots S,
recycle: &R,
) -> Result<Ref<'slots, T>, mpsc::TrySendError<()>>
) -> Result<Ref<'slots, T>, TrySendError<()>>
where
R: Recycle<T>,
S: ops::Index<usize, Output = Slot<T>> + ?Sized,
Expand All @@ -183,7 +184,7 @@ impl Core {

loop {
if test_dbg!(tail & self.closed != 0) {
return Err(mpsc::TrySendError::Closed(()));
return Err(TrySendError::Closed(()));
}
let (idx, gen) = self.idx_gen(tail);
test_dbg!(idx);
Expand Down Expand Up @@ -247,7 +248,7 @@ impl Core {
let head = test_dbg!(self.head.fetch_or(0, SeqCst));
if test_dbg!(head.wrapping_add(self.gen) == tail) {
test_println!("channel full");
return Err(mpsc::TrySendError::Full(()));
return Err(TrySendError::Full(()));
}

backoff.spin();
Expand All @@ -260,7 +261,7 @@ impl Core {
}

#[inline(always)]
fn pop_ref<'slots, T, S>(&self, slots: &'slots S) -> Result<Ref<'slots, T>, mpsc::TrySendError>
fn pop_ref<'slots, T, S>(&self, slots: &'slots S) -> Result<Ref<'slots, T>, TrySendError>
where
S: ops::Index<usize, Output = Slot<T>> + ?Sized,
{
Expand Down Expand Up @@ -313,15 +314,15 @@ impl Core {

if test_dbg!(tail & !self.closed == head) {
return if test_dbg!(tail & self.closed != 0) {
Err(mpsc::TrySendError::Closed(()))
Err(TrySendError::Closed(()))
} else {
test_println!("--> channel full!");
Err(mpsc::TrySendError::Full(()))
Err(TrySendError::Full(()))
};
}

if test_dbg!(backoff.done_spinning()) {
return Err(mpsc::TrySendError::Full(()));
return Err(TrySendError::Full(()));
}

backoff.spin();
Expand Down
104 changes: 2 additions & 102 deletions src/mpsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,8 @@ use crate::{
};
use core::{fmt, ops::Index, task::Poll};

/// Error returned by the [`Sender::try_send`] (and [`StaticSender::try_send`])
/// methods.
#[non_exhaustive]
pub enum TrySendError<T = ()> {
/// The data could not be sent on the channel because the channel is
/// currently full and sending would require waiting for capacity.
Full(T),
/// The data could not be sent because the [`Receiver`] half of the channel
/// has been dropped.
Closed(T),
}

/// Error returned by [`Sender::send`] and [`Sender::send_ref`], if the
/// [`Receiver`] half of the channel has been dropped.
pub struct Closed<T = ()>(T);
pub mod errors;
use self::errors::TrySendError;

#[derive(Debug)]
struct ChannelCore<N> {
Expand Down Expand Up @@ -65,93 +52,6 @@ struct SendRefInner<'a, T, N: Notify> {
struct NotifyRx<'a, N: Notify>(&'a WaitCell<N>);
struct NotifyTx<'a, N: Notify + Unpin>(&'a WaitQueue<N>);

// === impl Closed ===

impl<T> Closed<T> {
/// Unwraps the inner `T` value held by this error.
///
/// This method allows recovering the original message when sending to a
/// channel has failed.
pub fn into_inner(self) -> T {
self.0
}
}

impl<T> fmt::Debug for Closed<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("Closed(..)")
}
}

impl<T> fmt::Display for Closed<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("channel closed")
}
}

#[cfg(feature = "std")]
impl<T> std::error::Error for Closed<T> {}

// === impl TrySendError ===

impl TrySendError {
fn with_value<T>(self, value: T) -> TrySendError<T> {
match self {
Self::Full(()) => TrySendError::Full(value),
Self::Closed(()) => TrySendError::Closed(value),
}
}
}

impl<T> TrySendError<T> {
/// Returns `true` if this error was returned because the channel was at
/// capacity.
pub fn is_full(&self) -> bool {
matches!(self, Self::Full(_))
}

/// Returns `true` if this error was returned because the channel has closed
/// (e.g. the `Receiver` end has been dropped).
///
/// If this returns `true`, no future `try_send` or `send` operation on this
/// channel will succeed.
pub fn is_closed(&self) -> bool {
matches!(self, Self::Full(_))
}

/// Unwraps the inner `T` value held by this error.
///
/// This method allows recovering the original message when sending to a
/// channel has failed.
pub fn into_inner(self) -> T {
match self {
Self::Full(val) => val,
Self::Closed(val) => val,
}
}
}

impl<T> fmt::Debug for TrySendError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(match self {
Self::Full(_) => "TrySendError::Full(..)",
Self::Closed(_) => "TrySendError::Closed(..)",
})
}
}

impl<T> fmt::Display for TrySendError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(match self {
Self::Full(_) => "no available capacity",
Self::Closed(_) => "channel closed",
})
}
}

#[cfg(feature = "std")]
impl<T> std::error::Error for TrySendError<T> {}

// ==== impl Inner ====

impl<N> ChannelCore<N> {
Expand Down
1 change: 1 addition & 0 deletions src/mpsc/async_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use core::{
pin::Pin,
task::{Context, Poll, Waker},
};
use errors::*;

feature! {
#![feature = "alloc"]
Expand Down
1 change: 1 addition & 0 deletions src/mpsc/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::{
Ref,
};
use core::{fmt, pin::Pin};
use errors::*;

/// Returns a new synchronous multi-producer, single consumer (MPSC)
/// channel with the provided capacity.
Expand Down
124 changes: 124 additions & 0 deletions src/mpsc/errors.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
//! Errors returned by channels.
use core::fmt;

/// Error returned by the [`Sender::try_send`] or [`Sender::try_send_ref`] (and
/// [`StaticSender::try_send`]/[`StaticSender::try_send_ref`]) methods.
///
/// [`Sender::try_send`]: super::Sender::try_send
/// [`Sender::try_send_ref`]: super::Sender::try_send_ref
/// [`StaticSender::try_send`]: super::StaticSender::try_send
/// [`StaticSender::try_send_ref`]: super::StaticSender::try_send_ref
#[non_exhaustive]
pub enum TrySendError<T = ()> {
/// The data could not be sent on the channel because the channel is
/// currently full and sending would require waiting for capacity.
Full(T),
/// The data could not be sent because the [`Receiver`] half of the channel
/// has been dropped.
///
/// [`Receiver`]: super::Receiver
Closed(T),
}

/// Error returned by [`Sender::send`] or [`Sender::send_ref`] (and
/// [`StaticSender::send`]/[`StaticSender::send_ref`]), if the
/// [`Receiver`] half of the channel has been dropped.
///
/// [`Sender::send`]: super::Sender::send
/// [`Sender::send_ref`]: super::Sender::send_ref
/// [`StaticSender::send`]: super::StaticSender::send
/// [`StaticSender::send_ref`]: super::StaticSender::send_ref
/// [`Receiver`]: super::Receiver
pub struct Closed<T = ()>(pub(crate) T);

// === impl Closed ===

impl<T> Closed<T> {
/// Unwraps the inner `T` value held by this error.
///
/// This method allows recovering the original message when sending to a
/// channel has failed.
pub fn into_inner(self) -> T {
self.0
}
}

impl<T> fmt::Debug for Closed<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("Closed(..)")
}
}

impl<T> fmt::Display for Closed<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("channel closed")
}
}

#[cfg(feature = "std")]
impl<T> std::error::Error for Closed<T> {}

// === impl TrySendError ===

impl TrySendError {
pub(crate) fn with_value<T>(self, value: T) -> TrySendError<T> {
match self {
Self::Full(()) => TrySendError::Full(value),
Self::Closed(()) => TrySendError::Closed(value),
}
}
}

impl<T> TrySendError<T> {
/// Returns `true` if this error was returned because the channel was at
/// capacity.
pub fn is_full(&self) -> bool {
matches!(self, Self::Full(_))
}

/// Returns `true` if this error was returned because the channel has closed
/// (e.g. the [`Receiver`] end has been dropped).
///
/// If this returns `true`, no future [`try_send`] or [`send`] operation on
/// this channel will succeed.
///
/// [`Receiver`]: super::Receiver
/// [`try_send`]: super::Sender::try_send
/// [`send`]: super::Sender::send
/// [`Receiver`]: super::Receiver
pub fn is_closed(&self) -> bool {
matches!(self, Self::Full(_))
}

/// Unwraps the inner `T` value held by this error.
///
/// This method allows recovering the original message when sending to a
/// channel has failed.
pub fn into_inner(self) -> T {
match self {
Self::Full(val) => val,
Self::Closed(val) => val,
}
}
}

impl<T> fmt::Debug for TrySendError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(match self {
Self::Full(_) => "TrySendError::Full(..)",
Self::Closed(_) => "TrySendError::Closed(..)",
})
}
}

impl<T> fmt::Display for TrySendError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(match self {
Self::Full(_) => "no available capacity",
Self::Closed(_) => "channel closed",
})
}
}

#[cfg(feature = "std")]
impl<T> std::error::Error for TrySendError<T> {}
2 changes: 1 addition & 1 deletion src/static_thingbuf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ where
self.core
.push_ref(&self.slots, &self.recycle)
.map_err(|e| match e {
crate::mpsc::TrySendError::Full(()) => Full(()),
crate::mpsc::errors::TrySendError::Full(()) => Full(()),
_ => unreachable!(),
})
}
Expand Down
2 changes: 1 addition & 1 deletion src/thingbuf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ where
self.core
.push_ref(&*self.slots, &self.recycle)
.map_err(|e| match e {
crate::mpsc::TrySendError::Full(()) => Full(()),
crate::mpsc::errors::TrySendError::Full(()) => Full(()),
_ => unreachable!(),
})
}
Expand Down

0 comments on commit 3137b85

Please sign in to comment.