Skip to content

Commit

Permalink
perf(mpsc): replace bad VecDeque wait queue with intrusive list (#16)
Browse files Browse the repository at this point in the history
The new wait queue implementation doesn't allocate, and doesn't require
resizing a `VecDeque` inside a lock. This should improve performance and
(hopefully) make it possible to use the MPSC queue without any
allocations on no-std (once I figure out the static stuff).

Performance for blocking/yielding send is now very competitive after
switching to the new wait queue. Compare the new violin plot of the integer
MPSC benchmark with the violin plot for the old wait queue that I posted
in #14.

Old'n'busted:
![image](https://user-images.githubusercontent.com/2796466/145252070-2cdda7bb-5cca-4b85-ab2d-082f3af5f990.png)

New hotness:
![image](https://user-images.githubusercontent.com/2796466/145482064-c0223f3c-feae-4f23-a1db-704a2c18eb16.png)

Big comparison benchmark:
![image](https://user-images.githubusercontent.com/2796466/145482883-a38f253c-17c8-4a0b-a798-67458c5fa27a.png)

Signed-off-by: Eliza Weisman <[email protected]>
  • Loading branch information
hawkw authored Dec 9, 2021
1 parent a2a52c9 commit 23f4c96
Show file tree
Hide file tree
Showing 14 changed files with 580 additions and 319 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ alloc = []
default = ["std"]

[dependencies]
pin-project = "1"

[dev-dependencies]
tokio = { version = "1.14.0", features = ["rt", "rt-multi-thread", "macros", "sync"] }
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ mod macros;

mod loom;
mod util;
mod wait;

feature! {
#![feature = "alloc"]
Expand Down Expand Up @@ -266,6 +267,7 @@ impl Core {
return if test_dbg!(tail & self.closed != 0) {
Err(mpsc::TrySendError::Closed(()))
} else {
test_println!("--> channel full!");
Err(mpsc::TrySendError::Full(()))
};
}
Expand Down
34 changes: 12 additions & 22 deletions src/mpsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,14 @@
use crate::{
loom::{atomic::AtomicUsize, hint},
util::{
wait::{Notify, WaitCell, WaitResult},
Backoff,
},
util::Backoff,
wait::{queue, Notify, WaitCell, WaitQueue, WaitResult},
Ref, ThingBuf,
};
use core::fmt;
use core::pin::Pin;
use core::task::Poll;

#[cfg(feature = "alloc")]
use crate::util::wait::{NotifyOnDrop, WaitQueue};

#[derive(Debug)]
#[non_exhaustive]
pub enum TrySendError<T = ()> {
Expand All @@ -39,8 +35,7 @@ struct Inner<T, N: Notify> {
thingbuf: ThingBuf<T>,
rx_wait: WaitCell<N>,
tx_count: AtomicUsize,
#[cfg(feature = "alloc")]
tx_wait: WaitQueue<NotifyOnDrop<N>>,
tx_wait: WaitQueue<N>,
}

struct SendRefInner<'a, T, N: Notify> {
Expand All @@ -64,7 +59,7 @@ struct SendRefInner<'a, T, N: Notify> {
}

struct NotifyRx<'a, N: Notify>(&'a WaitCell<N>);
struct NotifyTx<'a, N: Notify>(&'a WaitQueue<NotifyOnDrop<N>>);
struct NotifyTx<'a, N: Notify + Unpin>(&'a WaitQueue<N>);

// ==== impl TrySendError ===

Expand All @@ -78,13 +73,12 @@ impl TrySendError {
}

// ==== impl Inner ====
impl<T, N: Notify> Inner<T, N> {
impl<T, N: Notify + Unpin> Inner<T, N> {
fn new(thingbuf: ThingBuf<T>) -> Self {
Self {
thingbuf,
rx_wait: WaitCell::new(),
tx_count: AtomicUsize::new(1),
#[cfg(feature = "alloc")]
tx_wait: WaitQueue::new(),
}
}
Expand All @@ -93,12 +87,12 @@ impl<T, N: Notify> Inner<T, N> {
if self.thingbuf.core.close() {
crate::loom::hint::spin_loop();
test_println!("draining_queue");
self.tx_wait.drain();
self.tx_wait.close();
}
}
}

impl<T: Default, N: Notify> Inner<T, N> {
impl<T: Default, N: Notify + Unpin> Inner<T, N> {
fn try_send_ref(&self) -> Result<SendRefInner<'_, T, N>, TrySendError> {
self.thingbuf
.core
Expand Down Expand Up @@ -126,7 +120,8 @@ impl<T: Default, N: Notify> Inner<T, N> {
/// may yield, or might park the thread.
fn poll_send_ref(
&self,
mk_waiter: impl Fn() -> N,
mut node: Option<Pin<&mut queue::Waiter<N>>>,
mut register: impl FnMut(&mut Option<N>),
) -> Poll<Result<SendRefInner<'_, T, N>, Closed>> {
let mut backoff = Backoff::new();
// try to send a few times in a loop, in case the receiver notifies us
Expand All @@ -141,11 +136,7 @@ impl<T: Default, N: Notify> Inner<T, N> {
}

// try to push a waiter
let pushed_waiter = self.tx_wait.push_waiter(|| {
let current = mk_waiter();
test_println!("parking sender ({:?})", current);
NotifyOnDrop::new(current)
});
let pushed_waiter = self.tx_wait.push_waiter(&mut node, &mut register);

match test_dbg!(pushed_waiter) {
WaitResult::TxClosed => {
Expand Down Expand Up @@ -196,7 +187,6 @@ impl<T: Default, N: Notify> Inner<T, N> {
// just in case someone sent a message while we were
// registering the waiter.
try_poll_recv!();
test_dbg!(self.tx_wait.notify());
return Poll::Pending;
}
WaitResult::TxClosed => {
Expand Down Expand Up @@ -280,7 +270,7 @@ impl<N: Notify> Drop for NotifyRx<'_, N> {
}
}

impl<N: Notify> Drop for NotifyTx<'_, N> {
impl<N: Notify + Unpin> Drop for NotifyTx<'_, N> {
#[inline]
fn drop(&mut self) {
test_println!("notifying tx ({})", core::any::type_name::<N>());
Expand Down
111 changes: 103 additions & 8 deletions src/mpsc/async_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::{
atomic::{self, Ordering},
sync::Arc,
},
wait::queue,
Ref, ThingBuf,
};
use core::{
Expand Down Expand Up @@ -73,22 +74,75 @@ impl<T: Default> Sender<T> {
}

pub async fn send_ref(&self) -> Result<SendRef<'_, T>, Closed> {
// This future is private because if we replace the waiter queue thing with an
// intrusive list, we won't want to expose the future type publicly, for safety reasons.
struct SendRefFuture<'sender, T>(&'sender Sender<T>);
#[pin_project::pin_project(PinnedDrop)]
struct SendRefFuture<'sender, T> {
tx: &'sender Sender<T>,
has_been_queued: bool,
#[pin]
waiter: queue::Waiter<Waker>,
}

impl<'sender, T: Default + 'sender> Future for SendRefFuture<'sender, T> {
type Output = Result<SendRef<'sender, T>, Closed>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
test_println!("SendRefFuture::poll({:p})", self);
// perform one send ref loop iteration
self.0

let this = self.as_mut().project();
let waiter = if test_dbg!(*this.has_been_queued) {
None
} else {
Some(this.waiter)
};
this.tx
.inner
.poll_send_ref(|| cx.waker().clone())
.map(|ok| ok.map(SendRef))
.poll_send_ref(waiter, |waker| {
// if this is called, we are definitely getting queued.
*this.has_been_queued = true;

// if the wait node does not already have a waker, or the task
// has been polled with a waker that won't wake the previous
// one, register a new waker.
let my_waker = cx.waker();
// do we need to re-register?
let will_wake = waker
.as_ref()
.map(|waker| test_dbg!(waker.will_wake(my_waker)))
.unwrap_or(false);

if test_dbg!(will_wake) {
return;
}

*waker = Some(my_waker.clone());
})
.map(|ok| {
// avoid having to lock the list to remove a node that's
// definitely not queued.
*this.has_been_queued = false;
ok.map(SendRef)
})
}
}

SendRefFuture(self).await
// Drop hook for `SendRefFuture`. If the future's `has_been_queued` flag is
// set, its waiter is removed from the channel's `tx_wait` queue; otherwise
// there is nothing to clean up.
// NOTE(review): presumably this keeps the wait queue from ever referring to
// a waiter that has already been dropped -- confirm against `wait::queue`.
#[pin_project::pinned_drop]
impl<T> PinnedDrop for SendRefFuture<'_, T> {
fn drop(self: Pin<&mut Self>) {
test_println!("SendRefFuture::drop({:p})", self);
// only touch the wait queue when the flag says this waiter was queued.
if test_dbg!(self.has_been_queued) {
let this = self.project();
this.waiter.remove(&this.tx.inner.tx_wait)
}
}
}

SendRefFuture {
tx: self,
has_been_queued: false,
waiter: queue::Waiter::new(),
}
.await
}

pub async fn send(&self, val: T) -> Result<(), Closed<T>> {
Expand Down Expand Up @@ -205,3 +259,44 @@ impl<'a, T: Default> Future for RecvFuture<'a, T> {
self.rx.poll_recv(cx)
}
}

// Checks that the futures returned by `recv_ref` and `send_ref` on this
// module's channel halves implement `Send` and `Sync`.
#[cfg(test)]
mod tests {
use super::*;
use crate::ThingBuf;

// Helpers that only type-check when the argument is `Sync` / `Send`.
// They take the value and do nothing with it.
fn _assert_sync<T: Sync>(_: T) {}
fn _assert_send<T: Send>(_: T) {}

// NOTE: in every test below the inner `_compiles` function is declared but
// never called, so the assertion is enforced by the type checker when this
// module is built; the test bodies themselves execute nothing.

// The future returned by `recv_ref` must be `Send`.
#[test]
fn recv_ref_future_is_send() {
fn _compiles() {
let (_, rx) = channel::<usize>(ThingBuf::new(10));
_assert_send(rx.recv_ref());
}
}

// The future returned by `recv_ref` must be `Sync`.
#[test]
fn recv_ref_future_is_sync() {
fn _compiles() {
let (_, rx) = channel::<usize>(ThingBuf::new(10));
_assert_sync(rx.recv_ref());
}
}

// The future returned by `send_ref` must be `Send`.
#[test]
fn send_ref_future_is_send() {
fn _compiles() {
let (tx, _) = channel::<usize>(ThingBuf::new(10));
_assert_send(tx.send_ref());
}
}

// The future returned by `send_ref` must be `Sync`.
#[test]
fn send_ref_future_is_sync() {
fn _compiles() {
let (tx, _) = channel::<usize>(ThingBuf::new(10));
_assert_sync(tx.send_ref());
}
}
}
17 changes: 16 additions & 1 deletion src/mpsc/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::{
sync::Arc,
thread::{self, Thread},
},
wait::queue,
Ref, ThingBuf,
};
use core::fmt;
Expand Down Expand Up @@ -54,9 +55,23 @@ impl<T: Default> Sender<T> {
}

pub fn send_ref(&self) -> Result<SendRef<'_, T>, Closed> {
let mut waiter = queue::Waiter::new();
loop {
// perform one send ref loop iteration
if let Poll::Ready(result) = self.inner.poll_send_ref(thread::current) {

let waiter = unsafe {
// Safety: in this case, it's totally safe to pin the waiter, as
// it is owned uniquely by this function, and it cannot possibly
// be moved while this thread is parked.
Pin::new_unchecked(&mut waiter)
};
if let Poll::Ready(result) = self.inner.poll_send_ref(Some(waiter), |thread| {
if thread.is_none() {
let current = thread::current();
test_println!("registering {:?}", current);
*thread = Some(current);
}
}) {
return result.map(SendRef);
}

Expand Down
16 changes: 9 additions & 7 deletions src/thingbuf/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,15 @@ fn linearizable() {
fn thread(i: usize, q: &Arc<ThingBuf<usize>>) -> impl FnOnce() {
let q = q.clone();
move || {
while q
.push_ref()
.map(|mut val| {
*val = i;
})
.is_err()
{}
let mut pushed = false;
while !pushed {
pushed = q
.push_ref()
.map(|mut val| {
*val = i;
})
.is_ok();
}

if let Some(mut val) = q.pop_ref() {
*val = 0;
Expand Down
2 changes: 1 addition & 1 deletion src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use core::{
ops::{Deref, DerefMut},
};

pub(crate) mod mutex;
pub(crate) mod panic;
pub(crate) mod wait;

#[derive(Debug)]
pub(crate) struct Backoff(u8);
Expand Down
11 changes: 11 additions & 0 deletions src/util/mutex.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
feature! {
#![feature = "std"]
pub(crate) use self::std_impl::*;
mod std_impl;
}

#[cfg(not(feature = "std"))]
pub(crate) use self::spin_impl::*;

#[cfg(any(not(feature = "std"), test))]
mod spin_impl;
Loading

0 comments on commit 23f4c96

Please sign in to comment.