Skip to content

Commit

Permalink
feat(mpsc): make errors more like other mpscs (#5)
Browse files Browse the repository at this point in the history
Errors for `TrySend` now return values.

Signed-off-by: Eliza Weisman <[email protected]>
  • Loading branch information
hawkw authored Dec 2, 2021
1 parent 38cbad2 commit 5e749cc
Show file tree
Hide file tree
Showing 9 changed files with 59 additions and 36 deletions.
19 changes: 15 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ pub struct Ref<'slot, T> {
new_state: usize,
}

#[derive(Debug)]
pub struct AtCapacity(pub(crate) usize);
pub struct Full<T = ()>(T);

#[derive(Debug)]
struct Core {
Expand Down Expand Up @@ -110,7 +109,7 @@ impl Core {
self.capacity
}

fn push_ref<'slots, T, S>(&self, slots: &'slots S) -> Result<Ref<'slots, T>, AtCapacity>
fn push_ref<'slots, T, S>(&self, slots: &'slots S) -> Result<Ref<'slots, T>, Full<()>>
where
T: Default,
S: Index<usize, Output = Slot<T>> + ?Sized,
Expand Down Expand Up @@ -164,7 +163,7 @@ impl Core {

if state.wrapping_add(self.gen) == tail + 1 {
if self.head.load(Ordering::SeqCst).wrapping_add(self.gen) == tail {
return Err(AtCapacity(self.capacity()));
return Err(Full(()));
}

backoff.spin();
Expand Down Expand Up @@ -341,3 +340,15 @@ impl<T> Slot<T> {
}

unsafe impl<T: Sync> Sync for Slot<T> {}

impl<T> fmt::Debug for Full<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("Full(..)")
}
}

