perf(mpsc): rewrite and optimize wait queue (#22)
This branch rewrites the MPSC channel wait queue implementation (again)
to improve performance. This undoes much of the performance regression
introduced by PR #20.

In particular, I've made the following changes:
* Simplified the design a bit and reduced the number of CAS loops in
  both the notify and wait paths.
* Factored the fast paths (which touch the state variable without
  locking) out of the notify and wait operations into separate
  functions marked `#[inline(always)]`. If the operation can't complete
  without actually touching the linked list, these call into a separate
  `#[inline(never)]` function that locks the list and performs the slow
  path. Calling code still contains a function call for the slow path,
  but the few instructions needed for the fast-path CAS are inlined, so
  the call is avoided entirely in some cases. This *significantly*
  improves performance!
* Separated the `wait` function into `start_wait` (called the first
  time a node waits) and `continue_wait` (called when the node is
  woken, to handle spurious wakeups). This simplifies the code for
  updating the waker, so we no longer have to pass big closures around.
* Other miscellaneous optimizations, such as cache padding some
  variables that should have been cache padded.
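The fast-path/slow-path split described above can be sketched roughly as follows. This is a simplified illustration, not thingbuf's actual implementation: the state constants, the shape of the `Notify` trait, and the lock-based list are assumptions for this sketch, and it glosses over races between the state CAS and the list lock that the real queue has to handle.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst};
use std::sync::{Arc, Mutex};

const EMPTY: usize = 0; // no waiters, no stored notification
const NOTIFIED: usize = 1; // a notification arrived before any waiter
const WAITING: usize = 2; // the list holds at least one waiter

#[derive(Debug, PartialEq)]
enum WaitResult {
    Wait,
    Notified,
}

trait Notify {
    fn notify(self);
}

struct WaitQueue<N> {
    state: AtomicUsize,
    list: Mutex<VecDeque<N>>,
}

impl<N: Notify> WaitQueue<N> {
    fn new() -> Self {
        Self { state: AtomicUsize::new(EMPTY), list: Mutex::new(VecDeque::new()) }
    }

    /// Fast path: a single CAS, no locking, cheap to inline at call sites.
    #[inline(always)]
    fn notify(&self) {
        // No waiters yet: store a "pre-notification" without touching the list.
        if self.state.compare_exchange(EMPTY, NOTIFIED, SeqCst, SeqCst).is_ok() {
            return;
        }
        self.notify_slow();
    }

    /// Slow path: kept out of line so callers only inline the CAS above.
    #[inline(never)]
    fn notify_slow(&self) {
        let mut list = self.list.lock().unwrap();
        if let Some(waiter) = list.pop_front() {
            if list.is_empty() {
                self.state.store(EMPTY, SeqCst);
            }
            drop(list); // release the lock before waking
            waiter.notify();
        }
    }

    /// Fast path: consume a stored notification without locking.
    #[inline(always)]
    fn start_wait(&self, waiter: N) -> WaitResult {
        if self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() {
            return WaitResult::Notified;
        }
        self.start_wait_slow(waiter)
    }

    #[inline(never)]
    fn start_wait_slow(&self, waiter: N) -> WaitResult {
        let mut list = self.list.lock().unwrap();
        list.push_back(waiter);
        self.state.store(WAITING, SeqCst);
        WaitResult::Wait
    }
}

fn main() {
    struct Flag(Arc<AtomicBool>);
    impl Notify for Flag {
        fn notify(self) {
            self.0.store(true, SeqCst);
        }
    }

    let q = WaitQueue::new();

    // Notifying with no waiters stores a pre-notification...
    q.notify();
    // ...which the next waiter consumes on its fast path.
    assert_eq!(q.start_wait(Flag(Arc::new(AtomicBool::new(false)))), WaitResult::Notified);

    // A waiter that actually queues is woken by the next notify.
    let woken = Arc::new(AtomicBool::new(false));
    assert_eq!(q.start_wait(Flag(woken.clone())), WaitResult::Wait);
    q.notify();
    assert!(woken.load(SeqCst));
}
```

The point of the `#[inline(never)]` boundary is that a caller of `notify` inlines only the single CAS; the lock-and-pop code stays out of line, keeping the hot path small.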
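The cache padding mentioned in the last bullet can be sketched like this. The `CachePadded` type here is hypothetical; 64 bytes is a common line size on x86_64, but real implementations (such as crossbeam's `CachePadded`) pick the alignment per target.

```rust
use std::mem::{align_of, size_of};
use std::sync::atomic::AtomicUsize;

// Force each wrapped value onto its own cache line, so that two fields
// written by different threads can never share (and ping-pong) a line.
#[repr(align(64))]
struct CachePadded<T>(T);

// For example, an index pair where producers write `tail` and consumers
// write `head`:
#[allow(dead_code)]
struct Indices {
    head: CachePadded<AtomicUsize>,
    tail: CachePadded<AtomicUsize>,
}

fn main() {
    assert_eq!(align_of::<CachePadded<AtomicUsize>>(), 64);
    // Padding rounds each field up to a full 64-byte line...
    assert_eq!(size_of::<CachePadded<AtomicUsize>>(), 64);
    // ...so the two indices are guaranteed not to share one.
    assert!(size_of::<Indices>() >= 128);
}
```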

## Performance Comparison

These benchmarks were run against the current `main` branch
(f77d534).

### async/mpsc_reusable

```
async/mpsc_reusable/ThingBuf/10
                        time:   [43.953 us 44.522 us 45.057 us]
                        change: [+0.0419% +1.7594% +3.5099%] (p = 0.05 < 0.05)
                        Change within noise threshold.
Found 5 outliers among 100 measurements (5.00%)
  1 (1.00%) low severe
  2 (2.00%) low mild
  1 (1.00%) high mild
  1 (1.00%) high severe
async/mpsc_reusable/ThingBuf/50
                        time:   [140.91 us 142.24 us 143.53 us]
                        change: [-31.201% -29.539% -27.824%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 2 outliers among 100 measurements (2.00%)
  1 (1.00%) low mild
  1 (1.00%) high mild
async/mpsc_reusable/ThingBuf/100
                        time:   [250.31 us 255.03 us 259.68 us]
                        change: [-18.966% -17.190% -15.202%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) high severe

```

### async/mpsc_integer

```
async/mpsc_integer/ThingBuf/10
                        time:   [208.99 us 215.30 us 221.32 us]
                        change: [+0.6957% +3.8603% +6.9400%] (p = 0.02 < 0.05)
                        Change within noise threshold.
async/mpsc_integer/ThingBuf/50
                        time:   [407.46 us 412.74 us 418.31 us]
                        change: [-39.128% -36.567% -33.267%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 13 outliers among 100 measurements (13.00%)
  2 (2.00%) low mild
  4 (4.00%) high mild
  7 (7.00%) high severe
async/mpsc_integer/ThingBuf/100
                        time:   [534.35 us 541.42 us 548.91 us]
                        change: [-44.820% -41.502% -37.120%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 11 outliers among 100 measurements (11.00%)
  1 (1.00%) low mild
  3 (3.00%) high mild
  7 (7.00%) high severe
```

### async/spsc/try_send_reusable

```
async/spsc/try_send_reusable/ThingBuf/100
                        time:   [12.310 us 12.353 us 12.398 us]
                        thrpt:  [8.0656 Melem/s 8.0952 Melem/s 8.1236 Melem/s]
                 change:
                        time:   [-7.5146% -7.1996% -6.8566%] (p = 0.00 < 0.05)
                        thrpt:  [+7.3613% +7.7582% +8.1252%]
                        Performance has improved.
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) high mild
async/spsc/try_send_reusable/ThingBuf/500
                        time:   [46.691 us 46.778 us 46.871 us]
                        thrpt:  [10.668 Melem/s 10.689 Melem/s 10.709 Melem/s]
                 change:
                        time:   [-9.4767% -9.2760% -9.0811%] (p = 0.00 < 0.05)
                        thrpt:  [+9.9881% +10.224% +10.469%]
                        Performance has improved.
Found 4 outliers among 100 measurements (4.00%)
  4 (4.00%) high mild
async/spsc/try_send_reusable/ThingBuf/1000
                        time:   [89.763 us 90.757 us 91.843 us]
                        thrpt:  [10.888 Melem/s 11.018 Melem/s 11.140 Melem/s]
                 change:
                        time:   [-9.4302% -8.8637% -8.2018%] (p = 0.00 < 0.05)
                        thrpt:  [+8.9346% +9.7257% +10.412%]
                        Performance has improved.
Found 12 outliers among 100 measurements (12.00%)
  1 (1.00%) low mild
  3 (3.00%) high mild
  8 (8.00%) high severe
async/spsc/try_send_reusable/ThingBuf/5000
                        time:   [415.34 us 417.89 us 420.42 us]
                        thrpt:  [11.893 Melem/s 11.965 Melem/s 12.038 Melem/s]
                 change:
                        time:   [-13.113% -12.774% -12.411%] (p = 0.00 < 0.05)
                        thrpt:  [+14.170% +14.644% +15.093%]
                        Performance has improved.
Found 7 outliers among 100 measurements (7.00%)
  7 (7.00%) high mild
async/spsc/try_send_reusable/ThingBuf/10000
                        time:   [847.35 us 848.63 us 849.98 us]
                        thrpt:  [11.765 Melem/s 11.784 Melem/s 11.802 Melem/s]
                 change:
                        time:   [-11.345% -10.820% -10.388%] (p = 0.00 < 0.05)
                        thrpt:  [+11.592% +12.133% +12.796%]
                        Performance has improved.
Found 8 outliers among 100 measurements (8.00%)
  5 (5.00%) low mild
  2 (2.00%) high mild
  1 (1.00%) high severe
```

### async/spsc/try_send_integer

```
async/spsc/try_send_integer/ThingBuf/100
                        time:   [7.2254 us 7.2467 us 7.2690 us]
                        thrpt:  [13.757 Melem/s 13.799 Melem/s 13.840 Melem/s]
                 change:
                        time:   [-13.292% -12.912% -12.520%] (p = 0.00 < 0.05)
                        thrpt:  [+14.312% +14.826% +15.330%]
                        Performance has improved.
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) high mild
async/spsc/try_send_integer/ThingBuf/500
                        time:   [34.358 us 34.477 us 34.582 us]
                        thrpt:  [14.458 Melem/s 14.503 Melem/s 14.553 Melem/s]
                 change:
                        time:   [-18.539% -18.312% -18.072%] (p = 0.00 < 0.05)
                        thrpt:  [+22.058% +22.417% +22.758%]
                        Performance has improved.
async/spsc/try_send_integer/ThingBuf/1000
                        time:   [69.107 us 69.273 us 69.434 us]
                        thrpt:  [14.402 Melem/s 14.436 Melem/s 14.470 Melem/s]
                 change:
                        time:   [-17.759% -17.604% -17.444%] (p = 0.00 < 0.05)
                        thrpt:  [+21.130% +21.365% +21.594%]
                        Performance has improved.
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) high mild
async/spsc/try_send_integer/ThingBuf/5000
                        time:   [349.44 us 353.41 us 357.81 us]
                        thrpt:  [13.974 Melem/s 14.148 Melem/s 14.309 Melem/s]
                 change:
                        time:   [-14.832% -14.252% -13.447%] (p = 0.00 < 0.05)
                        thrpt:  [+15.537% +16.621% +17.415%]
                        Performance has improved.
Found 13 outliers among 100 measurements (13.00%)
  5 (5.00%) high mild
  8 (8.00%) high severe
async/spsc/try_send_integer/ThingBuf/10000
                        time:   [712.89 us 732.58 us 754.24 us]
                        thrpt:  [13.258 Melem/s 13.650 Melem/s 14.027 Melem/s]
                 change:
                        time:   [-16.082% -15.161% -14.129%] (p = 0.00 < 0.05)
                        thrpt:  [+16.454% +17.870% +19.164%]
                        Performance has improved.
Found 7 outliers among 100 measurements (7.00%)
  2 (2.00%) high mild
  5 (5.00%) high severe
```

I'm actually not really sure why this also improved the `try_send`
benchmarks, which don't touch the wait queue...but I'll take it!

Signed-off-by: Eliza Weisman <[email protected]>
hawkw authored Dec 24, 2021
1 parent f77d534 commit 8c882b0
Showing 11 changed files with 514 additions and 267 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
@@ -18,6 +18,7 @@ default = ["std"]

[dependencies]
pin-project = "1"
parking_lot = { version = "0.11", optional = true }

[dev-dependencies]
tokio = { version = "1.14.0", features = ["rt", "rt-multi-thread", "macros", "sync"] }
6 changes: 5 additions & 1 deletion bench/Cargo.toml
@@ -7,18 +7,22 @@ publish = false
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[features]
default = ["parking_lot"]
# These feature flags can be disabled if we don't want to run comparison
# benchmarks, such as when just comparing two `thingbuf` versions.
comparisons = ["crossbeam", "async-std", "futures", "tokio-sync", "std-sync"]
tokio-sync = ["tokio/sync"]
std-sync = []

# Use parking_lot mutexes in `thingbuf`
parking_lot = ["thingbuf/parking_lot"]

[dependencies]
thingbuf = { path = ".." }
criterion = { version = "0.3.5", features = ["async_tokio"] }

# for comparison benchmarks
-tokio = { version = "1.14.0", features = ["rt", "rt-multi-thread", "sync"] }
+tokio = { version = "1.14.0", features = ["rt", "rt-multi-thread", "sync", "parking_lot"] }
crossbeam = { version = "0.8.1", optional = true }
async-std = { version = "1", optional = true }
futures = { version = "0.3", optional = true }
4 changes: 3 additions & 1 deletion src/lib.rs
@@ -106,7 +106,7 @@ impl Core {
}
}

-#[inline]
+#[inline(always)]
fn idx_gen(&self, val: usize) -> (usize, usize) {
(val & self.idx_mask, val & self.gen_mask)
}
@@ -138,6 +138,7 @@ impl Core {
test_dbg!(self.tail.fetch_or(self.closed, SeqCst) & self.closed == 0)
}

#[inline(always)]
fn push_ref<'slots, T, S>(
&self,
slots: &'slots S,
@@ -226,6 +227,7 @@ impl Core {
}
}

#[inline(always)]
fn pop_ref<'slots, T, S>(&self, slots: &'slots S) -> Result<Ref<'slots, T>, mpsc::TrySendError>
where
S: ops::Index<usize, Output = Slot<T>> + ?Sized,
46 changes: 1 addition & 45 deletions src/mpsc.rs
@@ -12,8 +12,7 @@
use crate::{
loom::{atomic::AtomicUsize, hint},
util::Backoff,
-    wait::{queue, Notify, WaitCell, WaitQueue, WaitResult},
+    wait::{Notify, WaitCell, WaitQueue, WaitResult},
Ref, ThingBuf,
};
use core::fmt;
@@ -113,49 +112,6 @@ impl<T: Default, N: Notify + Unpin> Inner<T, N> {
}
}

/// Performs one iteration of the `send_ref` loop.
///
/// The loop itself has to be written in the actual `send` method's
/// implementation, rather than on `inner`, because it might be async and
/// may yield, or might park the thread.
fn poll_send_ref(
&self,
node: Pin<&mut queue::Waiter<N>>,
mut register: impl FnMut(&mut Option<N>),
) -> Poll<Result<SendRefInner<'_, T, N>, Closed>> {
let mut backoff = Backoff::new();
let mut node = Some(node);
// try to send a few times in a loop, in case the receiver notifies us
// right before we park.
loop {
// try to reserve a send slot, returning if we succeeded or if the
// queue was closed.
match self.try_send_ref() {
Ok(slot) => return Poll::Ready(Ok(slot)),
Err(TrySendError::Closed(_)) => return Poll::Ready(Err(Closed(()))),
Err(_) => {}
}

// try to push a waiter
let pushed_waiter = self.tx_wait.wait(&mut node, &mut register);

match test_dbg!(pushed_waiter) {
WaitResult::Closed => {
// the channel closed while we were registering the waiter!
return Poll::Ready(Err(Closed(())));
}
WaitResult::Wait => {
// okay, we are now queued to wait. gotosleep!
return Poll::Pending;
}
WaitResult::Notified => {
// we consumed a queued notification. try again...
backoff.spin_yield();
}
}
}
}

/// Performs one iteration of the `recv_ref` loop.
///
/// The loop itself has to be written in the actual `send` method's
112 changes: 62 additions & 50 deletions src/mpsc/async_impl.rs
@@ -77,83 +77,95 @@ impl<T: Default> Sender<T> {
#[pin_project::pin_project(PinnedDrop)]
struct SendRefFuture<'sender, T> {
tx: &'sender Sender<T>,
-    queued: bool,
+    state: State,
#[pin]
waiter: queue::Waiter<Waker>,
}

#[derive(Debug, Copy, Clone, Eq, PartialEq)]
enum State {
Start,
Waiting,
Done,
}

impl<'sender, T: Default + 'sender> Future for SendRefFuture<'sender, T> {
type Output = Result<SendRef<'sender, T>, Closed>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
test_println!("SendRefFuture::poll({:p})", self);
// perform one send ref loop iteration
let res = {

loop {
let this = self.as_mut().project();
this.tx.inner.poll_send_ref(this.waiter, |waker| {
let my_waker = cx.waker();

// If there's already a waker in the node, we might have
// been woken spuriously for some reason. In that case,
// make sure that the waker in the node will wake the
// waker that was passed in on *this* poll --- the
// future may have moved to another task or something!
if let Some(waker) = waker.as_mut() {
if test_dbg!(!waker.will_wake(my_waker)) {
test_println!(
"poll_send_ref -> re-registering waker {:?}",
my_waker
);
*waker = my_waker.clone();
let node = this.waiter;
match test_dbg!(*this.state) {
State::Start => {
match this.tx.try_send_ref() {
Ok(slot) => return Poll::Ready(Ok(slot)),
Err(TrySendError::Closed(_)) => {
return Poll::Ready(Err(Closed(())))
}
Err(_) => {}
}
return;
}

// Otherwise, we are registering this task for the first
// time.
test_println!("poll_send_ref -> registering initial waker {:?}", my_waker);
*waker = Some(my_waker.clone());
*this.queued = true;
})
};
res.map(|ready| {
let this = self.as_mut().project();
if test_dbg!(*this.queued) {
// If the node was ever in the queue, we have to make
// sure we're *absolutely certain* it isn't still in the
// queue before we say it's okay to drop the node
// without removing it from the linked list. Check to
// make sure we were woken by the queue, and not by a
// spurious wakeup.
//
// This means we *may* be a little bit aggressive about
// locking the wait queue to make sure the node is
// removed, but that's better than leaving dangling
// pointers in the queue...
*this.queued = test_dbg!(!this
.waiter
.was_woken_from_queue
.swap(false, Ordering::AcqRel));
let start_wait = this.tx.inner.tx_wait.start_wait(node, cx.waker());

match test_dbg!(start_wait) {
WaitResult::Closed => {
// the channel closed while we were registering the waiter!
*this.state = State::Done;
return Poll::Ready(Err(Closed(())));
}
WaitResult::Wait => {
// okay, we are now queued to wait.
// gotosleep!
*this.state = State::Waiting;
return Poll::Pending;
}
WaitResult::Notified => continue,
}
}
State::Waiting => {
let continue_wait =
this.tx.inner.tx_wait.continue_wait(node, cx.waker());

match test_dbg!(continue_wait) {
WaitResult::Closed => {
*this.state = State::Done;
return Poll::Ready(Err(Closed(())));
}
WaitResult::Wait => return Poll::Pending,
WaitResult::Notified => {
*this.state = State::Done;
}
}
}
State::Done => match this.tx.try_send_ref() {
Ok(slot) => return Poll::Ready(Ok(slot)),
Err(TrySendError::Closed(_)) => return Poll::Ready(Err(Closed(()))),
Err(_) => {
*this.state = State::Start;
}
},
}
ready.map(SendRef)
})
}
}
}

#[pin_project::pinned_drop]
impl<T> PinnedDrop for SendRefFuture<'_, T> {
fn drop(self: Pin<&mut Self>) {
test_println!("SendRefFuture::drop({:p})", self);
if test_dbg!(self.queued) {
let this = self.project();
let this = self.project();
if test_dbg!(*this.state) == State::Waiting && test_dbg!(this.waiter.is_linked()) {
this.waiter.remove(&this.tx.inner.tx_wait)
}
}
}

SendRefFuture {
tx: self,
-    queued: false,
+    state: State::Start,
waiter: queue::Waiter::new(),
}
.await
41 changes: 28 additions & 13 deletions src/mpsc/sync.rs
@@ -55,28 +55,43 @@ impl<T: Default> Sender<T> {
}

pub fn send_ref(&self) -> Result<SendRef<'_, T>, Closed> {
// fast path: avoid getting the thread and constructing the node if the
// slot is immediately ready.
match self.inner.try_send_ref() {
Ok(slot) => return Ok(SendRef(slot)),
Err(TrySendError::Closed(_)) => return Err(Closed(())),
_ => {}
}

let mut waiter = queue::Waiter::new();
let mut unqueued = true;
let thread = thread::current();
loop {
// perform one send ref loop iteration

-            let waiter = unsafe {
+            let node = unsafe {
// Safety: in this case, it's totally safe to pin the waiter, as
// it is owned uniquely by this function, and it cannot possibly
// be moved while this thread is parked.
Pin::new_unchecked(&mut waiter)
};
if let Poll::Ready(result) = self.inner.poll_send_ref(waiter, |thread| {
if thread.is_none() {
let current = thread::current();
test_println!("registering {:?}", current);
*thread = Some(current);

let wait = if unqueued {
test_dbg!(self.inner.tx_wait.start_wait(node, &thread))
} else {
test_dbg!(self.inner.tx_wait.continue_wait(node, &thread))
};

match wait {
WaitResult::Closed => return Err(Closed(())),
WaitResult::Notified => match self.inner.try_send_ref() {
Ok(slot) => return Ok(SendRef(slot)),
Err(TrySendError::Closed(_)) => return Err(Closed(())),
_ => {}
},
WaitResult::Wait => {
unqueued = false;
thread::park();
}
}) {
return result.map(SendRef);
}

// if that iteration failed, park the thread.
thread::park();
}
}

4 changes: 2 additions & 2 deletions src/util.rs
@@ -28,7 +28,7 @@ impl Backoff {
Self(0)
}

-#[inline]
+#[inline(always)]
pub(crate) fn spin(&mut self) {
#[cfg(not(all(loom, test)))]
for _ in 0..test_dbg!(1 << self.0.min(Self::MAX_SPINS)) {
@@ -46,7 +46,7 @@
}
}

-#[inline]
+#[inline(always)]
pub(crate) fn spin_yield(&mut self) {
if self.0 <= Self::MAX_SPINS || cfg!(not(any(feature = "std", test))) {
#[cfg(not(all(loom, test)))]
8 changes: 7 additions & 1 deletion src/util/mutex.rs
@@ -1,5 +1,5 @@
feature! {
-    #![feature = "std"]
+    #![all(feature = "std", not(feature = "parking_lot"))]
pub(crate) use self::std_impl::*;
mod std_impl;
}
@@ -9,3 +9,9 @@ pub(crate) use self::spin_impl::*;

#[cfg(any(not(feature = "std"), test))]
mod spin_impl;

feature! {
#![all(feature = "std", feature = "parking_lot")]
#[allow(unused_imports)]
pub(crate) use parking_lot::{Mutex, MutexGuard};
}
16 changes: 15 additions & 1 deletion src/wait.rs
@@ -54,21 +54,35 @@ pub(crate) enum WaitResult {
Notified,
}

-pub(crate) trait Notify: UnwindSafe + fmt::Debug {
+pub(crate) trait Notify: UnwindSafe + fmt::Debug + Clone {
fn notify(self);

fn same(&self, other: &Self) -> bool;
}

#[cfg(feature = "std")]
impl Notify for thread::Thread {
#[inline]
fn notify(self) {
test_println!("NOTIFYING {:?} (from {:?})", self, thread::current());
self.unpark();
}

#[inline]
fn same(&self, other: &Self) -> bool {
other.id() == self.id()
}
}

impl Notify for Waker {
#[inline]
fn notify(self) {
test_println!("WAKING TASK {:?} (from {:?})", self, thread::current());
self.wake();
}

#[inline]
fn same(&self, other: &Self) -> bool {
other.will_wake(self)
}
}
