Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(mspc): replace bad VecDeque wait queue with intrusive list #16

Merged
merged 15 commits into from
Dec 9, 2021
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ alloc = []
default = ["std"]

[dependencies]
pin-project = "1"

[dev-dependencies]
tokio = { version = "1.14.0", features = ["rt", "rt-multi-thread", "macros", "sync"] }
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ mod macros;

mod loom;
mod util;
mod wait;

feature! {
#![feature = "alloc"]
Expand Down Expand Up @@ -266,6 +267,7 @@ impl Core {
return if test_dbg!(tail & self.closed != 0) {
Err(mpsc::TrySendError::Closed(()))
} else {
test_println!("--> channel full!");
Err(mpsc::TrySendError::Full(()))
};
}
Expand Down
1 change: 1 addition & 0 deletions src/loom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ mod inner {
fmt::Subscriber::builder()
.with_writer(|| TracebufWriter)
.without_time()
.with_max_level(tracing::Level::TRACE)
.finish()
.with(filter)
.init();
Expand Down
34 changes: 12 additions & 22 deletions src/mpsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,14 @@

use crate::{
loom::{atomic::AtomicUsize, hint},
util::{
wait::{Notify, WaitCell, WaitResult},
Backoff,
},
util::Backoff,
wait::{queue, Notify, WaitCell, WaitQueue, WaitResult},
Ref, ThingBuf,
};
use core::fmt;
use core::pin::Pin;
use core::task::Poll;

#[cfg(feature = "alloc")]
use crate::util::wait::{NotifyOnDrop, WaitQueue};

#[derive(Debug)]
#[non_exhaustive]
pub enum TrySendError<T = ()> {
Expand All @@ -39,8 +35,7 @@ struct Inner<T, N: Notify> {
thingbuf: ThingBuf<T>,
rx_wait: WaitCell<N>,
tx_count: AtomicUsize,
#[cfg(feature = "alloc")]
tx_wait: WaitQueue<NotifyOnDrop<N>>,
tx_wait: WaitQueue<N>,
}

struct SendRefInner<'a, T, N: Notify> {
Expand All @@ -64,7 +59,7 @@ struct SendRefInner<'a, T, N: Notify> {
}

struct NotifyRx<'a, N: Notify>(&'a WaitCell<N>);
struct NotifyTx<'a, N: Notify>(&'a WaitQueue<NotifyOnDrop<N>>);
struct NotifyTx<'a, N: Notify + Unpin>(&'a WaitQueue<N>);

// ==== impl TrySendError ===

Expand All @@ -78,13 +73,12 @@ impl TrySendError {
}

// ==== impl Inner ====
impl<T, N: Notify> Inner<T, N> {
impl<T, N: Notify + Unpin> Inner<T, N> {
fn new(thingbuf: ThingBuf<T>) -> Self {
Self {
thingbuf,
rx_wait: WaitCell::new(),
tx_count: AtomicUsize::new(1),
#[cfg(feature = "alloc")]
tx_wait: WaitQueue::new(),
}
}
Expand All @@ -93,12 +87,12 @@ impl<T, N: Notify> Inner<T, N> {
if self.thingbuf.core.close() {
crate::loom::hint::spin_loop();
test_println!("draining_queue");
self.tx_wait.drain();
self.tx_wait.close();
}
}
}

impl<T: Default, N: Notify> Inner<T, N> {
impl<T: Default, N: Notify + Unpin> Inner<T, N> {
fn try_send_ref(&self) -> Result<SendRefInner<'_, T, N>, TrySendError> {
self.thingbuf
.core
Expand Down Expand Up @@ -126,7 +120,8 @@ impl<T: Default, N: Notify> Inner<T, N> {
/// may yield, or might park the thread.
fn poll_send_ref(
&self,
mk_waiter: impl Fn() -> N,
mut node: Option<Pin<&mut queue::Waiter<N>>>,
mut register: impl FnMut(&mut Option<N>),
) -> Poll<Result<SendRefInner<'_, T, N>, Closed>> {
let mut backoff = Backoff::new();
// try to send a few times in a loop, in case the receiver notifies us
Expand All @@ -141,11 +136,7 @@ impl<T: Default, N: Notify> Inner<T, N> {
}

// try to push a waiter
let pushed_waiter = self.tx_wait.push_waiter(|| {
let current = mk_waiter();
test_println!("parking sender ({:?})", current);
NotifyOnDrop::new(current)
});
let pushed_waiter = self.tx_wait.push_waiter(&mut node, &mut register);

match test_dbg!(pushed_waiter) {
WaitResult::TxClosed => {
Expand Down Expand Up @@ -196,7 +187,6 @@ impl<T: Default, N: Notify> Inner<T, N> {
// just in case someone sent a message while we were
// registering the waiter.
try_poll_recv!();
test_dbg!(self.tx_wait.notify());
return Poll::Pending;
}
WaitResult::TxClosed => {
Expand Down Expand Up @@ -280,7 +270,7 @@ impl<N: Notify> Drop for NotifyRx<'_, N> {
}
}

impl<N: Notify> Drop for NotifyTx<'_, N> {
impl<N: Notify + Unpin> Drop for NotifyTx<'_, N> {
#[inline]
fn drop(&mut self) {
test_println!("notifying tx ({})", core::any::type_name::<N>());
Expand Down
111 changes: 103 additions & 8 deletions src/mpsc/async_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::{
atomic::{self, Ordering},
sync::Arc,
},
wait::queue,
Ref, ThingBuf,
};
use core::{
Expand Down Expand Up @@ -73,22 +74,75 @@ impl<T: Default> Sender<T> {
}

pub async fn send_ref(&self) -> Result<SendRef<'_, T>, Closed> {
// This future is private because if we replace the waiter queue thing with an
// intrusive list, we won't want to expose the future type publicly, for safety reasons.
struct SendRefFuture<'sender, T>(&'sender Sender<T>);
#[pin_project::pin_project(PinnedDrop)]
struct SendRefFuture<'sender, T> {
tx: &'sender Sender<T>,
has_been_queued: bool,
#[pin]
waiter: queue::Waiter<Waker>,
}

impl<'sender, T: Default + 'sender> Future for SendRefFuture<'sender, T> {
type Output = Result<SendRef<'sender, T>, Closed>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
test_println!("SendRefFuture::poll({:p})", self);
// perform one send ref loop iteration
self.0

let this = self.as_mut().project();
let waiter = if test_dbg!(*this.has_been_queued) {
None
} else {
Some(this.waiter)
};
this.tx
.inner
.poll_send_ref(|| cx.waker().clone())
.map(|ok| ok.map(SendRef))
.poll_send_ref(waiter, |waker| {
// if this is called, we are definitely getting queued.
*this.has_been_queued = true;

// if the wait node does not already have a waker, or the task
// has been polled with a waker that won't wake the previous
// one, register a new waker.
let my_waker = cx.waker();
// do we need to re-register?
let will_wake = waker
.as_ref()
.map(|waker| test_dbg!(waker.will_wake(my_waker)))
.unwrap_or(false);

if test_dbg!(will_wake) {
return;
}

*waker = Some(my_waker.clone());
})
.map(|ok| {
// avoid having to lock the list to remove a node that's
// definitely not queued.
*this.has_been_queued = false;
ok.map(SendRef)
})
}
}

SendRefFuture(self).await
#[pin_project::pinned_drop]
impl<T> PinnedDrop for SendRefFuture<'_, T> {
fn drop(self: Pin<&mut Self>) {
test_println!("SendRefFuture::drop({:p})", self);
if test_dbg!(self.has_been_queued) {
let this = self.project();
this.waiter.remove(&this.tx.inner.tx_wait)
}
}
}

SendRefFuture {
tx: self,
has_been_queued: false,
waiter: queue::Waiter::new(),
}
.await
}

