Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add nicer fmt::Debug impls #4

Merged
merged 2 commits into from
Dec 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/mpsc/async_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@ pub fn channel<T>(thingbuf: ThingBuf<T>) -> (Sender<T>, Receiver<T>) {
(tx, rx)
}

#[derive(Debug)]
pub struct Sender<T> {
inner: Arc<Inner<T>>,
}

#[derive(Debug)]
pub struct Receiver<T> {
inner: Arc<Inner<T>>,
}
Expand Down Expand Up @@ -62,6 +64,7 @@ pub struct RecvFuture<'a, T> {
rx: &'a Receiver<T>,
}

#[derive(Debug)]
struct Inner<T> {
thingbuf: ThingBuf<T>,
rx_wait: WaitCell<Waker>,
Expand Down
3 changes: 3 additions & 0 deletions src/mpsc/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@ pub fn channel<T>(thingbuf: ThingBuf<T>) -> (Sender<T>, Receiver<T>) {
(tx, rx)
}

#[derive(Debug)]
pub struct Sender<T> {
inner: Arc<Inner<T>>,
}

#[derive(Debug)]
pub struct Receiver<T> {
inner: Arc<Inner<T>>,
}
Expand All @@ -43,6 +45,7 @@ pub struct SendRef<'a, T> {
slot: Ref<'a, T>,
}

#[derive(Debug)]
struct Inner<T> {
thingbuf: ThingBuf<T>,
rx_wait: WaitCell<Thread>,
Expand Down
14 changes: 14 additions & 0 deletions src/mpsc/tests/mpsc_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,3 +121,17 @@ fn spsc_recv_then_send_then_close() {
consumer.join().unwrap();
})
}

#[test]
fn tx_close_wakes() {
loom::model(|| {
let (tx, rx) = channel(ThingBuf::<i32>::new(2));
let consumer = thread::spawn(move || {
future::block_on(async move {
assert_eq!(rx.recv().await, None);
})
});
drop(tx);
consumer.join().unwrap();
});
}
12 changes: 12 additions & 0 deletions src/mpsc/tests/mpsc_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,15 @@ fn spsc_recv_then_send_then_close() {
consumer.join().unwrap();
})
}

#[test]
fn tx_close_wakes() {
loom::model(|| {
let (tx, rx) = sync::channel(ThingBuf::<i32>::new(2));
let consumer = thread::spawn(move || {
assert_eq!(rx.recv(), None);
});
drop(tx);
consumer.join().unwrap();
});
}
5 changes: 3 additions & 2 deletions src/static_thingbuf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,10 @@ impl<T, const CAP: usize> Drop for StaticThingBuf<T, CAP> {

impl<T, const CAP: usize> fmt::Debug for StaticThingBuf<T, CAP> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ThingBuf")
.field("capacity", &self.capacity())
f.debug_struct("StaticThingBuf")
.field("len", &self.len())
.field("slots", &format_args!("[...]"))
.field("core", &self.core)
.finish()
}
}
3 changes: 2 additions & 1 deletion src/thingbuf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,9 @@ impl<T> Drop for ThingBuf<T> {
impl<T> fmt::Debug for ThingBuf<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ThingBuf")
.field("capacity", &self.capacity())
.field("len", &self.len())
.field("slots", &format_args!("[...]"))
.field("core", &self.core)
.finish()
}
}
13 changes: 11 additions & 2 deletions src/util.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use crate::loom;
use core::ops::{Deref, DerefMut};
use core::{
fmt,
ops::{Deref, DerefMut},
};

pub(crate) mod panic;
pub(crate) mod wait;
Expand All @@ -12,7 +15,7 @@ pub(crate) struct Backoff(u8);
not(any(target_arch = "x86_64", target_arch = "aarch64")),
repr(align(64))
)]
#[derive(Clone, Copy, Default, Hash, PartialEq, Eq, Debug)]
#[derive(Clone, Copy, Default, Hash, PartialEq, Eq)]
pub(crate) struct CachePadded<T>(pub(crate) T);

// === impl Backoff ===
Expand Down Expand Up @@ -68,3 +71,9 @@ impl<T> DerefMut for CachePadded<T> {
&mut self.0
}
}

