Skip to content

Commit

Permalink
feat(mpsc): add waiting send/send_ref (#7)
Browse files Browse the repository at this point in the history
This branch adds implementations of `Sender::send` and
`Sender::send_ref` methods for the MPSC channels. This allows a sender
to wait for capacity when the channel is full, rather than returning an
error.

This took a lot of work to get working, and there were some potential
deadlocks that had to be fixed. However, it's passing all the loom tests
now (finally)!

For the blocking version, some `loom` tests currently fail due to an
upstream issue (tokio-rs/loom#246). However, the async versions of the
same tests (which are not effected by the loom bug) do pass, so we can
assume that it's probably correct.

Currently, the senders queue themselves to wait using a spinlock around
a `VecDeque`, which I don't love at all. But, this works for now, and we
can consider switching to an intrusive list strategy later.

Signed-off-by: Eliza Weisman <[email protected]>
  • Loading branch information
hawkw authored Dec 6, 2021
1 parent 13c0f71 commit 76df064
Show file tree
Hide file tree
Showing 18 changed files with 1,472 additions and 767 deletions.
5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ futures = "0.3"
criterion = { version = "0.3.5", features = ["async_tokio"] }
# for comparison benchmarks
crossbeam = "0.8.1"
tokio = { version = "1.14.0", features = ["rt", "sync"] }
tokio = { version = "1.14.0", features = ["rt", "rt-multi-thread", "sync", "macros"] }
async-std = "1"

[profile.test]
Expand All @@ -33,3 +33,6 @@ harness = false
[[bench]]
name = "async_mpsc"
harness = false

[patch.crates-io]
loom = { git = "https://github.com/tokio-rs/loom", branch = "eliza/fix-double-panic-in-drop" }
153 changes: 104 additions & 49 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#![cfg_attr(not(feature = "std"), no_std)]
#![cfg_attr(docsrs, feature(doc_cfg))]
use core::{fmt, mem::MaybeUninit, ops::Index};
use core::{cmp, fmt, mem::MaybeUninit, ops::Index};

#[macro_use]
mod macros;
Expand All @@ -26,7 +26,7 @@ pub use self::static_thingbuf::StaticThingBuf;

use crate::{
loom::{
atomic::{AtomicUsize, Ordering},
atomic::{AtomicUsize, Ordering::*},
UnsafeCell,
},
util::{Backoff, CachePadded},
Expand All @@ -46,6 +46,7 @@ struct Core {
gen: usize,
gen_mask: usize,
idx_mask: usize,
closed: usize,
capacity: usize,
}

Expand All @@ -57,28 +58,32 @@ struct Slot<T> {
impl Core {
#[cfg(not(test))]
const fn new(capacity: usize) -> Self {
let gen = (capacity + 1).next_power_of_two();
let idx_mask = gen - 1;
let gen_mask = !(gen - 1);
let closed = (capacity + 1).next_power_of_two();
let idx_mask = closed - 1;
let gen = closed << 1;
let gen_mask = !(closed | idx_mask);
Self {
head: CachePadded(AtomicUsize::new(0)),
tail: CachePadded(AtomicUsize::new(0)),
gen,
gen_mask,
closed,
idx_mask,
capacity,
}
}

#[cfg(test)]
fn new(capacity: usize) -> Self {
let gen = (capacity + 1).next_power_of_two();
let idx_mask = gen - 1;
let gen_mask = !(gen - 1);
let closed = (capacity + 1).next_power_of_two();
let idx_mask = closed - 1;
let gen = closed << 1;
let gen_mask = !(closed | idx_mask);
Self {
head: CachePadded(AtomicUsize::new(0)),
tail: CachePadded(AtomicUsize::new(0)),
gen,
closed,
gen_mask,
idx_mask,
capacity,
Expand Down Expand Up @@ -109,34 +114,50 @@ impl Core {
self.capacity
}

fn push_ref<'slots, T, S>(&self, slots: &'slots S) -> Result<Ref<'slots, T>, Full<()>>
fn close(&self) -> bool {
if std::thread::panicking() {
return false;
}
test_dbg!(self.tail.fetch_or(self.closed, SeqCst) & self.closed == 0)
}

fn push_ref<'slots, T, S>(
&self,
slots: &'slots S,
) -> Result<Ref<'slots, T>, mpsc::TrySendError<()>>
where
T: Default,
S: Index<usize, Output = Slot<T>> + ?Sized,
{
test_println!("push_ref");
let mut backoff = Backoff::new();
let mut tail = self.tail.load(Ordering::Relaxed);
let mut tail = self.tail.load(Relaxed);

loop {
if test_dbg!(tail & self.closed != 0) {
return Err(mpsc::TrySendError::Closed(()));
}
let (idx, gen) = self.idx_gen(tail);
test_dbg!(idx);
test_dbg!(gen);
let slot = &slots[idx];
let state = slot.state.load(Ordering::Acquire);
let actual_state = test_dbg!(slot.state.load(Acquire));
let state = if actual_state == EMPTY_STATE {
idx
} else {
actual_state
};

if state == tail || (state == 0 && gen == 0) {
if test_dbg!(state == tail) || test_dbg!(actual_state == EMPTY_STATE && gen == 0) {
// Move the tail index forward by 1.
let next_tail = self.next(idx, gen);
match self.tail.compare_exchange_weak(
tail,
next_tail,
Ordering::AcqRel,
Ordering::Relaxed,
) {
match test_dbg!(self
.tail
.compare_exchange_weak(tail, next_tail, SeqCst, Acquire))
{
Ok(_) => {
// We got the slot! It's now okay to write to it
test_println!("claimed tail slot");
test_println!("claimed tail slot [{}]", idx);
if gen == 0 {
slot.value.with_mut(|value| unsafe {
// Safety: we have just claimed exclusive ownership over
Expand All @@ -161,50 +182,57 @@ impl Core {
}
}

if state.wrapping_add(self.gen) == tail + 1 {
if self.head.load(Ordering::SeqCst).wrapping_add(self.gen) == tail {
return Err(Full(()));
if test_dbg!(state.wrapping_add(self.gen) == tail + 1) {
// fake RMW op to placate loom. this should be equivalent to
// doing a relaxed load after a SeqCst fence (per Godbolt
// https://godbolt.org/z/zb15qfEa9), however, loom understands
// this correctly, while it does not understand an explicit
// SeqCst fence and a load.
// XXX(eliza): this makes me DEEPLY UNCOMFORTABLE but if it's a
// load it gets reordered differently in the model checker lmao...
let head = test_dbg!(self.head.fetch_or(0, SeqCst));
if test_dbg!(head.wrapping_add(self.gen) == tail) {
test_println!("channel full");
return Err(mpsc::TrySendError::Full(()));
}

backoff.spin();
} else {
backoff.spin_yield();
}

tail = self.tail.load(Ordering::Relaxed)
tail = test_dbg!(self.tail.load(Acquire));
}
}

fn pop_ref<'slots, T, S>(&self, slots: &'slots S) -> Option<Ref<'slots, T>>
fn pop_ref<'slots, T, S>(&self, slots: &'slots S) -> Result<Ref<'slots, T>, mpsc::TrySendError>
where
S: Index<usize, Output = Slot<T>> + ?Sized,
{
test_println!("pop_ref");
let mut backoff = Backoff::new();
let mut head = self.head.load(Ordering::Relaxed);
let mut head = self.head.load(Relaxed);

loop {
test_dbg!(head);
let (idx, gen) = self.idx_gen(head);
test_dbg!(idx);
test_dbg!(gen);
let slot = &slots[idx];
let state = slot.state.load(Ordering::Acquire);
test_dbg!(state);
let state = test_dbg!(slot.state.load(Acquire));
let state = if state == EMPTY_STATE { idx } else { state };

// If the slot's state is ahead of the head index by one, we can pop
// it.
if test_dbg!(state == head + 1) {
let next_head = self.next(idx, gen);
match self.head.compare_exchange(
head,
next_head,
Ordering::SeqCst,
Ordering::Relaxed,
) {
match test_dbg!(self
.head
.compare_exchange_weak(head, next_head, SeqCst, Acquire))
{
Ok(_) => {
test_println!("claimed head slot");
return Some(Ref {
test_println!("claimed head slot [{}]", idx);
return Ok(Ref {
new_state: head.wrapping_add(self.gen),
slot,
});
Expand All @@ -218,28 +246,43 @@ impl Core {
}

if test_dbg!(state == head) {
let tail = self.tail.load(Ordering::SeqCst);
// fake RMW op to placate loom. this should be equivalent to
// doing a relaxed load after a SeqCst fence (per Godbolt
// https://godbolt.org/z/zb15qfEa9), however, loom understands
// this correctly, while it does not understand an explicit
// SeqCst fence and a load.
// XXX(eliza): this makes me DEEPLY UNCOMFORTABLE but if it's a
// load it gets reordered differently in the model checker lmao...

let tail = test_dbg!(self.tail.fetch_or(0, SeqCst));

if test_dbg!(tail & !self.closed == head) {
return if test_dbg!(tail & self.closed != 0) {
Err(mpsc::TrySendError::Closed(()))
} else {
Err(mpsc::TrySendError::Full(()))
};
}

if test_dbg!(tail == head) {
return None;
if test_dbg!(backoff.done_spinning()) {
return Err(mpsc::TrySendError::Full(()));
}

backoff.spin();
} else {
backoff.spin_yield();
}

head = self.head.load(Ordering::Relaxed);
head = test_dbg!(self.head.load(Acquire));
}
}

fn len(&self) -> usize {
use std::cmp;
loop {
let tail = self.tail.load(Ordering::SeqCst);
let head = self.head.load(Ordering::SeqCst);
let tail = self.tail.load(SeqCst);
let head = self.head.load(SeqCst);

if self.tail.load(Ordering::SeqCst) == tail {
if self.tail.load(SeqCst) == tail {
let (head_idx, _) = self.idx_gen(head);
let (tail_idx, _) = self.idx_gen(tail);
return match head_idx.cmp(&tail_idx) {
Expand All @@ -256,6 +299,8 @@ impl Core {
// === impl Ref ===

impl<T> Ref<'_, T> {
const RELEASED: usize = usize::MAX;

#[inline]
pub fn with<U>(&self, f: impl FnOnce(&T) -> U) -> U {
self.slot.value.with(|value| unsafe {
Expand All @@ -278,15 +323,23 @@ impl<T> Ref<'_, T> {
f(&mut *(&mut *value).as_mut_ptr())
})
}

pub(crate) fn release(&mut self) {
if self.new_state == Self::RELEASED {
test_println!("release_ref; already released");
return;
}

test_println!("release_ref");
test_dbg!(self.slot.state.store(test_dbg!(self.new_state), Release));
self.new_state = Self::RELEASED;
}
}

impl<T> Drop for Ref<'_, T> {
#[inline]
fn drop(&mut self) {
test_println!("drop_ref");
self.slot
.state
.store(test_dbg!(self.new_state), Ordering::Release);
self.release();
}
}

Expand Down Expand Up @@ -321,20 +374,22 @@ impl<T: fmt::Write> fmt::Write for Ref<'_, T> {

// === impl Slot ===

const EMPTY_STATE: usize = usize::MAX;

impl<T> Slot<T> {
#[cfg(not(test))]
const fn empty() -> Self {
Self {
value: UnsafeCell::new(MaybeUninit::uninit()),
state: AtomicUsize::new(0),
state: AtomicUsize::new(EMPTY_STATE),
}
}

#[cfg(test)]
fn empty() -> Self {
Self {
value: UnsafeCell::new(MaybeUninit::uninit()),
state: AtomicUsize::new(0),
state: AtomicUsize::new(EMPTY_STATE),
}
}
}
Expand Down
22 changes: 15 additions & 7 deletions src/loom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub(crate) use self::inner::*;

#[cfg(test)]
mod inner {

pub(crate) mod atomic {
pub use loom::sync::atomic::*;
pub use std::sync::atomic::Ordering;
Expand All @@ -10,6 +11,11 @@ mod inner {
pub(crate) use loom::{cell::UnsafeCell, future, hint, sync, thread};
use std::{cell::RefCell, fmt::Write};

pub(crate) mod model {
#[allow(unused_imports)]
pub(crate) use loom::model::Builder;
}

std::thread_local! {
static TRACE_BUF: RefCell<String> = RefCell::new(String::new());
}
Expand All @@ -25,6 +31,7 @@ mod inner {
.unwrap_or_else(|_| println!("{}", args.take().unwrap()))
}

#[track_caller]
pub(crate) fn run_builder(
builder: loom::model::Builder,
model: impl Fn() + Sync + Send + std::panic::UnwindSafe + 'static,
Expand Down Expand Up @@ -98,11 +105,15 @@ mod inner {
// wrap the loom model with `catch_unwind` to avoid potentially losing
// test output on double panics.
let current_iteration = std::sync::Arc::new(AtomicUsize::new(1));
let test_name = match std::thread::current().name() {
Some("main") | None => "test".to_string(),
Some(name) => name.to_string(),
};
builder.check(move || {
let iteration = current_iteration.fetch_add(1, Ordering::Relaxed);
traceln(format_args!(
"\n---- {} iteration {} ----",
std::thread::current().name().unwrap_or("<unknown test>"),
current_iteration.fetch_add(1, Ordering::Relaxed)
test_name, iteration,
));

model();
Expand All @@ -112,12 +123,9 @@ mod inner {
});
}

#[track_caller]
pub(crate) fn model(model: impl Fn() + std::panic::UnwindSafe + Sync + Send + 'static) {
let mut builder = loom::model::Builder::default();
// // A couple of our tests will hit the max number of branches riiiiight
// // before they should complete. Double it so this stops happening.
builder.max_branches *= 2;
run_builder(builder, model)
run_builder(Default::default(), model)
}

pub(crate) mod alloc {
Expand Down
Loading

0 comments on commit 76df064

Please sign in to comment.