pub async fn send(&self, val: T) -> Result<(), Closed<T>> {
Expand Down Expand Up @@ -205,3 +259,44 @@ impl<'a, T: Default> Future for RecvFuture<'a, T> {
self.rx.poll_recv(cx)
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::ThingBuf;

fn _assert_sync<T: Sync>(_: T) {}
fn _assert_send<T: Send>(_: T) {}

#[test]
fn recv_ref_future_is_send() {
fn _compiles() {
let (_, rx) = channel::<usize>(ThingBuf::new(10));
_assert_send(rx.recv_ref());
}
}

#[test]
fn recv_ref_future_is_sync() {
fn _compiles() {
let (_, rx) = channel::<usize>(ThingBuf::new(10));
_assert_sync(rx.recv_ref());
}
}

#[test]
fn send_ref_future_is_send() {
fn _compiles() {
let (tx, _) = channel::<usize>(ThingBuf::new(10));
_assert_send(tx.send_ref());
}
}

#[test]
fn send_ref_future_is_sync() {
fn _compiles() {
let (tx, _) = channel::<usize>(ThingBuf::new(10));
_assert_sync(tx.send_ref());
}
}
}
17 changes: 16 additions & 1 deletion src/mpsc/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::{
sync::Arc,
thread::{self, Thread},
},
wait::queue,
Ref, ThingBuf,
};
use core::fmt;
Expand Down Expand Up @@ -54,9 +55,23 @@ impl<T: Default> Sender<T> {
}

pub fn send_ref(&self) -> Result<SendRef<'_, T>, Closed> {
let mut waiter = queue::Waiter::new();
loop {
// perform one send ref loop iteration
if let Poll::Ready(result) = self.inner.poll_send_ref(thread::current) {

let waiter = unsafe {
// Safety: in this case, it's totally safe to pin the waiter, as
// it is owned uniquely by this function, and it cannot possibly
// be moved while this thread is parked.
Pin::new_unchecked(&mut waiter)
};
if let Poll::Ready(result) = self.inner.poll_send_ref(Some(waiter), |thread| {
if thread.is_none() {
let current = thread::current();
test_println!("registering {:?}", current);
*thread = Some(current);
}
}) {
return result.map(SendRef);
}

Expand Down
16 changes: 9 additions & 7 deletions src/thingbuf/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,15 @@ fn linearizable() {
fn thread(i: usize, q: &Arc<ThingBuf<usize>>) -> impl FnOnce() {
let q = q.clone();
move || {
while q
.push_ref()
.map(|mut val| {
*val = i;
})
.is_err()
{}
let mut pushed = false;
while !pushed {
pushed = q
.push_ref()
.map(|mut val| {
*val = i;
})
.is_ok();
}

if let Some(mut val) = q.pop_ref() {
*val = 0;
Expand Down
2 changes: 1 addition & 1 deletion src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use core::{
ops::{Deref, DerefMut},
};

pub(crate) mod mutex;
pub(crate) mod panic;
pub(crate) mod wait;

#[derive(Debug)]
pub(crate) struct Backoff(u8);
Expand Down
11 changes: 11 additions & 0 deletions src/util/mutex.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
feature! {
#![feature = "std"]
pub(crate) use self::std_impl::*;
mod std_impl;
}

#[cfg(not(feature = "std"))]
pub(crate) use self::spin_impl::*;

#[cfg(any(not(feature = "std"), test))]
mod spin_impl;
Loading