impl<T> fmt::Display for Full<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("channel full")
}
}
16 changes: 11 additions & 5 deletions src/mpsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,19 @@ feature! {

#[derive(Debug)]
#[non_exhaustive]
pub enum TrySendError {
AtCapacity(crate::AtCapacity),
Closed(Closed),
pub enum TrySendError<T = ()> {
Full(T),
Closed(T),
}

#[derive(Debug)]
pub struct Closed(pub(crate) ());
impl TrySendError {
fn with_value<T>(self, value: T) -> TrySendError<T> {
match self {
Self::Full(()) => TrySendError::Full(value),
Self::Closed(()) => TrySendError::Closed(value),
}
}
}

#[cfg(test)]
mod tests;
19 changes: 11 additions & 8 deletions src/mpsc/async_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,21 +82,24 @@ impl<T: Default> Sender<T> {
inner: &*self.inner,
slot,
})
.map_err(|e| {
.map_err(|_| {
if self.inner.rx_wait.is_rx_closed() {
TrySendError::Closed(Closed(()))
TrySendError::Closed(())
} else {
self.inner.rx_wait.notify();
TrySendError::AtCapacity(e)
TrySendError::Full(())
}
})
}

pub fn try_send(&self, val: T) -> Result<(), TrySendError> {
self.try_send_ref()?.with_mut(|slot| {
*slot = val;
});
Ok(())
pub fn try_send(&self, val: T) -> Result<(), TrySendError<T>> {
match self.try_send_ref() {
Ok(mut slot) => {
slot.with_mut(|slot| *slot = val);
Ok(())
}
Err(e) => Err(e.with_value(val)),
}
}
}

Expand Down
21 changes: 12 additions & 9 deletions src/mpsc/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
//! This provides an equivalent API to the [`mpsc`](crate::mpsc) module, but the
//! [`Receiver`] type in this module waits by blocking the current thread,
//! rather than asynchronously yielding.
use super::{Closed, TrySendError};
use super::TrySendError;
use crate::{
loom::{
self,
Expand Down Expand Up @@ -63,21 +63,24 @@ impl<T: Default> Sender<T> {
inner: &*self.inner,
slot,
})
.map_err(|e| {
.map_err(|_| {
if self.inner.rx_wait.is_rx_closed() {
TrySendError::Closed(Closed(()))
TrySendError::Closed(())
} else {
self.inner.rx_wait.notify();
TrySendError::AtCapacity(e)
TrySendError::Full(())
}
})
}

pub fn try_send(&self, val: T) -> Result<(), TrySendError> {
self.try_send_ref()?.with_mut(|slot| {
*slot = val;
});
Ok(())
pub fn try_send(&self, val: T) -> Result<(), TrySendError<T>> {
match self.try_send_ref() {
Ok(mut slot) => {
slot.with_mut(|slot| *slot = val);
Ok(())
}
Err(e) => Err(e.with_value(val)),
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/mpsc/tests/mpsc_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ fn rx_closes() {
'send: loop {
match tx.try_send(i) {
Ok(_) => break 'send,
Err(TrySendError::AtCapacity(_)) => thread::yield_now(),
Err(TrySendError::Full(_)) => thread::yield_now(),
Err(TrySendError::Closed(_)) => break 'iters,
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/mpsc/tests/mpsc_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ fn rx_closes() {
'send: loop {
match tx.try_send(i) {
Ok(_) => break 'send,
Err(TrySendError::AtCapacity(_)) => thread::yield_now(),
Err(TrySendError::Full(_)) => thread::yield_now(),
Err(TrySendError::Closed(_)) => break 'iters,
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/static_thingbuf.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::loom::atomic::Ordering;
use crate::{AtCapacity, Core, Ref, Slot};
use crate::{Core, Full, Ref, Slot};
use core::{fmt, ptr};

pub struct StaticThingBuf<T, const CAP: usize> {
Expand Down Expand Up @@ -39,12 +39,12 @@ impl<T, const CAP: usize> StaticThingBuf<T, CAP> {
}

impl<T: Default, const CAP: usize> StaticThingBuf<T, CAP> {
pub fn push_ref(&self) -> Result<Ref<'_, T>, AtCapacity> {
pub fn push_ref(&self) -> Result<Ref<'_, T>, Full> {
self.core.push_ref(&self.slots)
}

#[inline]
pub fn push_with<U>(&self, f: impl FnOnce(&mut T) -> U) -> Result<U, AtCapacity> {
pub fn push_with<U>(&self, f: impl FnOnce(&mut T) -> U) -> Result<U, Full> {
self.push_ref().map(|mut r| r.with_mut(f))
}

Expand Down
4 changes: 2 additions & 2 deletions src/stringbuf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ impl StringBuf {
}

#[inline]
pub fn write(&self) -> Result<Ref<'_, String>, AtCapacity> {
pub fn write(&self) -> Result<Ref<'_, String>, Full> {
let mut string = self.inner.push_ref()?;
string.with_mut(String::clear);
Ok(string)
Expand Down Expand Up @@ -74,7 +74,7 @@ impl<const CAP: usize> StaticStringBuf<CAP> {
}

#[inline]
pub fn write(&self) -> Result<Ref<'_, String>, AtCapacity> {
pub fn write(&self) -> Result<Ref<'_, String>, Full> {
let mut string = self.inner.push_ref()?;
string.with_mut(String::clear);
Ok(string)
Expand Down
6 changes: 3 additions & 3 deletions src/thingbuf.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::loom::atomic::Ordering;
use crate::{AtCapacity, Core, Ref, Slot};
use crate::{Core, Full, Ref, Slot};
use alloc::boxed::Box;
use core::{fmt, ptr};

Expand All @@ -23,12 +23,12 @@ impl<T: Default> ThingBuf<T> {
}
}

pub fn push_ref(&self) -> Result<Ref<'_, T>, AtCapacity> {
pub fn push_ref(&self) -> Result<Ref<'_, T>, Full> {
self.core.push_ref(&*self.slots)
}

#[inline]
pub fn push_with<U>(&self, f: impl FnOnce(&mut T) -> U) -> Result<U, AtCapacity> {
pub fn push_with<U>(&self, f: impl FnOnce(&mut T) -> U) -> Result<U, Full> {
self.push_ref().map(|mut r| r.with_mut(f))
}

Expand Down

0 comments on commit 5e749cc

Please sign in to comment.