Skip to content

Commit

Permalink
fix(mpsc): fix a deadlock in async send_ref (#20)
Browse files Browse the repository at this point in the history
This fixes a deadlock issue in the async MPSC's `send_ref` method. The
deadlock occurs when a new waker needs to be registered for a task whose
wait node is already in the wait queue. Previously, the new waker would
not be registered because the waker registering closure was only called
when the node was being enqueued. If the node was already in the queue,
polling the future would never touch the waker. This meant that if the
task was polled with a new waker, it would leave its old waker in the
queue, and might never be notified again.

This branch fixes that by separating pushing the task from registering
the waker. We check if the node already has a waker prior to registering,
and if it does, we don't push it again.

Signed-off-by: Eliza Weisman <[email protected]>
  • Loading branch information
hawkw authored Dec 11, 2021
1 parent 2990113 commit c58c620
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 117 deletions.
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ impl Core {
}

fn close(&self) -> bool {
test_println!("Core::close");
if std::thread::panicking() {
return false;
}
Expand Down
2 changes: 1 addition & 1 deletion src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ macro_rules! test_dbg {
($e:expr) => {
match $e {
e => {
#[cfg(test)]
#[cfg(any(test, all(thingbuf_trace, feature = "std")))]
test_println!("{} = {:?}", stringify!($e), &e);
e
}
Expand Down
6 changes: 4 additions & 2 deletions src/mpsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,11 @@ impl<T: Default, N: Notify + Unpin> Inner<T, N> {
/// may yield, or might park the thread.
fn poll_send_ref(
&self,
mut node: Option<Pin<&mut queue::Waiter<N>>>,
node: Pin<&mut queue::Waiter<N>>,
mut register: impl FnMut(&mut Option<N>),
) -> Poll<Result<SendRefInner<'_, T, N>, Closed>> {
let mut backoff = Backoff::new();
let mut node = Some(node);
// try to send a few times in a loop, in case the receiver notifies us
// right before we park.
loop {
Expand All @@ -136,7 +137,7 @@ impl<T: Default, N: Notify + Unpin> Inner<T, N> {
}

// try to push a waiter
let pushed_waiter = self.tx_wait.push_waiter(&mut node, &mut register);
let pushed_waiter = self.tx_wait.wait(&mut node, &mut register);

match test_dbg!(pushed_waiter) {
WaitResult::Closed => {
Expand Down Expand Up @@ -187,6 +188,7 @@ impl<T: Default, N: Notify + Unpin> Inner<T, N> {
// just in case someone sent a message while we were
// registering the waiter.
try_poll_recv!();
test_println!("-> yield");
return Poll::Pending;
}
WaitResult::Closed => {
Expand Down
76 changes: 45 additions & 31 deletions src/mpsc/async_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ impl<T: Default> Sender<T> {
#[pin_project::pin_project(PinnedDrop)]
struct SendRefFuture<'sender, T> {
tx: &'sender Sender<T>,
has_been_queued: bool,
queued: bool,
#[pin]
waiter: queue::Waiter<Waker>,
}
Expand All @@ -88,49 +88,63 @@ impl<T: Default> Sender<T> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
test_println!("SendRefFuture::poll({:p})", self);
// perform one send ref loop iteration

let this = self.as_mut().project();
let waiter = if test_dbg!(*this.has_been_queued) {
None
} else {
Some(this.waiter)
};
this.tx
.inner
.poll_send_ref(waiter, |waker| {
// if this is called, we are definitely getting queued.
*this.has_been_queued = true;

// if the wait node does not already have a waker, or the task
// has been polled with a waker that won't wake the previous
// one, register a new waker.
let res = {
let this = self.as_mut().project();
this.tx.inner.poll_send_ref(this.waiter, |waker| {
let my_waker = cx.waker();
// do we need to re-register?
let will_wake = waker
.as_ref()
.map(|waker| test_dbg!(waker.will_wake(my_waker)))
.unwrap_or(false);

if test_dbg!(will_wake) {
// If there's already a waker in the node, we might have
// been woken spuriously for some reason. In that case,
// make sure that the waker in the node will wake the
// waker that was passed in on *this* poll --- the
// future may have moved to another task or something!
if let Some(waker) = waker.as_mut() {
if test_dbg!(!waker.will_wake(my_waker)) {
test_println!(
"poll_send_ref -> re-registering waker {:?}",
my_waker
);
*waker = my_waker.clone();
}
return;
}

// Otherwise, we are registering this task for the first
// time.
test_println!("poll_send_ref -> registering initial waker {:?}", my_waker);
*waker = Some(my_waker.clone());
*this.queued = true;
})
.map(|ok| {
// avoid having to lock the list to remove a node that's
// definitely not queued.
*this.has_been_queued = false;
ok.map(SendRef)
})
};
res.map(|ready| {
let this = self.as_mut().project();
if test_dbg!(*this.queued) {
// If the node was ever in the queue, we have to make
// sure we're *absolutely certain* it isn't still in the
// queue before we say it's okay to drop the node
// without removing it from the linked list. Check to
// make sure we were woken by the queue, and not by a
// spurious wakeup.
//
// This means we *may* be a little bit aggressive about
// locking the wait queue to make sure the node is
// removed, but that's better than leaving dangling
// pointers in the queue...
*this.queued = test_dbg!(!this
.waiter
.was_woken_from_queue
.swap(false, Ordering::AcqRel));
}
ready.map(SendRef)
})
}
}

#[pin_project::pinned_drop]
impl<T> PinnedDrop for SendRefFuture<'_, T> {
fn drop(self: Pin<&mut Self>) {
test_println!("SendRefFuture::drop({:p})", self);
if test_dbg!(self.has_been_queued) {
if test_dbg!(self.queued) {
let this = self.project();
this.waiter.remove(&this.tx.inner.tx_wait)
}
Expand All @@ -139,7 +153,7 @@ impl<T: Default> Sender<T> {

SendRefFuture {
tx: self,
has_been_queued: false,
queued: false,
waiter: queue::Waiter::new(),
}
.await
Expand Down
2 changes: 1 addition & 1 deletion src/mpsc/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ impl<T: Default> Sender<T> {
// be moved while this thread is parked.
Pin::new_unchecked(&mut waiter)
};
if let Poll::Ready(result) = self.inner.poll_send_ref(Some(waiter), |thread| {
if let Poll::Ready(result) = self.inner.poll_send_ref(waiter, |thread| {
if thread.is_none() {
let current = thread::current();
test_println!("registering {:?}", current);
Expand Down
Loading

0 comments on commit c58c620

Please sign in to comment.