impl<T: fmt::Debug> fmt::Debug for CachePadded<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(f)
}
}
160 changes: 124 additions & 36 deletions src/util/wait.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
use crate::{
loom::{
atomic::{AtomicUsize, Ordering::*},
atomic::{
AtomicUsize,
Ordering::{self, *},
},
UnsafeCell,
},
util::panic::{self, RefUnwindSafe, UnwindSafe},
};
use core::{fmt, task::Waker};
use core::{fmt, ops, task::Waker};

#[cfg(feature = "std")]
use crate::loom::thread;
Expand Down Expand Up @@ -43,52 +46,46 @@ pub(crate) trait Notify {
fn notify(self);
}

#[derive(Eq, PartialEq, Copy, Clone)]
struct State(usize);

// === impl WaitCell ===

impl<T: Notify + UnwindSafe + fmt::Debug> WaitCell<T> {
const WAITING: usize = 0b00;
const PARKING: usize = 0b01;
const NOTIFYING: usize = 0b10;
const TX_CLOSED: usize = 0b100;
const RX_CLOSED: usize = 0b1000;

pub(crate) fn new() -> Self {
Self {
lock: AtomicUsize::new(Self::WAITING),
lock: AtomicUsize::new(State::WAITING.0),
waiter: UnsafeCell::new(None),
}
}

pub(crate) fn close_rx(&self) {
self.lock.fetch_or(Self::RX_CLOSED, AcqRel);
test_dbg!(self.fetch_or(State::RX_CLOSED, AcqRel));
}

pub(crate) fn is_rx_closed(&self) -> bool {
test_dbg!(self.lock.load(Acquire) & Self::RX_CLOSED == Self::RX_CLOSED)
test_dbg!(self.current_state().contains(State::RX_CLOSED))
}

pub(crate) fn wait_with(&self, f: impl FnOnce() -> T) -> WaitResult {
test_println!("registering waiter");

// this is based on tokio's AtomicWaker synchronization strategy
match test_dbg!(self
.lock
.compare_exchange(Self::WAITING, Self::PARKING, AcqRel, Acquire,))
{
match test_dbg!(self.compare_exchange(State::WAITING, State::PARKING)) {
// someone else is notifying the receiver, so don't park!
Err(actual) if test_dbg!(actual & Self::TX_CLOSED) == Self::TX_CLOSED => {
test_println!("-> state = TX_CLOSED");
Err(actual) if test_dbg!(actual.contains(State::TX_CLOSED)) => {
return WaitResult::TxClosed;
}
Err(actual) if test_dbg!(actual & Self::NOTIFYING) == Self::NOTIFYING => {
test_println!("-> state = NOTIFYING");
Err(actual) if test_dbg!(actual.contains(State::NOTIFYING)) => {
// f().notify();
// loom::hint::spin_loop();
return WaitResult::Notified;
}

Err(actual) => {
debug_assert!(actual == Self::PARKING || actual == Self::PARKING | Self::NOTIFYING);
debug_assert!(
actual == State::PARKING || actual == State::PARKING | State::NOTIFYING
);
return WaitResult::Wait;
}
Ok(_) => {}
Expand All @@ -106,29 +103,22 @@ impl<T: Notify + UnwindSafe + fmt::Debug> WaitCell<T> {
Err(panic) => (Some(panic), None),
};

let result = match test_dbg!(self.lock.compare_exchange(
Self::PARKING,
Self::WAITING,
AcqRel,
Acquire
)) {
let result = match test_dbg!(self.compare_exchange(State::PARKING, State::WAITING)) {
Ok(_) => {
let _ = panic::catch_unwind(move || drop(prev_waiter));

WaitResult::Wait
}
Err(actual) => {
test_println!("-> was notified; state={:#b}", actual);
test_println!("-> was notified; state={:?}", actual);
let waiter = self.waiter.with_mut(|waiter| unsafe { (*waiter).take() });
// Reset to the WAITING state by clearing everything *except*
// the closed bits (which must remain set).
let state = test_dbg!(self
.lock
.fetch_and(Self::TX_CLOSED | Self::RX_CLOSED, AcqRel));
let state = test_dbg!(self.fetch_and(State::TX_CLOSED | State::RX_CLOSED, AcqRel));
// The only valid state transition while we were parking is to
// add the TX_CLOSED bit.
debug_assert!(
state == actual || state == actual | Self::TX_CLOSED,
state == actual || state == actual | State::TX_CLOSED,
"state changed unexpectedly while parking!"
);

Expand All @@ -143,7 +133,7 @@ impl<T: Notify + UnwindSafe + fmt::Debug> WaitCell<T> {
waiter.notify();
}

if state & Self::TX_CLOSED == Self::TX_CLOSED {
if test_dbg!(state.contains(State::TX_CLOSED)) {
WaitResult::TxClosed
} else {
WaitResult::Notified
Expand All @@ -169,15 +159,16 @@ impl<T: Notify + UnwindSafe + fmt::Debug> WaitCell<T> {
fn notify2(&self, close: bool) {
test_println!("notifying; close={:?};", close);
let bits = if close {
Self::NOTIFYING | Self::TX_CLOSED
State::NOTIFYING | State::TX_CLOSED
} else {
Self::NOTIFYING
State::NOTIFYING
};
if test_dbg!(self.lock.fetch_or(bits, AcqRel)) == Self::WAITING {
test_dbg!(bits);
if test_dbg!(self.fetch_or(bits, AcqRel)) == State::WAITING {
// we have the lock!
let waiter = self.waiter.with_mut(|thread| unsafe { (*thread).take() });

self.lock.fetch_and(!Self::NOTIFYING, Release);
test_dbg!(self.fetch_and(!State::NOTIFYING, AcqRel));

if let Some(waiter) = test_dbg!(waiter) {
waiter.notify();
Expand All @@ -186,6 +177,31 @@ impl<T: Notify + UnwindSafe + fmt::Debug> WaitCell<T> {
}
}

impl<T> WaitCell<T> {
#[inline(always)]
fn compare_exchange(&self, State(curr): State, State(new): State) -> Result<State, State> {
self.lock
.compare_exchange(curr, new, AcqRel, Acquire)
.map(State)
.map_err(State)
}

#[inline(always)]
fn fetch_and(&self, State(state): State, order: Ordering) -> State {
State(self.lock.fetch_and(state, order))
}

#[inline(always)]
fn fetch_or(&self, State(state): State, order: Ordering) -> State {
State(self.lock.fetch_or(state, order))
}

#[inline(always)]
fn current_state(&self) -> State {
State(self.lock.load(Acquire))
}
}

#[cfg(feature = "std")]
impl Notify for thread::Thread {
fn notify(self) {
Expand All @@ -204,6 +220,78 @@ impl Notify for Waker {
impl<T: UnwindSafe> UnwindSafe for WaitCell<T> {}
impl<T: RefUnwindSafe> RefUnwindSafe for WaitCell<T> {}

impl<T> fmt::Debug for WaitCell<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("WaitCell")
.field("state", &self.current_state())
.finish()
}
}

// === impl State ===

impl State {
const WAITING: Self = Self(0b00);
const PARKING: Self = Self(0b01);
const NOTIFYING: Self = Self(0b10);
const TX_CLOSED: Self = Self(0b100);
const RX_CLOSED: Self = Self(0b1000);

fn contains(self, Self(state): Self) -> bool {
self.0 & state == state
}
}

impl ops::BitOr for State {
type Output = Self;

fn bitor(self, Self(rhs): Self) -> Self::Output {
Self(self.0 | rhs)
}
}

impl ops::Not for State {
type Output = Self;

fn not(self) -> Self::Output {
Self(!self.0)
}
}

impl fmt::Debug for State {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut has_states = false;
macro_rules! f_bits {
($self: expr, $f: expr, $has_states: ident, $($name: ident),+) => {
$(
if $self.contains(Self::$name) {
if $has_states {
$f.write_str(" | ")?;
}
$f.write_str(stringify!($name))?;
$has_states = true;
}
)+

};
}

f_bits!(self, f, has_states, PARKING, NOTIFYING, TX_CLOSED, RX_CLOSED);

if !has_states {
if *self == Self::WAITING {
return f.write_str("WAITING");
}

f.debug_tuple("UnknownState")
.field(&format_args!("{:#b}", self.0))
.finish()?;
}

Ok(())
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
12 changes: 12 additions & 0 deletions tests/debug.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
use thingbuf::{mpsc, ThingBuf};

#[test]
fn mpsc_debug() {
let (tx, rx) = mpsc::channel(ThingBuf::new(4));
println!("tx={:#?}", tx);
println!("rx={:#?}", rx);
let _ = tx.try_send(1);
println!("rx={:#?}", rx);
drop(tx);
println!("rx={:#?}", rx);
}