From 04de050b973b2bca0a3e4daa77460c3d1b7a0055 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Sat, 21 May 2022 11:42:56 -0700 Subject: [PATCH 01/24] wip Signed-off-by: Eliza Weisman --- maitake/src/wait.rs | 3 +- maitake/src/wait/queue.rs | 157 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 159 insertions(+), 1 deletion(-) create mode 100644 maitake/src/wait/queue.rs diff --git a/maitake/src/wait.rs b/maitake/src/wait.rs index b7309e52..336f8f92 100644 --- a/maitake/src/wait.rs +++ b/maitake/src/wait.rs @@ -4,7 +4,8 @@ //! which stores a *single* waiting task, and a wait *queue*, which //! stores a queue of waiting tasks. pub(crate) mod cell; -pub use cell::WaitCell; +mod queue; +pub use self::{cell::WaitCell, queue::WaitQueue}; use core::task::Poll; diff --git a/maitake/src/wait/queue.rs b/maitake/src/wait/queue.rs new file mode 100644 index 00000000..a88b9ff3 --- /dev/null +++ b/maitake/src/wait/queue.rs @@ -0,0 +1,157 @@ +use crate::{ + loom::{ + cell::UnsafeCell, + sync::atomic::{ + AtomicUsize, + Ordering::{self, *}, + }, + }, + util, +}; +use cordyceps::{ + list::{self, List}, + Linked, +}; +use core::{marker::PhantomPinned, ptr::NonNull, task::Waker}; +use mycelium_util::sync::{spin::Mutex, CachePadded}; + +/// A queue of [`Waker`]s implemented using an [intrusive singly-linked list][ilist]. +/// +/// The *[intrusive]* aspect of this list is important, as it means that it does +/// not allocate memory. Instead, nodes in the linked list are stored in the +/// futures of tasks trying to wait for capacity. This means that it is not +/// necessary to allocate any heap memory for each task waiting to be notified. +/// +/// However, the intrusive linked list introduces one new danger: because +/// futures can be *cancelled*, and the linked list nodes live within the +/// futures trying to wait for channel capacity, we *must* ensure that the node +/// is unlinked from the list before dropping a cancelled future. Failure to do +/// so would result in the list containing dangling pointers. Therefore, we must +/// use a *doubly-linked* list, so that nodes can edit both the previous and +/// next node when they have to remove themselves. This is kind of a bummer, as +/// it means we can't use something nice like this [intrusive queue by Dmitry +/// Vyukov][2], and there are not really practical designs for lock-free +/// doubly-linked lists that don't rely on some kind of deferred reclamation +/// scheme such as hazard pointers or QSBR. +/// +/// Instead, we just stick a [`Mutex`] around the linked list, which must be +/// acquired to pop nodes from it, or for nodes to remove themselves when +/// futures are cancelled. This is a bit sad, but the critical sections for this +/// mutex are short enough that we still get pretty good performance despite it. +/// +/// [`Waker`]: core::task::Waker +/// [ilist]: cordyceps::List +/// [intrusive]: https://fuchsia.dev/fuchsia-src/development/languages/c-cpp/fbl_containers_guide/introduction +/// [2]: https://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue +#[derive(Debug)] +pub struct WaitQueue { + /// The wait queue's state variable. + /// + /// The queue is always in one of the following states: + /// + /// - [`EMPTY`]: No waiters are queued, and there is no pending notification. + /// Waiting while the queue is in this state will enqueue the waiter; + /// notifying while in this state will store a pending notification in the + /// queue, transitioning to the `WAKING` state. + /// + /// - [`WAITING`]: There are one or more waiters in the queue. Waiting while + /// the queue is in this state will not transition the state. Waking while + /// in this state will wake the first waiter in the queue; if this empties + /// the queue, then the queue will transition to the `EMPTY` state. + /// + /// - [`WAKING`]: The queue has a stored notification. Waiting while the queue + /// is in this state will consume the pending notification *without* + /// enqueueing the waiter and transition the queue to the `EMPTY` state. + /// Waking while in this state will leave the queue in this state. + /// + /// - [`CLOSED`]: The queue is closed. Waiting while in this state will return + /// [`WaitResult::Closed`] without transitioning the queue's state. + state: CachePadded, + + /// The linked list of waiters. + /// + /// # Safety + /// + /// This is protected by a mutex; the mutex *must* be acquired when + /// manipulating the linked list, OR when manipulating waiter nodes that may + /// be linked into the list. If a node is known to not be linked, it is safe + /// to modify that node (such as by setting or unsetting its + /// `Waker`/`Thread`) without holding the lock; otherwise, it may be + /// modified through the list, so the lock must be held when modifying the + /// node. + /// + /// A spinlock is used on `no_std` platforms; [`std::sync::Mutex`] or + /// `parking_lot::Mutex` are used when the standard library is available + /// (depending on feature flags). + list: Mutex>, +} + +/// A waiter node which may be linked into a wait queue. +#[derive(Debug)] +#[repr(C)] +struct Waiter { + /// The intrusive linked list node. + /// + /// This *must* be the first field in the struct in order for the `Linked` + /// implementation to be sound. + node: UnsafeCell, + + /// The waiter's state variable. + /// + /// A waiter is always in one of the following states: + /// + /// - [`EMPTY`]: The waiter is not linked in the queue, and does not have a + /// `Thread`/`Waker`. + /// + /// - [`WAITING`]: The waiter is linked in the queue and has a + /// `Thread`/`Waker`. + /// + /// - [`WAKING`]: The waiter has been notified by the wait queue. If it is in + /// this state, it is *not* linked into the queue, and does not have a + /// `Thread`/`Waker`. + /// + /// - [`WAKING`]: The waiter has been notified because the wait queue closed. + /// If it is in this state, it is *not* linked into the queue, and does + /// not have a `Thread`/`Waker`. + /// + /// This may be inspected without holding the lock; it can be used to + /// determine whether the lock must be acquired. + state: CachePadded, +} + +#[derive(Debug)] +#[repr(C)] +struct Node { + /// Intrusive linked list pointers. + /// + /// # Safety + /// + /// This *must* be the first field in the struct in order for the `Linked` + /// impl to be sound. + links: list::Links, + + /// The node's waker + waker: Option, + + // This type is !Unpin due to the heuristic from: + // + _pin: PhantomPinned, +} + +unsafe impl Linked> for Waiter { + type Handle = NonNull; + + fn into_ptr(r: Self::Handle) -> NonNull { + r + } + + unsafe fn from_ptr(ptr: NonNull) -> Self::Handle { + ptr + } + + unsafe fn links(ptr: NonNull) -> NonNull> { + (*ptr.as_ptr()) + .node + .with_mut(|node| util::non_null(node).cast::>()) + } +} From 1ad2cb631c82fb82520e9bcc54a3dedeb12d1371 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Sat, 21 May 2022 12:24:28 -0700 Subject: [PATCH 02/24] wip wait queue Signed-off-by: Eliza Weisman --- maitake/src/wait.rs | 2 +- maitake/src/wait/queue.rs | 225 +++++++++++++++++++++++++++++++++++++- 2 files changed, 223 insertions(+), 4 deletions(-) diff --git a/maitake/src/wait.rs b/maitake/src/wait.rs index 336f8f92..253410ab 100644 --- a/maitake/src/wait.rs +++ b/maitake/src/wait.rs @@ -11,7 +11,7 @@ use core::task::Poll; /// An error indicating that a [`WaitCell`] or queue was closed while attempting /// register a waiter. -#[derive(Clone, Debug, PartialEq, Eq)] +#[derive(Copy, Clone, Debug, Eq, PartialEq)] pub struct Closed(()); pub type WaitResult = Result<(), Closed>; diff --git a/maitake/src/wait/queue.rs b/maitake/src/wait/queue.rs index a88b9ff3..d676f51f 100644 --- a/maitake/src/wait/queue.rs +++ b/maitake/src/wait/queue.rs @@ -6,14 +6,25 @@ use crate::{ Ordering::{self, *}, }, }, - util, + util::{self, tracing}, + wait::{self, WaitResult}, }; use cordyceps::{ list::{self, List}, Linked, }; -use core::{marker::PhantomPinned, ptr::NonNull, task::Waker}; -use mycelium_util::sync::{spin::Mutex, CachePadded}; +use core::{ + future::Future, + marker::PhantomPinned, + pin::Pin, + ptr::NonNull, + task::{Context, Poll, Waker}, +}; +use mycelium_util::{ + fmt, + sync::{spin::Mutex, CachePadded}, +}; +use pin_project::{pin_project, pinned_drop}; /// A queue of [`Waker`]s implemented using an [intrusive singly-linked list][ilist]. /// @@ -86,6 +97,31 @@ pub struct WaitQueue { list: Mutex>, } +/// Future returned from [`WaitQueue::wait()`]. +/// +/// This future is fused, so once it has completed, any future calls to poll +/// will immediately return `Poll::Ready`. +#[derive(Debug)] +#[pin_project(PinnedDrop)] +pub struct Wait<'a> { + /// The `WaitQueue` being received on. + q: &'a WaitQueue, + + /// The future's state. + state: State, + + /// Entry in the wait queue. + #[pin] + waiter: Waiter, +} + +#[derive(Debug, Copy, Clone, Eq, PartialEq)] +enum State { + Start, + Waiting, + Done(WaitResult), +} + /// A waiter node which may be linked into a wait queue. #[derive(Debug)] #[repr(C)] @@ -138,6 +174,137 @@ struct Node { _pin: PhantomPinned, } +const EMPTY: usize = 0; +const WAITING: usize = 1; +const WAKING: usize = 2; +const CLOSED: usize = 3; + +// === impl WaitQueue === + +impl WaitQueue { + #[inline(always)] + fn start_wait(&self, node: Pin<&mut Waiter>, cx: &mut Context<'_>) -> Poll { + // Optimistically, acquire a stored notification before trying to lock + // the wait list. + match test_dbg!(self.state.compare_exchange(WAKING, EMPTY, SeqCst, SeqCst)) { + Ok(_) => return wait::notified(), + Err(CLOSED) => return wait::closed(), + Err(_) => {} + }; + + // Slow path: the queue is not closed, and we failed to consume a stored + // notification. We need to acquire the lock and enqueue the waiter. + self.start_wait_slow(node, cx) + } + + /// Slow path of `start_wait`: acquires the linked list lock, and adds the + /// waiter to the queue. + #[cold] + #[inline(never)] + fn start_wait_slow(&self, node: Pin<&mut Waiter>, cx: &mut Context<'_>) -> Poll { + // There are no queued notifications to consume, and the queue is + // still open. Therefore, it's time to actually push the waiter to + // the queue...finally lol :) + + // Grab the lock... + let mut list = self.list.lock(); + // Reload the queue's state, as it may have changed while we were + // waiting to lock the linked list. + let mut state = self.state.load(Acquire); + + loop { + match test_dbg!(state) { + // The queue is empty: transition the state to WAITING, as we + // are adding a waiter. + EMPTY => { + match test_dbg!(self + .state + .compare_exchange_weak(EMPTY, WAITING, SeqCst, SeqCst)) + { + Ok(_) => break, + Err(actual) => { + debug_assert!(actual == EMPTY || actual == WAKING || actual == CLOSED); + state = actual; + } + } + } + + // The queue was woken while we were waiting to acquire the + // lock. Attempt to consume the wakeup. + WAKING => { + match test_dbg!(self + .state + .compare_exchange_weak(WAKING, EMPTY, SeqCst, SeqCst)) + { + // Consumed the wakeup! + Ok(_) => return wait::notified(), + Err(actual) => { + debug_assert!(actual == WAKING || actual == EMPTY || actual == CLOSED); + state = actual; + } + } + } + + // The queue closed while we were waiting to acquire the lock; + // we're done here! + CLOSED => return wait::closed(), + + // The queue is already in the WAITING state, so we don't need + // to mess with it. + _state => { + debug_assert_eq!(_state, WAITING, + "start_wait_slow: unexpected state value {:?} (expected WAITING). this is a bug!", + _state, + ); + break; + } + } + } + + // Time to wait! Store the waiter in the node, advance the node's state + // to Waiting, and add it to the queue. + + node.with_node(&mut *list, |node| { + let _prev = node.waker.replace(cx.waker().clone()); + debug_assert!( + _prev.is_none(), + "start_wait_slow: called with a node that already had a waiter!" + ); + }); + + let _prev_state = test_dbg!(node.state.swap(WAITING, Release)); + debug_assert!( + _prev_state == EMPTY || _prev_state == WAKING, + "start_wait_slow: called with a node that was not empty ({}) or woken ({})! actual={}", + EMPTY, + WAKING, + _prev_state, + ); + list.push_front(ptr(node)); + + Poll::Pending + } +} + +// === impl Waiter === + +impl Waiter { + /// # Safety + /// + /// This is only safe to call while the list is locked. The dummy `_list` + /// parameter ensures this method is only called while holding the lock, so + /// this can be safe. + #[inline(always)] + #[cfg_attr(loom, track_caller)] + fn with_node(&self, _list: &mut List, f: impl FnOnce(&mut Node) -> T) -> T { + self.node.with_mut(|node| unsafe { + // Safety: the dummy `_list` argument ensures that the caller has + // the right to mutate the list (e.g. the list is locked). + f(&mut *node) + }) + } +} + unsafe impl Linked> for Waiter { type Handle = NonNull; @@ -155,3 +322,55 @@ unsafe impl Linked> for Waiter { .with_mut(|node| util::non_null(node).cast::>()) } } + +// === impl Wait === + +impl Future for Wait<'_> { + type Output = Result<(), wait::Closed>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.as_mut().project(); + let ptr = ptr(this.waiter); + + tracing::trace!(self = fmt::ptr(ptr), state = ?this.state, "Wait::poll"); + + loop { + let this = self.as_mut().project(); + + match *this.state { + State::Start => match this.q.start_wait(this.waiter, cx) { + Poll::Ready(x) => { + *this.state = State::Done(x); + return Poll::Ready(x); + } + Poll::Pending => { + *this.state = State::Waiting; + return Poll::Pending; + } + }, + State::Waiting => todo!(), + State::Done(res) => return Poll::Ready(res), + } + } + } +} + +#[pinned_drop] +impl PinnedDrop for Wait<'_> { + fn drop(self: Pin<&mut Self>) { + let this = self.project(); + let state = this.state; + let ptr = ptr(this.waiter); + + tracing::trace!(self = fmt::ptr(ptr), ?state, "Wait::drop"); + if *state == State::Waiting { + unsafe { + this.q.list.lock().remove(ptr); + } + } + } +} + +fn ptr(pin: Pin<&mut Waiter>) -> NonNull { + pin.as_ref().get_ref().into() +} From 0239df62053139321e84cf964d05e9861c985923 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Thu, 2 Jun 2022 11:52:40 -0700 Subject: [PATCH 03/24] wait queue basically works Signed-off-by: Eliza Weisman --- maitake/src/wait/queue.rs | 479 +++++++++++++++++++------------- maitake/src/wait/queue/tests.rs | 54 ++++ 2 files changed, 347 insertions(+), 186 deletions(-) create mode 100644 maitake/src/wait/queue/tests.rs diff --git a/maitake/src/wait/queue.rs b/maitake/src/wait/queue.rs index d676f51f..e26788ff 100644 --- a/maitake/src/wait/queue.rs +++ b/maitake/src/wait/queue.rs @@ -6,7 +6,7 @@ use crate::{ Ordering::{self, *}, }, }, - util::{self, tracing}, + util, wait::{self, WaitResult}, }; use cordyceps::{ @@ -19,14 +19,19 @@ use core::{ pin::Pin, ptr::NonNull, task::{Context, Poll, Waker}, + mem, }; +use mycelium_bitfield::{FromBits, bitfield}; use mycelium_util::{ fmt, sync::{spin::Mutex, CachePadded}, }; use pin_project::{pin_project, pinned_drop}; -/// A queue of [`Waker`]s implemented using an [intrusive singly-linked list][ilist]. +#[cfg(test)] +mod tests; + +/// A queue of [`Waker`]s implemented using an [intrusive doubly-linked list][ilist]. /// /// The *[intrusive]* aspect of this list is important, as it means that it does /// not allocate memory. Instead, nodes in the linked list are stored in the @@ -57,26 +62,6 @@ use pin_project::{pin_project, pinned_drop}; #[derive(Debug)] pub struct WaitQueue { /// The wait queue's state variable. - /// - /// The queue is always in one of the following states: - /// - /// - [`EMPTY`]: No waiters are queued, and there is no pending notification. - /// Waiting while the queue is in this state will enqueue the waiter; - /// notifying while in this state will store a pending notification in the - /// queue, transitioning to the `WAKING` state. - /// - /// - [`WAITING`]: There are one or more waiters in the queue. Waiting while - /// the queue is in this state will not transition the state. Waking while - /// in this state will wake the first waiter in the queue; if this empties - /// the queue, then the queue will transition to the `EMPTY` state. - /// - /// - [`WAKING`]: The queue has a stored notification. Waiting while the queue - /// is in this state will consume the pending notification *without* - /// enqueueing the waiter and transition the queue to the `EMPTY` state. - /// Waking while in this state will leave the queue in this state. - /// - /// - [`CLOSED`]: The queue is closed. Waiting while in this state will return - /// [`WaitResult::Closed`] without transitioning the queue's state. state: CachePadded, /// The linked list of waiters. @@ -94,7 +79,7 @@ pub struct WaitQueue { /// A spinlock is used on `no_std` platforms; [`std::sync::Mutex`] or /// `parking_lot::Mutex` are used when the standard library is available /// (depending on feature flags). - list: Mutex>, + queue: Mutex>, } /// Future returned from [`WaitQueue::wait()`]. @@ -104,55 +89,36 @@ pub struct WaitQueue { #[derive(Debug)] #[pin_project(PinnedDrop)] pub struct Wait<'a> { - /// The `WaitQueue` being received on. - q: &'a WaitQueue, - - /// The future's state. - state: State, + /// The `WaitQueue` being waited on from. + queue: &'a WaitQueue, /// Entry in the wait queue. #[pin] waiter: Waiter, } + #[derive(Debug, Copy, Clone, Eq, PartialEq)] -enum State { - Start, +enum WaitState { + Start(usize), Waiting, - Done(WaitResult), + Woken, } /// A waiter node which may be linked into a wait queue. #[derive(Debug)] #[repr(C)] +#[pin_project] struct Waiter { /// The intrusive linked list node. /// /// This *must* be the first field in the struct in order for the `Linked` /// implementation to be sound. + #[pin] node: UnsafeCell, - /// The waiter's state variable. - /// - /// A waiter is always in one of the following states: - /// - /// - [`EMPTY`]: The waiter is not linked in the queue, and does not have a - /// `Thread`/`Waker`. - /// - /// - [`WAITING`]: The waiter is linked in the queue and has a - /// `Thread`/`Waker`. - /// - /// - [`WAKING`]: The waiter has been notified by the wait queue. If it is in - /// this state, it is *not* linked into the queue, and does not have a - /// `Thread`/`Waker`. - /// - /// - [`WAKING`]: The waiter has been notified because the wait queue closed. - /// If it is in this state, it is *not* linked into the queue, and does - /// not have a `Thread`/`Waker`. - /// - /// This may be inspected without holding the lock; it can be used to - /// determine whether the lock must be acquired. - state: CachePadded, + /// The future's state. + state: WaitState, } #[derive(Debug)] @@ -167,122 +133,189 @@ struct Node { links: list::Links, /// The node's waker - waker: Option, + waker: Wakeup, + + // /// Optional user data. + // data: Option, // This type is !Unpin due to the heuristic from: // _pin: PhantomPinned, } -const EMPTY: usize = 0; -const WAITING: usize = 1; -const WAKING: usize = 2; -const CLOSED: usize = 3; +bitfield! { + #[derive(Eq, PartialEq)] + struct QueueState { + const STATE: State; + const WAKE_ALLS = ..; + } +} + +#[derive(Debug, Copy, Clone, Eq, PartialEq)] +#[repr(u8)] +enum State { + /// No waiters are queued, and there is no pending notification. + /// Waiting while the queue is in this state will enqueue the waiter; + /// notifying while in this state will store a pending notification in the + /// queue, transitioning to the [`Woken`] state. + Empty = 0, + + /// There are one or more waiters in the queue. Waiting while + /// the queue is in this state will not transition the state. Waking while + /// in this state will wake the first waiter in the queue; if this empties + /// the queue, then the queue will transition to the [`Empty`] state. + Waiting = 1, + + /// The queue has a stored notification. Waiting while the queue + /// is in this state will consume the pending notification *without* + /// enqueueing the waiter and transition the queue to the [`Empty`] state. + /// Waking while in this state will leave the queue in this state. + Woken = 2, + + /// The queue is closed. Waiting while in this state will return + /// [`Closed`] without transitioning the queue's state. + Closed = 3, + +} + +impl QueueState { + fn with_state(self, state: State) -> Self { + self.with(Self::STATE, state) + } +} + +impl FromBits for State { + const BITS: u32 = 2; + type Error = core::convert::Infallible; + + fn try_from_bits(bits: usize) -> Result { + Ok(match bits as u8 { + bits if bits == Self::Empty as u8 => Self::Empty, + bits if bits == Self::Waiting as u8 => Self::Waiting, + bits if bits == Self::Woken as u8 => Self::Woken, + bits if bits == Self::Closed as u8 => Self::Closed, + _ => unsafe { mycelium_util::unreachable_unchecked!("all potential 2-bit patterns should be covered!") }, + }) + } + + fn into_bits(self) -> usize { + self as u8 as usize + } +} + +#[derive(Clone, Debug)] +enum Wakeup { + Empty, + Waiting(Waker), + One, + All, + Closed, +} + // === impl WaitQueue === impl WaitQueue { - #[inline(always)] - fn start_wait(&self, node: Pin<&mut Waiter>, cx: &mut Context<'_>) -> Poll { - // Optimistically, acquire a stored notification before trying to lock - // the wait list. - match test_dbg!(self.state.compare_exchange(WAKING, EMPTY, SeqCst, SeqCst)) { - Ok(_) => return wait::notified(), - Err(CLOSED) => return wait::closed(), - Err(_) => {} - }; - - // Slow path: the queue is not closed, and we failed to consume a stored - // notification. We need to acquire the lock and enqueue the waiter. - self.start_wait_slow(node, cx) + + #[cfg(not(loom))] + pub const fn new() -> Self { + Self { + state: CachePadded::new(AtomicUsize::new(0)), + queue: Mutex::new(List::new()), + } + } + + #[cfg(loom)] + pub fn new() -> Self { + Self { + state: CachePadded::new(AtomicUsize::new(0)), + queue: Mutex::new(List::new()), + } + } + + + pub fn wake(&self) { + let mut state = test_dbg!(self.load(Acquire)); + while state.get(QueueState::STATE) != State::Waiting { + let next = state.with_state(State::Woken); + match test_dbg!(self.compare_exchange(state, next, SeqCst, SeqCst)) { + Ok(_) => return, + Err(actual) => state = actual, + } + } + + let mut queue = self.queue.lock(); + + test_trace!("wake: -> locked"); + state = test_dbg!(self.load(SeqCst)); + + if let Some(waker) = self.wake_locked(&mut *queue, state) { + waker.wake(); + } + } + + pub fn wait(&self) -> Wait<'_> { + let current_wake_alls = test_dbg!(self.load(SeqCst).get(QueueState::WAKE_ALLS)); + Wait { + queue: self, + waiter: Waiter { + state: WaitState::Start(current_wake_alls), + node: UnsafeCell::new(Node { + links: list::Links::new(), + waker: Wakeup::Empty, + _pin: PhantomPinned, + }), + }, + } + } + + fn load(&self, ordering: Ordering) -> QueueState { + let state = self.state.load(ordering); + QueueState::from_bits(state) + } + + fn store(&self, state: QueueState, ordering: Ordering) { + self.state.store(state.0, ordering) + } + + fn compare_exchange(&self, current: QueueState, new: QueueState, success: Ordering, failure: Ordering) -> Result { + self.state.compare_exchange(current.0, new.0, success, failure).map(QueueState::from_bits).map_err(QueueState::from_bits) } - /// Slow path of `start_wait`: acquires the linked list lock, and adds the - /// waiter to the queue. #[cold] #[inline(never)] - fn start_wait_slow(&self, node: Pin<&mut Waiter>, cx: &mut Context<'_>) -> Poll { - // There are no queued notifications to consume, and the queue is - // still open. Therefore, it's time to actually push the waiter to - // the queue...finally lol :) - - // Grab the lock... - let mut list = self.list.lock(); - // Reload the queue's state, as it may have changed while we were - // waiting to lock the linked list. - let mut state = self.state.load(Acquire); - - loop { - match test_dbg!(state) { - // The queue is empty: transition the state to WAITING, as we - // are adding a waiter. - EMPTY => { - match test_dbg!(self - .state - .compare_exchange_weak(EMPTY, WAITING, SeqCst, SeqCst)) - { - Ok(_) => break, - Err(actual) => { - debug_assert!(actual == EMPTY || actual == WAKING || actual == CLOSED); - state = actual; - } - } - } + fn wake_locked(&self, queue: &mut List, curr: QueueState) -> Option { + let state = curr.get(QueueState::STATE); + if test_dbg!(state) != State::Waiting { - // The queue was woken while we were waiting to acquire the - // lock. Attempt to consume the wakeup. - WAKING => { - match test_dbg!(self - .state - .compare_exchange_weak(WAKING, EMPTY, SeqCst, SeqCst)) - { - // Consumed the wakeup! - Ok(_) => return wait::notified(), - Err(actual) => { - debug_assert!(actual == WAKING || actual == EMPTY || actual == CLOSED); - state = actual; - } - } - } - - // The queue closed while we were waiting to acquire the lock; - // we're done here! - CLOSED => return wait::closed(), - - // The queue is already in the WAITING state, so we don't need - // to mess with it. - _state => { - debug_assert_eq!(_state, WAITING, - "start_wait_slow: unexpected state value {:?} (expected WAITING). this is a bug!", - _state, - ); - break; - } + if let Err(actual) = test_dbg!(self.compare_exchange(curr, curr.with_state(State::Waiting), SeqCst, SeqCst)) { + debug_assert!(actual.get(QueueState::STATE) != State::Waiting); + self.store(actual.with_state(State::Woken), SeqCst); } + + return None; } - // Time to wait! Store the waiter in the node, advance the node's state - // to Waiting, and add it to the queue. - node.with_node(&mut *list, |node| { - let _prev = node.waker.replace(cx.waker().clone()); - debug_assert!( - _prev.is_none(), - "start_wait_slow: called with a node that already had a waiter!" - ); + let mut waiter = queue.pop_back() + .expect("if we are in the Waiting state, there must be waiters in the queue"); + + // we are holding the lock on the queue, so it is safe to mutate any + // node in the queue. + let waker = unsafe { waiter.as_mut() }.with_node(queue, |node: &mut Node| { + let waker = test_dbg!(mem::replace(&mut node.waker, Wakeup::One)); + match waker { + Wakeup::Waiting(waker) => waker, + _ => unreachable!("tried to wake a waiter in the {:?} state!", waker), + } }); - let _prev_state = test_dbg!(node.state.swap(WAITING, Release)); - debug_assert!( - _prev_state == EMPTY || _prev_state == WAKING, - "start_wait_slow: called with a node that was not empty ({}) or woken ({})! actual={}", - EMPTY, - WAKING, - _prev_state, - ); - list.push_front(ptr(node)); - - Poll::Pending + if test_dbg!(queue.is_empty()) { + // we have taken the final waiter from the queue + self.store(curr.with_state(State::Empty), SeqCst); + } + + Some(waker) } } @@ -296,13 +329,111 @@ impl Waiter { /// this can be safe. #[inline(always)] #[cfg_attr(loom, track_caller)] - fn with_node(&self, _list: &mut List, f: impl FnOnce(&mut Node) -> T) -> T { + fn with_node(&self, _list: &mut List, f: impl FnOnce(&mut Node) -> U) -> U { self.node.with_mut(|node| unsafe { // Safety: the dummy `_list` argument ensures that the caller has // the right to mutate the list (e.g. the list is locked). f(&mut *node) }) } + + fn poll_wait(mut self: Pin<&mut Self>, queue: &WaitQueue, cx: &mut Context<'_>) -> Poll { + let mut this = self.as_mut().project(); + + match test_dbg!(*this.state) { + WaitState::Start(wake_alls) => { + let mut queue_state = test_dbg!(queue.load(SeqCst)); + + // can we consume a pending wakeup? + if test_dbg!(queue.compare_exchange(queue_state.with_state(State::Woken), queue_state.with_state(State::Empty), SeqCst, SeqCst)).is_ok() { + *this.state = WaitState::Woken; + return Poll::Ready(Ok(())); + } + + // okay, no pending wakeups. try to wait... + test_trace!("poll_wait: locking..."); + let mut waiters = queue.queue.lock(); + test_trace!("poll_wait: -> locked"); + queue_state = test_dbg!(queue.load(SeqCst)); + + // the whole queue was woken while we were trying to acquire + // the lock! + if queue_state.get(QueueState::WAKE_ALLS) != wake_alls { + *this.state = WaitState::Woken; + return Poll::Ready(Ok(())); + } + + // transition the queue to the waiting state + 'to_waiting: loop { + match test_dbg!(queue_state.get(QueueState::STATE)) { + // the queue is EMPTY, transition to WAITING + State::Empty => { + match test_dbg!(queue.compare_exchange(queue_state, queue_state.with_state(State::Waiting), SeqCst, SeqCst)) { + Ok(_) => break 'to_waiting, + Err(actual) => queue_state = actual, + } + }, + // the queue is already WAITING + State::Waiting => break 'to_waiting, + // the queue was woken, consume the wakeup. + State::Woken => { + match test_dbg!(queue.compare_exchange(queue_state, queue_state.with_state(State::Empty), SeqCst, SeqCst)) { + Ok(_) => { + *this.state = WaitState::Woken; + return Poll::Ready(Ok(())); + } + Err(actual) => queue_state = actual, + } + } + State::Closed => return wait::closed(), + } + } + + // enqueue the node + *this.state = WaitState::Waiting; + this.node.as_mut().with_mut(|node| { + unsafe { + // safety: we may mutate the node because we are + // holding the lock. + (*node).waker = Wakeup::Waiting(cx.waker().clone()); + } + }); + let ptr = unsafe { + NonNull::from(Pin::into_inner_unchecked(self)) + }; + waiters.push_front(ptr); + + Poll::Pending + }, + WaitState::Waiting => { + let mut _waiters = queue.queue.lock(); + this.node.with_mut(|node| unsafe { + // safety: we may mutate the node because we are + // holding the lock. + let node = &mut *node; + match node.waker { + Wakeup::Waiting(ref mut waker) => { + if !waker.will_wake(cx.waker()) { + *waker = cx.waker().clone(); + } + Poll::Pending + }, + Wakeup::All | Wakeup::One => { + *this.state = WaitState::Woken; + Poll::Ready(Ok(())) + }, + Wakeup::Closed => { + *this.state = WaitState::Woken; + wait::closed() + } + Wakeup::Empty => unreachable!(), + } + }) + }, + WaitState::Woken => Poll::Ready(Ok(())), + } + + } } unsafe impl Linked> for Waiter { @@ -326,51 +457,27 @@ unsafe impl Linked> for Waiter { // === impl Wait === impl Future for Wait<'_> { - type Output = Result<(), wait::Closed>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.as_mut().project(); - let ptr = ptr(this.waiter); + type Output = WaitResult; - tracing::trace!(self = fmt::ptr(ptr), state = ?this.state, "Wait::poll"); - - loop { - let this = self.as_mut().project(); - - match *this.state { - State::Start => match this.q.start_wait(this.waiter, cx) { - Poll::Ready(x) => { - *this.state = State::Done(x); - return Poll::Ready(x); - } - Poll::Pending => { - *this.state = State::Waiting; - return Poll::Pending; - } - }, - State::Waiting => todo!(), - State::Done(res) => return Poll::Ready(res), - } - } + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + this.waiter.poll_wait(this.queue, cx) } } #[pinned_drop] impl PinnedDrop for Wait<'_> { - fn drop(self: Pin<&mut Self>) { - let this = self.project(); - let state = this.state; - let ptr = ptr(this.waiter); - - tracing::trace!(self = fmt::ptr(ptr), ?state, "Wait::drop"); - if *state == State::Waiting { + fn drop(mut self: Pin<&mut Self>) { + let mut this = self.as_mut().project(); + let state = *(this.waiter.as_mut().project().state); + let ptr = NonNull::from(unsafe { + Pin::into_inner_unchecked(this.waiter) + }); + test_trace!(self = ?fmt::ptr(ptr), ?state, "Wait::drop"); + if state == WaitState::Waiting { unsafe { - this.q.list.lock().remove(ptr); + this.queue.queue.lock().remove(ptr); } } } -} - -fn ptr(pin: Pin<&mut Waiter>) -> NonNull { - pin.as_ref().get_ref().into() -} +} \ No newline at end of file diff --git a/maitake/src/wait/queue/tests.rs b/maitake/src/wait/queue/tests.rs new file mode 100644 index 00000000..ed591161 --- /dev/null +++ b/maitake/src/wait/queue/tests.rs @@ -0,0 +1,54 @@ +use super::*; + +#[cfg(loom)] +mod loom { + use super::*; + use crate::loom::{self, sync::Arc, future, thread}; + + #[test] + fn wake_one() { + loom::model(|| { + let q = Arc::new(WaitQueue::new()); + let thread = thread::spawn({ + let q = q.clone(); + move || { + future::block_on(async { + q.wait().await.expect("queue must not be closed"); + }); + } + }); + + q.wake(); + thread.join().unwrap(); + }); + } + + #[test] + fn wake_many() { + loom::model(|| { + let q = Arc::new(WaitQueue::new()); + + fn thread(q: &Arc) -> thread::JoinHandle<()> { + let q = q.clone(); + thread::spawn(move || { + future::block_on(async { + q.wait().await.expect("queue must not be closed"); + q.wake(); + }) + }) + } + + q.wake(); + + let thread1 = thread(&q); + let thread2 = thread(&q); + + thread1.join().unwrap(); + thread2.join().unwrap(); + + future::block_on(async { + q.wait().await.expect("queue must not be closed"); + }); + }); + } +} From 40fe389c0b4f9ebd183e11415987beed9b844d7e Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Thu, 2 Jun 2022 19:09:02 -0700 Subject: [PATCH 04/24] wip wake_all Signed-off-by: Eliza Weisman --- maitake/src/wait/queue.rs | 105 +++++++++++++++++++++----------- maitake/src/wait/queue/tests.rs | 25 +++++++- 2 files changed, 94 insertions(+), 36 deletions(-) diff --git a/maitake/src/wait/queue.rs b/maitake/src/wait/queue.rs index e26788ff..eb8dc99b 100644 --- a/maitake/src/wait/queue.rs +++ b/maitake/src/wait/queue.rs @@ -179,6 +179,8 @@ enum State { } impl QueueState { + const ONE_WAKE_ALL: usize = Self::WAKE_ALLS.first_bit(); + fn with_state(self, state: State) -> Self { self.with(Self::STATE, state) } @@ -235,10 +237,10 @@ impl WaitQueue { pub fn wake(&self) { - let mut state = test_dbg!(self.load(Acquire)); + let mut state = test_dbg!(self.load()); while state.get(QueueState::STATE) != State::Waiting { let next = state.with_state(State::Woken); - match test_dbg!(self.compare_exchange(state, next, SeqCst, SeqCst)) { + match test_dbg!(self.compare_exchange(state, next)) { Ok(_) => return, Err(actual) => state = actual, } @@ -247,15 +249,37 @@ impl WaitQueue { let mut queue = self.queue.lock(); test_trace!("wake: -> locked"); - state = test_dbg!(self.load(SeqCst)); + state = test_dbg!(self.load()); if let Some(waker) = self.wake_locked(&mut *queue, state) { waker.wake(); } } + pub fn wake_all(&self) { + let mut queue = self.queue.lock(); + let state = test_dbg!(self.load()); + + // if there are no waiters in the queue, increment the number of + // `wake_all` calls and return. + if state.get(QueueState::STATE) != State::Waiting { + self.state.fetch_add(QueueState::ONE_WAKE_ALL, SeqCst); + return; + } + + // okay, we actually have to wake some stuff. + + // TODO(eliza): wake outside the lock using an array, a la + // https://github.com/tokio-rs/tokio/blob/4941fbf7c43566a8f491c64af5a4cd627c99e5a6/tokio/src/sync/batch_semaphore.rs#L277-L303 + while let Some(node) = queue.pop_back() { + let waker = Waiter::wake(node, &mut queue, Wakeup::All); + waker.wake() + } + } + + pub fn wait(&self) -> Wait<'_> { - let current_wake_alls = test_dbg!(self.load(SeqCst).get(QueueState::WAKE_ALLS)); + let current_wake_alls = test_dbg!(self.load().get(QueueState::WAKE_ALLS)); Wait { queue: self, waiter: Waiter { @@ -269,17 +293,17 @@ impl WaitQueue { } } - fn load(&self, ordering: Ordering) -> QueueState { - let state = self.state.load(ordering); + fn load(&self) -> QueueState { + let state = self.state.load(SeqCst); QueueState::from_bits(state) } - fn store(&self, state: QueueState, ordering: Ordering) { - self.state.store(state.0, ordering) + fn store(&self, state: QueueState) { + self.state.store(state.0, SeqCst) } - fn compare_exchange(&self, current: QueueState, new: QueueState, success: Ordering, failure: Ordering) -> Result { - self.state.compare_exchange(current.0, new.0, success, failure).map(QueueState::from_bits).map_err(QueueState::from_bits) + fn compare_exchange(&self, current: QueueState, new: QueueState) -> Result { + self.state.compare_exchange(current.0, new.0, SeqCst, SeqCst).map(QueueState::from_bits).map_err(QueueState::from_bits) } #[cold] @@ -288,37 +312,42 @@ impl WaitQueue { let state = curr.get(QueueState::STATE); if test_dbg!(state) != State::Waiting { - if let Err(actual) = test_dbg!(self.compare_exchange(curr, curr.with_state(State::Waiting), SeqCst, SeqCst)) { + if let Err(actual) = test_dbg!(self.compare_exchange(curr, curr.with_state(State::Waiting))) { debug_assert!(actual.get(QueueState::STATE) != State::Waiting); - self.store(actual.with_state(State::Woken), SeqCst); + self.store(actual.with_state(State::Woken)); } return None; } - let mut waiter = queue.pop_back() + let node = queue.pop_back() .expect("if we are in the Waiting state, there must be waiters in the queue"); - - // we are holding the lock on the queue, so it is safe to mutate any - // node in the queue. - let waker = unsafe { waiter.as_mut() }.with_node(queue, |node: &mut Node| { - let waker = test_dbg!(mem::replace(&mut node.waker, Wakeup::One)); - match waker { - Wakeup::Waiting(waker) => waker, - _ => unreachable!("tried to wake a waiter in the {:?} state!", waker), - } - }); + let waker = Waiter::wake(node, queue, Wakeup::One); if test_dbg!(queue.is_empty()) { // we have taken the final waiter from the queue - self.store(curr.with_state(State::Empty), SeqCst); + self.store(curr.with_state(State::Empty)); } Some(waker) } } +impl Drop for WaitQueue { + fn drop(&mut self) { + let mut queue = self.queue.lock(); + test_dbg!(self.state.fetch_or(State::Closed as u8 as usize, SeqCst)); + + // TODO(eliza): wake outside the lock using an array, a la + // https://github.com/tokio-rs/tokio/blob/4941fbf7c43566a8f491c64af5a4cd627c99e5a6/tokio/src/sync/batch_semaphore.rs#L277-L303 + while let Some(node) = queue.pop_back() { + let waker = Waiter::wake(node, &mut queue, Wakeup::Closed); + waker.wake() + } + } +} + // === impl Waiter === impl Waiter { @@ -329,12 +358,18 @@ impl Waiter { /// this can be safe. #[inline(always)] #[cfg_attr(loom, track_caller)] - fn with_node(&self, _list: &mut List, f: impl FnOnce(&mut Node) -> U) -> U { - self.node.with_mut(|node| unsafe { - // Safety: the dummy `_list` argument ensures that the caller has - // the right to mutate the list (e.g. the list is locked). - f(&mut *node) - }) + fn wake(mut this: NonNull, _list: &mut List, wakeup: Wakeup) -> Waker { + unsafe { + // safety: this is only called while holding the lock on the queue, + // so it's safe to mutate the waiter. + this.as_mut().node.with_mut(|node| { + let waker = test_dbg!(mem::replace(&mut (*node).waker, wakeup)); + match waker { + Wakeup::Waiting(waker) => waker, + _ => unreachable!("tried to wake a waiter in the {:?} state!", waker), + } + }) + } } fn poll_wait(mut self: Pin<&mut Self>, queue: &WaitQueue, cx: &mut Context<'_>) -> Poll { @@ -342,10 +377,10 @@ impl Waiter { match test_dbg!(*this.state) { WaitState::Start(wake_alls) => { - let mut queue_state = test_dbg!(queue.load(SeqCst)); + let mut queue_state = test_dbg!(queue.load()); // can we consume a pending wakeup? - if test_dbg!(queue.compare_exchange(queue_state.with_state(State::Woken), queue_state.with_state(State::Empty), SeqCst, SeqCst)).is_ok() { + if test_dbg!(queue.compare_exchange(queue_state.with_state(State::Woken), queue_state.with_state(State::Empty))).is_ok() { *this.state = WaitState::Woken; return Poll::Ready(Ok(())); } @@ -354,7 +389,7 @@ impl Waiter { test_trace!("poll_wait: locking..."); let mut waiters = queue.queue.lock(); test_trace!("poll_wait: -> locked"); - queue_state = test_dbg!(queue.load(SeqCst)); + queue_state = test_dbg!(queue.load()); // the whole queue was woken while we were trying to acquire // the lock! @@ -368,7 +403,7 @@ impl Waiter { match test_dbg!(queue_state.get(QueueState::STATE)) { // the queue is EMPTY, transition to WAITING State::Empty => { - match test_dbg!(queue.compare_exchange(queue_state, queue_state.with_state(State::Waiting), SeqCst, SeqCst)) { + match test_dbg!(queue.compare_exchange(queue_state, queue_state.with_state(State::Waiting))) { Ok(_) => break 'to_waiting, Err(actual) => queue_state = actual, } @@ -377,7 +412,7 @@ impl Waiter { State::Waiting => break 'to_waiting, // the queue was woken, consume the wakeup. State::Woken => { - match test_dbg!(queue.compare_exchange(queue_state, queue_state.with_state(State::Empty), SeqCst, SeqCst)) { + match test_dbg!(queue.compare_exchange(queue_state, queue_state.with_state(State::Empty))) { Ok(_) => { *this.state = WaitState::Woken; return Poll::Ready(Ok(())); diff --git a/maitake/src/wait/queue/tests.rs b/maitake/src/wait/queue/tests.rs index ed591161..478bfa23 100644 --- a/maitake/src/wait/queue/tests.rs +++ b/maitake/src/wait/queue/tests.rs @@ -24,7 +24,30 @@ mod loom { } #[test] - fn wake_many() { + fn wake_all_sequential() { + loom::model(|| { + let q = Arc::new(WaitQueue::new()); + let wait1 = q.wait(); + let wait2 = q.wait(); + + let thread = thread::spawn({ + let q = q.clone(); + move || { + q.wake_all(); + } + }); + + future::block_on(async { + wait1.await.unwrap(); + wait2.await.unwrap(); + }); + + thread.join().unwrap(); + }); + } + + #[test] + fn wake_one_many() { loom::model(|| { let q = Arc::new(WaitQueue::new()); From f439f64322d0aeb40f95a9ffce97ec1927685862 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Fri, 3 Jun 2022 09:28:04 -0700 Subject: [PATCH 05/24] nicer tracing Signed-off-by: Eliza Weisman --- bitfield/src/bitfield.rs | 10 +++++----- maitake/src/util.rs | 12 ++++++++++-- maitake/src/wait/queue.rs | 39 +++++++++++++++++++++++++-------------- 3 files changed, 40 insertions(+), 21 deletions(-) diff --git a/bitfield/src/bitfield.rs b/bitfield/src/bitfield.rs index db3af151..bbbc6776 100644 --- a/bitfield/src/bitfield.rs +++ b/bitfield/src/bitfield.rs @@ -234,11 +234,11 @@ macro_rules! bitfield { #[automatically_derived] impl core::fmt::Debug for $Name { fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { - if f.alternate() { - f.debug_tuple(stringify!($Name)).field(&format_args!("{}", self)).finish() - } else { - f.debug_tuple(stringify!($Name)).field(&format_args!("{:#b}", self.0)).finish() - } + let mut dbg = f.debug_struct(stringify!($Name)); + $( + dbg.field(stringify!($Field), &self.get(Self::$Field)); + )+ + dbg.finish() } } diff --git a/maitake/src/util.rs b/maitake/src/util.rs index 9b54cee8..928efcd7 100644 --- a/maitake/src/util.rs +++ b/maitake/src/util.rs @@ -18,7 +18,12 @@ macro_rules! test_dbg { ($e:expr) => { match $e { e => { - crate::util::tracing::debug!("{} = {:?}", stringify!($e), &e); + crate::util::tracing::debug!( + location = %core::panic::Location::caller(), + "{} = {:?}", + stringify!($e), + &e + ); e } } @@ -33,7 +38,10 @@ macro_rules! test_trace { #[cfg(test)] macro_rules! test_trace { ($($args:tt)+) => { - crate::util::tracing::debug!($($args)+); + crate::util::tracing::debug!( + location = %core::panic::Location::caller(), + $($args)+ + ); }; } diff --git a/maitake/src/wait/queue.rs b/maitake/src/wait/queue.rs index eb8dc99b..cdabc20e 100644 --- a/maitake/src/wait/queue.rs +++ b/maitake/src/wait/queue.rs @@ -237,10 +237,10 @@ impl WaitQueue { pub fn wake(&self) { - let mut state = test_dbg!(self.load()); + let mut state = self.load(); while state.get(QueueState::STATE) != State::Waiting { let next = state.with_state(State::Woken); - match test_dbg!(self.compare_exchange(state, next)) { + match self.compare_exchange(state, next) { Ok(_) => return, Err(actual) => state = actual, } @@ -249,7 +249,7 @@ impl WaitQueue { let mut queue = self.queue.lock(); test_trace!("wake: -> locked"); - state = test_dbg!(self.load()); + state = self.load(); if let Some(waker) = self.wake_locked(&mut *queue, state) { waker.wake(); @@ -258,7 +258,7 @@ impl WaitQueue { pub fn wake_all(&self) { let mut queue = self.queue.lock(); - let state = test_dbg!(self.load()); + let state = self.load(); // if there are no waiters in the queue, increment the number of // `wake_all` calls and return. @@ -293,17 +293,27 @@ impl WaitQueue { } } + + #[cfg_attr(test, track_caller)] fn load(&self) -> QueueState { - let state = self.state.load(SeqCst); - QueueState::from_bits(state) + #[allow(clippy::let_and_return)] + let state = QueueState::from_bits(self.state.load(SeqCst)); + test_trace!("state.load() = {state:?}"); + state } + #[cfg_attr(test, track_caller)] fn store(&self, state: QueueState) { - self.state.store(state.0, SeqCst) + test_trace!("state.store({state:?}"); + self.state.store(state.0, SeqCst); } + #[cfg_attr(test, track_caller)] fn compare_exchange(&self, current: QueueState, new: QueueState) -> Result { - self.state.compare_exchange(current.0, new.0, SeqCst, SeqCst).map(QueueState::from_bits).map_err(QueueState::from_bits) + #[allow(clippy::let_and_return)] + let res = self.state.compare_exchange(current.0, new.0, SeqCst, SeqCst).map(QueueState::from_bits).map_err(QueueState::from_bits); + test_trace!("state.compare_exchange({current:?}, {new:?}) = {res:?}"); + res } #[cold] @@ -312,7 +322,7 @@ impl WaitQueue { let state = curr.get(QueueState::STATE); if test_dbg!(state) != State::Waiting { - if let Err(actual) = test_dbg!(self.compare_exchange(curr, curr.with_state(State::Waiting))) { + if let Err(actual) = self.compare_exchange(curr, curr.with_state(State::Waiting)) { debug_assert!(actual.get(QueueState::STATE) != State::Waiting); self.store(actual.with_state(State::Woken)); } @@ -338,7 +348,7 @@ impl Drop for WaitQueue { fn drop(&mut self) { let mut queue = self.queue.lock(); test_dbg!(self.state.fetch_or(State::Closed as u8 as usize, SeqCst)); - + // TODO(eliza): wake outside the lock using an array, a la // https://github.com/tokio-rs/tokio/blob/4941fbf7c43566a8f491c64af5a4cd627c99e5a6/tokio/src/sync/batch_semaphore.rs#L277-L303 while let Some(node) = queue.pop_back() { @@ -373,14 +383,15 @@ impl Waiter { } fn poll_wait(mut self: Pin<&mut Self>, queue: &WaitQueue, cx: &mut Context<'_>) -> Poll { + test_trace!(ptr = ?fmt::ptr(self.as_mut()), "Waiter::poll_wait"); let mut this = self.as_mut().project(); match test_dbg!(*this.state) { WaitState::Start(wake_alls) => { - let mut queue_state = test_dbg!(queue.load()); + let mut queue_state = queue.load(); // can we consume a pending wakeup? - if test_dbg!(queue.compare_exchange(queue_state.with_state(State::Woken), queue_state.with_state(State::Empty))).is_ok() { + if queue.compare_exchange(queue_state.with_state(State::Woken), queue_state.with_state(State::Empty)).is_ok() { *this.state = WaitState::Woken; return Poll::Ready(Ok(())); } @@ -403,7 +414,7 @@ impl Waiter { match test_dbg!(queue_state.get(QueueState::STATE)) { // the queue is EMPTY, transition to WAITING State::Empty => { - match test_dbg!(queue.compare_exchange(queue_state, queue_state.with_state(State::Waiting))) { + match queue.compare_exchange(queue_state, queue_state.with_state(State::Waiting)) { Ok(_) => break 'to_waiting, Err(actual) => queue_state = actual, } @@ -412,7 +423,7 @@ impl Waiter { State::Waiting => break 'to_waiting, // the queue was woken, consume the wakeup. State::Woken => { - match test_dbg!(queue.compare_exchange(queue_state, queue_state.with_state(State::Empty))) { + match queue.compare_exchange(queue_state, queue_state.with_state(State::Empty)) { Ok(_) => { *this.state = WaitState::Woken; return Poll::Ready(Ok(())); From f28f8b75af33fc6b77526c9f307003f30e013087 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Fri, 3 Jun 2022 09:33:43 -0700 Subject: [PATCH 06/24] fix missing state transition in wake_all Signed-off-by: Eliza Weisman --- maitake/src/wait/queue.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/maitake/src/wait/queue.rs b/maitake/src/wait/queue.rs index cdabc20e..99c44bc5 100644 --- a/maitake/src/wait/queue.rs +++ b/maitake/src/wait/queue.rs @@ -275,6 +275,11 @@ impl WaitQueue { let waker = Waiter::wake(node, &mut queue, Wakeup::All); waker.wake() } + + // now that the queue has been drained, transition to the empty state, + // and increment the wake_all count. + let next_state = QueueState::new().with_state(State::Empty).with(QueueState::WAKE_ALLS, state.get(QueueState::WAKE_ALLS) + 1); + self.compare_exchange(state, next_state).expect("state should not have transitioned while locked"); } From 646ac044284d6d601034f6f59459158322ab55fe Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Fri, 3 Jun 2022 09:33:53 -0700 Subject: [PATCH 07/24] cleanup imports Signed-off-by: Eliza Weisman --- maitake/src/wait/queue.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/maitake/src/wait/queue.rs b/maitake/src/wait/queue.rs index 99c44bc5..4c11746d 100644 --- a/maitake/src/wait/queue.rs +++ b/maitake/src/wait/queue.rs @@ -3,7 +3,7 @@ use crate::{ cell::UnsafeCell, sync::atomic::{ AtomicUsize, - Ordering::{self, *}, + Ordering::*, }, }, util, @@ -22,10 +22,9 @@ use core::{ mem, }; use mycelium_bitfield::{FromBits, bitfield}; -use mycelium_util::{ - fmt, - sync::{spin::Mutex, CachePadded}, -}; +use mycelium_util::sync::{spin::Mutex, CachePadded}; +#[cfg(test)] +use mycelium_util::fmt; use pin_project::{pin_project, pinned_drop}; #[cfg(test)] From 1d485631c28bca510caf813fabf07e98a692596d Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Fri, 3 Jun 2022 09:50:43 -0700 Subject: [PATCH 08/24] add wait_owned Signed-off-by: Eliza Weisman --- maitake/src/wait/queue.rs | 109 ++++++++++++++++++++++++++------ maitake/src/wait/queue/tests.rs | 38 +++++++++++ 2 files changed, 129 insertions(+), 18 deletions(-) diff --git a/maitake/src/wait/queue.rs b/maitake/src/wait/queue.rs index 4c11746d..73ab9129 100644 --- a/maitake/src/wait/queue.rs +++ b/maitake/src/wait/queue.rs @@ -283,17 +283,21 @@ impl WaitQueue { pub fn wait(&self) -> Wait<'_> { - let current_wake_alls = test_dbg!(self.load().get(QueueState::WAKE_ALLS)); Wait { queue: self, - waiter: Waiter { - state: WaitState::Start(current_wake_alls), - node: UnsafeCell::new(Node { - links: list::Links::new(), - waker: Wakeup::Empty, - _pin: PhantomPinned, - }), - }, + waiter: self.waiter() + } + } + + fn waiter(&self) -> Waiter { + let current_wake_alls = test_dbg!(self.load().get(QueueState::WAKE_ALLS)); + Waiter { + state: WaitState::Start(current_wake_alls), + node: UnsafeCell::new(Node { + links: list::Links::new(), + waker: Wakeup::Empty, + _pin: PhantomPinned, + }), } } @@ -484,6 +488,19 @@ impl Waiter { } } + + fn release(mut self: Pin<&mut Self>, queue: &WaitQueue) { + let state = *(self.as_mut().project().state); + let ptr = NonNull::from(unsafe { + Pin::into_inner_unchecked(self) + }); + test_trace!(self = ?fmt::ptr(ptr), ?state, ?queue, "Waiter::release"); + if state == WaitState::Waiting { + unsafe { + queue.queue.lock().remove(ptr); + } + } + } } unsafe impl Linked> for Waiter { @@ -518,15 +535,71 @@ impl Future for Wait<'_> { #[pinned_drop] impl PinnedDrop for Wait<'_> { fn drop(mut self: Pin<&mut Self>) { - let mut this = self.as_mut().project(); - let state = *(this.waiter.as_mut().project().state); - let ptr = NonNull::from(unsafe { - Pin::into_inner_unchecked(this.waiter) - }); - test_trace!(self = ?fmt::ptr(ptr), ?state, "Wait::drop"); - if state == WaitState::Waiting { - unsafe { - this.queue.queue.lock().remove(ptr); + let this = self.project(); + this.waiter.release(this.queue); + } +} + +// === impl WaitOwned === + +feature!{ + #![feature = "alloc"] + + use alloc::sync::{Arc, Weak}; + + /// Future returned from [`WaitQueue::wait_owned()`]. + /// + /// This is identical to the [`Wait`] future, except that it takes the + /// [`WaitQueue`] as an [`Arc`] clone, allowing it to live for the `'static` + /// lifetime. + /// + /// This future is fused, so once it has completed, any future calls to poll + /// will immediately return `Poll::Ready`. + #[derive(Debug)] + #[pin_project(PinnedDrop)] + pub struct WaitOwned { + /// The `WaitQueue` being waited on from. + queue: Weak, + + /// Entry in the wait queue. + #[pin] + waiter: Waiter, + } + + impl WaitQueue { + pub fn wait_owned(self: &Arc) -> WaitOwned { + let waiter = self.waiter(); + let queue = Arc::downgrade(self); + WaitOwned { queue, waiter } + } + } + + impl Future for WaitOwned { + type Output = WaitResult; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + match this.queue.upgrade() { + Some(queue) => this.waiter.poll_wait(&*queue, cx), + None => wait::closed(), + } + } + } + + #[pinned_drop] + impl PinnedDrop for WaitOwned { + fn drop(mut self: Pin<&mut Self>) { + let this = self.project(); + match this.queue.upgrade() { + Some(ref queue) => this.waiter.release(queue), + None => { + test_trace!("WaitOwned::drop: queue already dropped"); + debug_assert_ne!( + *this.waiter.project().state, + WaitState::Waiting, + "if the queue has been dropped, the waiter should have been notified!", + ) + } } } } diff --git a/maitake/src/wait/queue/tests.rs b/maitake/src/wait/queue/tests.rs index 478bfa23..ffbd8129 100644 --- a/maitake/src/wait/queue/tests.rs +++ b/maitake/src/wait/queue/tests.rs @@ -46,6 +46,44 @@ mod loom { }); } + #[test] + fn wake_all_concurrent() { + use alloc::sync::Arc; + + loom::model(|| { + let q = Arc::new(WaitQueue::new()); + let wait1 = q.wait_owned(); + let wait2 = q.wait_owned(); + + let thread1 = thread::spawn(move || { future::block_on(wait1).expect("wait1 must not fail") }); + let thread2 = thread::spawn(move || { future::block_on(wait2).expect("wait2 must not fail") }); + + q.wake_all(); + + thread1.join().unwrap(); + thread2.join().unwrap(); + }); + } + + #[test] + fn wake_on_drop() { + use alloc::sync::Arc; + + loom::model(|| { + let q = Arc::new(WaitQueue::new()); + let wait1 = q.wait_owned(); + let wait2 = q.wait_owned(); + + let thread1 = thread::spawn(move || { future::block_on(wait1).expect_err("wait1 must be canceled") }); + let thread2 = thread::spawn(move || { future::block_on(wait2).expect_err("wait2 must be canceled") }); + + drop(q); + + thread1.join().unwrap(); + thread2.join().unwrap(); + }); + } + #[test] fn wake_one_many() { loom::model(|| { From f0ec8fdebe7f2f401cf08ea094e81ddefb996085 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Fri, 3 Jun 2022 11:14:05 -0700 Subject: [PATCH 09/24] don't use spinlocks in loom tests this fixes test failures due to unbounded branches Signed-off-by: Eliza Weisman --- Cargo.lock | 45 +++++++++++++++++++++++++++++++++++++++ maitake/Cargo.toml | 2 +- maitake/src/loom.rs | 39 +++++++++++++++++++++++++++++++-- maitake/src/wait/queue.rs | 7 ++++-- 4 files changed, 88 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4813961e..d937fb7c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -418,6 +418,12 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "itoa" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "112c678d4050afce233f4f2852bb2eb519230b3cf12f33585275537d7e41578d" + [[package]] name = "json" version = "0.12.4" @@ -476,6 +482,8 @@ dependencies = [ "generator", "pin-utils", "scoped-tls", + "serde", + "serde_json", "tracing 0.1.34", "tracing-subscriber 0.3.11", ] @@ -893,12 +901,49 @@ dependencies = [ "wait-timeout", ] +[[package]] +name = "ryu" +version = "1.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3f6f92acf49d1b98f7a81226834412ada05458b7364277387724a237f062695" + [[package]] name = "scoped-tls" version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ea6a9290e3c9cf0f18145ef7ffa62d68ee0bf5fcd651017e586dc7fd5da448c2" +[[package]] +name = "serde" +version = "1.0.137" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61ea8d54c77f8315140a05f4c7237403bf38b72704d031543aa1d16abbf517d1" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.137" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f26faba0c3959972377d3b2d306ee9f71faee9714294e41bb777f83f88578be" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "serde_json" +version = "1.0.81" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b7ce2b32a1aed03c558dc61a5cd328f15aff2dbc17daad8fb8af04d2100e15c" +dependencies = [ + "itoa", + "ryu", + "serde", +] + [[package]] name = "sharded-slab" version = "0.1.4" diff --git a/maitake/Cargo.toml b/maitake/Cargo.toml index 7bf8055a..8f881a71 100644 --- a/maitake/Cargo.toml +++ b/maitake/Cargo.toml @@ -49,7 +49,7 @@ git = "https://github.com/tokio-rs/tracing" futures-util = "0.3" [target.'cfg(loom)'.dev-dependencies] -loom = { version = "0.5.5", features = ["futures"] } +loom = { version = "0.5.5", features = ["futures", "checkpoint"] } tracing_01 = { package = "tracing", version = "0.1", default_features = false } tracing_subscriber_03 = { package = "tracing-subscriber", version = "0.3.11", features = ["fmt"] } diff --git a/maitake/src/loom.rs b/maitake/src/loom.rs index b82aa9e4..79b39ae8 100644 --- a/maitake/src/loom.rs +++ b/maitake/src/loom.rs @@ -3,9 +3,42 @@ pub(crate) use self::inner::*; #[cfg(loom)] mod inner { + #![allow(dead_code)] + #[cfg(feature = "alloc")] - pub use loom::alloc; - pub use loom::{cell, future, hint, model, sync, thread}; + pub(crate) use loom::alloc; + pub(crate) use loom::{cell, future, hint, model, thread}; + + pub(crate) mod sync { + pub(crate) use loom::sync::*; + + pub(crate) mod spin { + pub(crate) use loom::sync::MutexGuard; + + /// Mock version of mycelium's spinlock, but using + /// `loom::sync::Mutex`. The API is slightly different, since the + /// mycelium mutex does not support poisoning. + #[derive(Debug)] + pub(crate) struct Mutex(loom::sync::Mutex); + + impl Mutex { + #[track_caller] + pub(crate) fn new(t: T) -> Self { + Self(loom::sync::Mutex::new(t)) + } + + #[track_caller] + pub fn try_lock(&self) -> Option> { + self.0.try_lock().ok() + } + + #[track_caller] + pub fn lock(&self) -> MutexGuard<'_, T> { + self.0.lock().expect("loom mutex will never poison") + } + } + } + } } #[cfg(not(loom))] @@ -15,6 +48,8 @@ mod inner { #[cfg(feature = "alloc")] pub use alloc::sync::*; pub use core::sync::*; + + pub use mycelium_util::sync::spin; } pub(crate) use core::sync::atomic; diff --git a/maitake/src/wait/queue.rs b/maitake/src/wait/queue.rs index 73ab9129..f49bcbfb 100644 --- a/maitake/src/wait/queue.rs +++ b/maitake/src/wait/queue.rs @@ -1,10 +1,13 @@ use crate::{ loom::{ cell::UnsafeCell, - sync::atomic::{ + sync::{ + spin::Mutex, + atomic::{ AtomicUsize, Ordering::*, }, + } }, util, wait::{self, WaitResult}, @@ -22,7 +25,7 @@ use core::{ mem, }; use mycelium_bitfield::{FromBits, bitfield}; -use mycelium_util::sync::{spin::Mutex, CachePadded}; +use mycelium_util::sync::CachePadded; #[cfg(test)] use mycelium_util::fmt; use pin_project::{pin_project, pinned_drop}; From 8812bffafb8082d5ea492275f66c4589a0f05300 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Fri, 3 Jun 2022 16:06:52 -0700 Subject: [PATCH 10/24] docs Signed-off-by: Eliza Weisman --- maitake/src/lib.rs | 2 +- maitake/src/wait.rs | 13 ++-- maitake/src/wait/queue.rs | 129 ++++++++++++++++++++++++++++++++------ 3 files changed, 118 insertions(+), 26 deletions(-) diff --git a/maitake/src/lib.rs b/maitake/src/lib.rs index 9b04defa..d750bc67 100644 --- a/maitake/src/lib.rs +++ b/maitake/src/lib.rs @@ -1,6 +1,6 @@ #![cfg_attr(docsrs, doc = include_str!("../README.md"))] #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg, doc_cfg_hide))] -#![cfg_attr(docsrs, doc(cfg_hide(docsrs)))] +#![cfg_attr(docsrs, doc(cfg_hide(docsrs, loom)))] #![cfg_attr(not(test), no_std)] #[cfg(feature = "alloc")] extern crate alloc; diff --git a/maitake/src/wait.rs b/maitake/src/wait.rs index 253410ab..186b7f08 100644 --- a/maitake/src/wait.rs +++ b/maitake/src/wait.rs @@ -1,16 +1,19 @@ //! Waiter cells and queues to allow tasks to wait for notifications. //! //! This module implements two types of structure for waiting: a [`WaitCell`], -//! which stores a *single* waiting task, and a wait *queue*, which +//! which stores a *single* waiting task, and a [`WaitQueue`], which //! stores a queue of waiting tasks. pub(crate) mod cell; -mod queue; -pub use self::{cell::WaitCell, queue::WaitQueue}; +pub mod queue; + +pub use self::cell::WaitCell; +#[doc(inline)] +pub use self::queue::WaitQueue; use core::task::Poll; -/// An error indicating that a [`WaitCell`] or queue was closed while attempting -/// register a waiter. +/// An error indicating that a [`WaitCell`] or [`WaitQueue`] was closed while +/// attempting register a waiter. #[derive(Copy, Clone, Debug, Eq, PartialEq)] pub struct Closed(()); diff --git a/maitake/src/wait/queue.rs b/maitake/src/wait/queue.rs index f49bcbfb..ee19a08a 100644 --- a/maitake/src/wait/queue.rs +++ b/maitake/src/wait/queue.rs @@ -33,7 +33,22 @@ use pin_project::{pin_project, pinned_drop}; #[cfg(test)] mod tests; -/// A queue of [`Waker`]s implemented using an [intrusive doubly-linked list][ilist]. +/// A queue of [`Waker`]s implemented using an [intrusive doubly-linked +/// list][ilist]. +/// +/// A `WaitQueue` allows any number of tasks to [wait] asynchronously and be +/// woken when some event occurs, either [individually][wake] in first-in, +/// first-out order, or [all at once][wake_all]. This makes it a vital building +/// block of runtime services (such as timers or I/O resources), where it may be +/// used to wake a set of tasks when a timer completes or when a resource +/// becomes available. It can be equally useful for implementing higher-level +/// synchronization primitives: for example, a `WaitQueue` plus an +/// [`UnsafeCell`] is essentially an entire implementation of a fair +/// asynchronous mutex. Finally, a `WaitQueue` can be a useful synchronization +/// primitive on its own: sometimes, you just need to have a bunch of tasks wait +/// for something and then wake them all up. +/// +/// # Implementation Notes /// /// The *[intrusive]* aspect of this list is important, as it means that it does /// not allocate memory. Instead, nodes in the linked list are stored in the @@ -58,6 +73,10 @@ mod tests; /// mutex are short enough that we still get pretty good performance despite it. /// /// [`Waker`]: core::task::Waker +/// [wait]: WaitQueue::wait +/// [wake]: WaitQueue::wake +/// [wake_all]: WaitQueue::wake_all +/// [`UnsafeCell`]: core::cell::UnsafeCell /// [ilist]: cordyceps::List /// [intrusive]: https://fuchsia.dev/fuchsia-src/development/languages/c-cpp/fbl_containers_guide/introduction /// [2]: https://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue @@ -73,28 +92,30 @@ pub struct WaitQueue { /// This is protected by a mutex; the mutex *must* be acquired when /// manipulating the linked list, OR when manipulating waiter nodes that may /// be linked into the list. If a node is known to not be linked, it is safe - /// to modify that node (such as by setting or unsetting its - /// `Waker`/`Thread`) without holding the lock; otherwise, it may be - /// modified through the list, so the lock must be held when modifying the + /// to modify that node (such as by waking the stored [`Waker`]) without + /// holding the lock; otherwise, it may be modified through the list, so the + /// lock must be held when modifying the /// node. /// - /// A spinlock is used on `no_std` platforms; [`std::sync::Mutex`] or - /// `parking_lot::Mutex` are used when the standard library is available - /// (depending on feature flags). + /// A spinlock (from `mycelium_util`) is used here, in order to support + /// `no_std` platforms; when running `loom` tests, a `loom` mutex is used + /// instead to simulate the spinlock, because loom doesn't play nice with + /// real spinlocks. queue: Mutex>, } /// Future returned from [`WaitQueue::wait()`]. /// /// This future is fused, so once it has completed, any future calls to poll -/// will immediately return `Poll::Ready`. +/// will immediately return [`Poll::Ready`]. #[derive(Debug)] #[pin_project(PinnedDrop)] +#[must_use = "futures do nothing unless `.await`ed or `poll`ed"] pub struct Wait<'a> { - /// The `WaitQueue` being waited on from. + /// The [`WaitQueue`] being waited on from. queue: &'a WaitQueue, - /// Entry in the wait queue. + /// Entry in the wait queue linked list. #[pin] waiter: Waiter, } @@ -220,7 +241,8 @@ enum Wakeup { // === impl WaitQueue === impl WaitQueue { - + /// Returns a new `WaitQueue`. + #[must_use] #[cfg(not(loom))] pub const fn new() -> Self { Self { @@ -229,6 +251,8 @@ impl WaitQueue { } } + /// Returns a new `WaitQueue`. + #[must_use] #[cfg(loom)] pub fn new() -> Self { Self { @@ -237,20 +261,35 @@ impl WaitQueue { } } - + /// Wake the next task in the queue. + /// + /// If the queue is empty, a wakeup is stored in the `WaitQueue`, and the + /// next call to [`Wait`] will complete immediately. + #[inline] pub fn wake(&self) { + // snapshot the queue's current state. let mut state = self.load(); + + // check if any tasks are currently waiting on this queue. if there are + // no waiting tasks, store the wakeup to be consumed by the next call to + // `wait`. while state.get(QueueState::STATE) != State::Waiting { let next = state.with_state(State::Woken); + // advance the state to `Woken`, and return (if we did so + // successfully) match self.compare_exchange(state, next) { Ok(_) => return, Err(actual) => state = actual, } } + // okay, there are tasks waiting on the queue; we must acquire the lock + // on the linked list and wake the next task from the queue. let mut queue = self.queue.lock(); - test_trace!("wake: -> locked"); + + // the queue's state may have changed while we were waiting to acquire + // the lock, so we need to acquire a new snapshot. state = self.load(); if let Some(waker) = self.wake_locked(&mut *queue, state) { @@ -258,6 +297,7 @@ impl WaitQueue { } } + /// Wake *all* tasks currently in the queue. pub fn wake_all(&self) { let mut queue = self.queue.lock(); let state = self.load(); @@ -285,6 +325,14 @@ impl WaitQueue { } + /// Wait to be woken up by this queue. + /// + /// This returns a [`Wait`] future that will complete when the task is + /// woken by a call to [`wake`] or [`wake_all`], or when the `WaitQueue` is + /// dropped. + /// + /// [`wake`]: Self::wake + /// [`wake_all`]: Self::wake_all pub fn wait(&self) -> Wait<'_> { Wait { queue: self, @@ -292,7 +340,12 @@ impl WaitQueue { } } + /// Returns a [`Waiter`] entry in this queue. + /// + /// This is factored out into a separate function because it's used by both + /// [`WaitQueue::wait`] and [`WaitQueue::wait_owned`]. fn waiter(&self) -> Waiter { + // how many times has `wake_all` been called when this waiter is created? let current_wake_alls = test_dbg!(self.load().get(QueueState::WAKE_ALLS)); Waiter { state: WaitState::Start(current_wake_alls), @@ -331,8 +384,12 @@ impl WaitQueue { #[inline(never)] fn wake_locked(&self, queue: &mut List, curr: QueueState) -> Option { let state = curr.get(QueueState::STATE); - if test_dbg!(state) != State::Waiting { + // is the queue still in the `Waiting` state? it is possible that we + // transitioned to a different state while locking the queue. + if test_dbg!(state) != State::Waiting { + // if there are no longer any queued tasks, try to store the + // wakeup in the queue and bail. if let Err(actual) = self.compare_exchange(curr, curr.with_state(State::Waiting)) { debug_assert!(actual.get(QueueState::STATE) != State::Waiting); self.store(actual.with_state(State::Woken)); @@ -341,13 +398,14 @@ impl WaitQueue { return None; } - + // otherwise, we have to dequeue a task and wake it. let node = queue.pop_back() .expect("if we are in the Waiting state, there must be waiters in the queue"); let waker = Waiter::wake(node, queue, Wakeup::One); + // if we took the final waiter currently in the queue, transition to the + // `Empty` state. if test_dbg!(queue.is_empty()) { - // we have taken the final waiter from the queue self.store(curr.with_state(State::Empty)); } @@ -372,11 +430,16 @@ impl Drop for WaitQueue { // === impl Waiter === impl Waiter { + /// Wake the task that owns this `Waiter`. + /// /// # Safety /// /// This is only safe to call while the list is locked. The dummy `_list` /// parameter ensures this method is only called while holding the lock, so /// this can be safe. + /// + /// Of course, that must be the *same* list that this waiter is a member of, + /// and currently, there is no way to ensure that... #[inline(always)] #[cfg_attr(loom, track_caller)] fn wake(mut this: NonNull, _list: &mut List, wakeup: Wakeup) -> Waker { @@ -492,6 +555,10 @@ impl Waiter { } + /// Release this `Waiter` from the queue. + /// + /// This is called from the `drop` implementation for the [`Wait`] and + /// [`WaitOwned`] futures. fn release(mut self: Pin<&mut Self>, queue: &WaitQueue) { let state = *(self.as_mut().project().state); let ptr = NonNull::from(unsafe { @@ -552,12 +619,16 @@ feature!{ /// Future returned from [`WaitQueue::wait_owned()`]. /// - /// This is identical to the [`Wait`] future, except that it takes the - /// [`WaitQueue`] as an [`Arc`] clone, allowing it to live for the `'static` - /// lifetime. + /// This is identical to the [`Wait`] future, except that it takes a + /// [`Weak`] reference to the [`WaitQueue`], allowing the returned future to + /// live for the `'static` lifetime. + /// + /// A `WaitOwned` future does *not* keep the [`WaitQueue`] alive; if all + /// [`Arc`] clones of the [`WaitQueue`] that this future was returned by are + /// dropped, this future will be cancelled. /// /// This future is fused, so once it has completed, any future calls to poll - /// will immediately return `Poll::Ready`. + /// will immediately return [`Poll::Ready`]. #[derive(Debug)] #[pin_project(PinnedDrop)] pub struct WaitOwned { @@ -570,6 +641,24 @@ feature!{ } impl WaitQueue { + /// Wait to be woken up by this queue, returning a future that's valid + /// for the `'static` lifetime. + /// + /// This returns a [`WaitOwned`] future that will complete when the task is + /// woken by a call to [`wake`] or [`wake_all`], or when the `WaitQueue` is + /// dropped. + /// + /// This is identical to the [`wait`] method, except that it takes a + /// [`Weak`] reference to the [`WaitQueue`], allowing the returned future to + /// live for the `'static` lifetime. + /// + /// A `WaitOwned` future does *not* keep the [`WaitQueue`] alive; if all + /// [`Arc`] clones of the [`WaitQueue`] that this future was returned by are + /// dropped, this future will be cancelled. + /// + /// [`wake`]: Self::wake + /// [`wake_all`]: Self::wake_all + /// [`wait`]: Self::wait pub fn wait_owned(self: &Arc) -> WaitOwned { let waiter = self.waiter(); let queue = Arc::downgrade(self); From f1162f88bb8a8e224f4ea3a1f07eb17589a10097 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Fri, 3 Jun 2022 17:02:34 -0700 Subject: [PATCH 11/24] more docs Signed-off-by: Eliza Weisman --- maitake/src/wait/queue.rs | 38 +++++++++++++++++++++++++++++++++----- 1 file changed, 33 insertions(+), 5 deletions(-) diff --git a/maitake/src/wait/queue.rs b/maitake/src/wait/queue.rs index ee19a08a..0177687b 100644 --- a/maitake/src/wait/queue.rs +++ b/maitake/src/wait/queue.rs @@ -121,10 +121,31 @@ pub struct Wait<'a> { } +/// The state of a [`Waiter`] node in a [`WaitQueue`]. #[derive(Debug, Copy, Clone, Eq, PartialEq)] enum WaitState { + /// The waiter has not yet been enqueued. + /// + /// The number of times [`WaitQueue::wake_all`] has been called is stored + /// when the node is created, in order to determine whether it was woken by + /// a stored wakeup when enqueueing. + /// + /// When in this state, the node is **not** part of the linked list, and + /// can be dropped without removing it from the list. Start(usize), + + /// The waiter is waiting. + /// + /// When in this state, the node **is** part of the linked list. If the + /// node is dropped in this state, it **must** be removed from the list + /// before dropping it. Failure to ensure this will result in dangling + /// pointers in the linked list! Waiting, + + /// The waiter has been woken. + /// + /// When in this state, the node is **not** part of the linked list, and + /// can be dropped without removing it from the list. Woken, } @@ -169,34 +190,41 @@ struct Node { bitfield! { #[derive(Eq, PartialEq)] struct QueueState { + /// The queue's state. const STATE: State; + + /// The number of times [`WaitQueue::wake_all`] has been called. const WAKE_ALLS = ..; } } + +/// The queue's current state. #[derive(Debug, Copy, Clone, Eq, PartialEq)] #[repr(u8)] enum State { /// No waiters are queued, and there is no pending notification. /// Waiting while the queue is in this state will enqueue the waiter; /// notifying while in this state will store a pending notification in the - /// queue, transitioning to the [`Woken`] state. + /// queue, transitioning to [`State::Woken`]. Empty = 0, /// There are one or more waiters in the queue. Waiting while /// the queue is in this state will not transition the state. Waking while /// in this state will wake the first waiter in the queue; if this empties - /// the queue, then the queue will transition to the [`Empty`] state. + /// the queue, then the queue will transition to [`State::Empty`]. Waiting = 1, /// The queue has a stored notification. Waiting while the queue /// is in this state will consume the pending notification *without* - /// enqueueing the waiter and transition the queue to the [`Empty`] state. + /// enqueueing the waiter and transition the queue to [`State::Empty`]. /// Waking while in this state will leave the queue in this state. Woken = 2, /// The queue is closed. Waiting while in this state will return /// [`Closed`] without transitioning the queue's state. + /// + /// [`Closed`]: crate::wait::Closed Closed = 3, } @@ -486,14 +514,14 @@ impl Waiter { // transition the queue to the waiting state 'to_waiting: loop { match test_dbg!(queue_state.get(QueueState::STATE)) { - // the queue is EMPTY, transition to WAITING + // the queue is `Empty`, transition to `Waiting` State::Empty => { match queue.compare_exchange(queue_state, queue_state.with_state(State::Waiting)) { Ok(_) => break 'to_waiting, Err(actual) => queue_state = actual, } }, - // the queue is already WAITING + // the queue is already `Waiting` State::Waiting => break 'to_waiting, // the queue was woken, consume the wakeup. State::Woken => { From 1d094bff6a4add81a1f29501eb845f5740615e82 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Fri, 3 Jun 2022 17:10:03 -0700 Subject: [PATCH 12/24] rustfmt --- maitake/src/loom.rs | 2 +- maitake/src/wait/queue.rs | 93 +++++++++++++++++++-------------- maitake/src/wait/queue/tests.rs | 16 +++--- 3 files changed, 66 insertions(+), 45 deletions(-) diff --git a/maitake/src/loom.rs b/maitake/src/loom.rs index 79b39ae8..05b2d99f 100644 --- a/maitake/src/loom.rs +++ b/maitake/src/loom.rs @@ -4,7 +4,7 @@ pub(crate) use self::inner::*; #[cfg(loom)] mod inner { #![allow(dead_code)] - + #[cfg(feature = "alloc")] pub(crate) use loom::alloc; pub(crate) use loom::{cell, future, hint, model, thread}; diff --git a/maitake/src/wait/queue.rs b/maitake/src/wait/queue.rs index 0177687b..f92158f7 100644 --- a/maitake/src/wait/queue.rs +++ b/maitake/src/wait/queue.rs @@ -2,12 +2,9 @@ use crate::{ loom::{ cell::UnsafeCell, sync::{ + atomic::{AtomicUsize, Ordering::*}, spin::Mutex, - atomic::{ - AtomicUsize, - Ordering::*, }, - } }, util, wait::{self, WaitResult}, @@ -19,15 +16,15 @@ use cordyceps::{ use core::{ future::Future, marker::PhantomPinned, + mem, pin::Pin, ptr::NonNull, task::{Context, Poll, Waker}, - mem, }; -use mycelium_bitfield::{FromBits, bitfield}; -use mycelium_util::sync::CachePadded; +use mycelium_bitfield::{bitfield, FromBits}; #[cfg(test)] use mycelium_util::fmt; +use mycelium_util::sync::CachePadded; use pin_project::{pin_project, pinned_drop}; #[cfg(test)] @@ -120,7 +117,6 @@ pub struct Wait<'a> { waiter: Waiter, } - /// The state of a [`Waiter`] node in a [`WaitQueue`]. #[derive(Debug, Copy, Clone, Eq, PartialEq)] enum WaitState { @@ -198,7 +194,6 @@ bitfield! { } } - /// The queue's current state. #[derive(Debug, Copy, Clone, Eq, PartialEq)] #[repr(u8)] @@ -226,7 +221,6 @@ enum State { /// /// [`Closed`]: crate::wait::Closed Closed = 3, - } impl QueueState { @@ -247,7 +241,11 @@ impl FromBits for State { bits if bits == Self::Waiting as u8 => Self::Waiting, bits if bits == Self::Woken as u8 => Self::Woken, bits if bits == Self::Closed as u8 => Self::Closed, - _ => unsafe { mycelium_util::unreachable_unchecked!("all potential 2-bit patterns should be covered!") }, + _ => unsafe { + mycelium_util::unreachable_unchecked!( + "all potential 2-bit patterns should be covered!" + ) + }, }) } @@ -265,7 +263,6 @@ enum Wakeup { Closed, } - // === impl WaitQueue === impl WaitQueue { @@ -348,11 +345,13 @@ impl WaitQueue { // now that the queue has been drained, transition to the empty state, // and increment the wake_all count. - let next_state = QueueState::new().with_state(State::Empty).with(QueueState::WAKE_ALLS, state.get(QueueState::WAKE_ALLS) + 1); - self.compare_exchange(state, next_state).expect("state should not have transitioned while locked"); + let next_state = QueueState::new() + .with_state(State::Empty) + .with(QueueState::WAKE_ALLS, state.get(QueueState::WAKE_ALLS) + 1); + self.compare_exchange(state, next_state) + .expect("state should not have transitioned while locked"); } - /// Wait to be woken up by this queue. /// /// This returns a [`Wait`] future that will complete when the task is @@ -364,7 +363,7 @@ impl WaitQueue { pub fn wait(&self) -> Wait<'_> { Wait { queue: self, - waiter: self.waiter() + waiter: self.waiter(), } } @@ -385,7 +384,6 @@ impl WaitQueue { } } - #[cfg_attr(test, track_caller)] fn load(&self) -> QueueState { #[allow(clippy::let_and_return)] @@ -401,9 +399,17 @@ impl WaitQueue { } #[cfg_attr(test, track_caller)] - fn compare_exchange(&self, current: QueueState, new: QueueState) -> Result { + fn compare_exchange( + &self, + current: QueueState, + new: QueueState, + ) -> Result { #[allow(clippy::let_and_return)] - let res = self.state.compare_exchange(current.0, new.0, SeqCst, SeqCst).map(QueueState::from_bits).map_err(QueueState::from_bits); + let res = self + .state + .compare_exchange(current.0, new.0, SeqCst, SeqCst) + .map(QueueState::from_bits) + .map_err(QueueState::from_bits); test_trace!("state.compare_exchange({current:?}, {new:?}) = {res:?}"); res } @@ -427,7 +433,8 @@ impl WaitQueue { } // otherwise, we have to dequeue a task and wake it. - let node = queue.pop_back() + let node = queue + .pop_back() .expect("if we are in the Waiting state, there must be waiters in the queue"); let waker = Waiter::wake(node, queue, Wakeup::One); @@ -484,7 +491,11 @@ impl Waiter { } } - fn poll_wait(mut self: Pin<&mut Self>, queue: &WaitQueue, cx: &mut Context<'_>) -> Poll { + fn poll_wait( + mut self: Pin<&mut Self>, + queue: &WaitQueue, + cx: &mut Context<'_>, + ) -> Poll { test_trace!(ptr = ?fmt::ptr(self.as_mut()), "Waiter::poll_wait"); let mut this = self.as_mut().project(); @@ -493,7 +504,13 @@ impl Waiter { let mut queue_state = queue.load(); // can we consume a pending wakeup? - if queue.compare_exchange(queue_state.with_state(State::Woken), queue_state.with_state(State::Empty)).is_ok() { + if queue + .compare_exchange( + queue_state.with_state(State::Woken), + queue_state.with_state(State::Empty), + ) + .is_ok() + { *this.state = WaitState::Woken; return Poll::Ready(Ok(())); } @@ -516,16 +533,21 @@ impl Waiter { match test_dbg!(queue_state.get(QueueState::STATE)) { // the queue is `Empty`, transition to `Waiting` State::Empty => { - match queue.compare_exchange(queue_state, queue_state.with_state(State::Waiting)) { + match queue.compare_exchange( + queue_state, + queue_state.with_state(State::Waiting), + ) { Ok(_) => break 'to_waiting, Err(actual) => queue_state = actual, } - }, + } // the queue is already `Waiting` State::Waiting => break 'to_waiting, // the queue was woken, consume the wakeup. State::Woken => { - match queue.compare_exchange(queue_state, queue_state.with_state(State::Empty)) { + match queue + .compare_exchange(queue_state, queue_state.with_state(State::Empty)) + { Ok(_) => { *this.state = WaitState::Woken; return Poll::Ready(Ok(())); @@ -546,13 +568,11 @@ impl Waiter { (*node).waker = Wakeup::Waiting(cx.waker().clone()); } }); - let ptr = unsafe { - NonNull::from(Pin::into_inner_unchecked(self)) - }; + let ptr = unsafe { NonNull::from(Pin::into_inner_unchecked(self)) }; waiters.push_front(ptr); Poll::Pending - }, + } WaitState::Waiting => { let mut _waiters = queue.queue.lock(); this.node.with_mut(|node| unsafe { @@ -565,11 +585,11 @@ impl Waiter { *waker = cx.waker().clone(); } Poll::Pending - }, + } Wakeup::All | Wakeup::One => { *this.state = WaitState::Woken; Poll::Ready(Ok(())) - }, + } Wakeup::Closed => { *this.state = WaitState::Woken; wait::closed() @@ -577,10 +597,9 @@ impl Waiter { Wakeup::Empty => unreachable!(), } }) - }, + } WaitState::Woken => Poll::Ready(Ok(())), } - } /// Release this `Waiter` from the queue. @@ -589,9 +608,7 @@ impl Waiter { /// [`WaitOwned`] futures. fn release(mut self: Pin<&mut Self>, queue: &WaitQueue) { let state = *(self.as_mut().project().state); - let ptr = NonNull::from(unsafe { - Pin::into_inner_unchecked(self) - }); + let ptr = NonNull::from(unsafe { Pin::into_inner_unchecked(self) }); test_trace!(self = ?fmt::ptr(ptr), ?state, ?queue, "Waiter::release"); if state == WaitState::Waiting { unsafe { @@ -640,7 +657,7 @@ impl PinnedDrop for Wait<'_> { // === impl WaitOwned === -feature!{ +feature! { #![feature = "alloc"] use alloc::sync::{Arc, Weak}; @@ -723,4 +740,4 @@ feature!{ } } } -} \ No newline at end of file +} diff --git a/maitake/src/wait/queue/tests.rs b/maitake/src/wait/queue/tests.rs index ffbd8129..66d7bc4b 100644 --- a/maitake/src/wait/queue/tests.rs +++ b/maitake/src/wait/queue/tests.rs @@ -3,7 +3,7 @@ use super::*; #[cfg(loom)] mod loom { use super::*; - use crate::loom::{self, sync::Arc, future, thread}; + use crate::loom::{self, future, sync::Arc, thread}; #[test] fn wake_one() { @@ -49,14 +49,16 @@ mod loom { #[test] fn wake_all_concurrent() { use alloc::sync::Arc; - + loom::model(|| { let q = Arc::new(WaitQueue::new()); let wait1 = q.wait_owned(); let wait2 = q.wait_owned(); - let thread1 = thread::spawn(move || { future::block_on(wait1).expect("wait1 must not fail") }); - let thread2 = thread::spawn(move || { future::block_on(wait2).expect("wait2 must not fail") }); + let thread1 = + thread::spawn(move || future::block_on(wait1).expect("wait1 must not fail")); + let thread2 = + thread::spawn(move || future::block_on(wait2).expect("wait2 must not fail")); q.wake_all(); @@ -74,8 +76,10 @@ mod loom { let wait1 = q.wait_owned(); let wait2 = q.wait_owned(); - let thread1 = thread::spawn(move || { future::block_on(wait1).expect_err("wait1 must be canceled") }); - let thread2 = thread::spawn(move || { future::block_on(wait2).expect_err("wait2 must be canceled") }); + let thread1 = + thread::spawn(move || future::block_on(wait1).expect_err("wait1 must be canceled")); + let thread2 = + thread::spawn(move || future::block_on(wait2).expect_err("wait2 must be canceled")); drop(q); From a602de6ac7a86be1aebe11009289f7932faadf54 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Sat, 4 Jun 2022 12:30:50 -0700 Subject: [PATCH 13/24] add more tests Signed-off-by: Eliza Weisman --- maitake/src/wait/queue/tests.rs | 176 ++++++++++++++++++++++++++++++++ 1 file changed, 176 insertions(+) diff --git a/maitake/src/wait/queue/tests.rs b/maitake/src/wait/queue/tests.rs index 66d7bc4b..ac6e223e 100644 --- a/maitake/src/wait/queue/tests.rs +++ b/maitake/src/wait/queue/tests.rs @@ -1,5 +1,110 @@ use super::*; +#[cfg(all(not(loom), feature = "alloc"))] +mod alloc { + use super::*; + use crate::loom::sync::Arc; + use crate::scheduler::Scheduler; + use core::sync::atomic::{AtomicUsize, Ordering}; + + #[test] + fn wake_all() { + static COMPLETED: AtomicUsize = AtomicUsize::new(0); + + let scheduler = Scheduler::new(); + let q = Arc::new(WaitQueue::new()); + + const TASKS: usize = 10; + + for _ in 0..TASKS { + let q = q.clone(); + scheduler.spawn(async move { + q.wait().await.unwrap(); + COMPLETED.fetch_add(1, Ordering::SeqCst); + }); + } + + let tick = scheduler.tick(); + + assert_eq!(tick.completed, 0); + assert_eq!(COMPLETED.load(Ordering::SeqCst), 0); + assert!(!tick.has_remaining); + + q.wake_all(); + + let tick = scheduler.tick(); + + assert_eq!(tick.completed, TASKS); + assert_eq!(COMPLETED.load(Ordering::SeqCst), TASKS); + assert!(!tick.has_remaining); + } + + #[test] + fn close_on_drop() { + static COMPLETED: AtomicUsize = AtomicUsize::new(0); + + let scheduler = Scheduler::new(); + let q = Arc::new(WaitQueue::new()); + + const TASKS: usize = 10; + + for _ in 0..TASKS { + let wait = q.wait_owned(); + scheduler.spawn(async move { + wait.await.expect_err("dropping the queue must close it"); + COMPLETED.fetch_add(1, Ordering::SeqCst); + }); + } + + let tick = scheduler.tick(); + + assert_eq!(tick.completed, 0); + assert_eq!(COMPLETED.load(Ordering::SeqCst), 0); + assert!(!tick.has_remaining); + + drop(q); + + let tick = scheduler.tick(); + + assert_eq!(tick.completed, TASKS); + assert_eq!(COMPLETED.load(Ordering::SeqCst), TASKS); + assert!(!tick.has_remaining); + } + + #[test] + fn wake_one() { + static COMPLETED: AtomicUsize = AtomicUsize::new(0); + + let scheduler = Scheduler::new(); + let q = Arc::new(WaitQueue::new()); + + const TASKS: usize = 10; + + for _ in 0..TASKS { + let q = q.clone(); + scheduler.spawn(async move { + q.wait().await.unwrap(); + COMPLETED.fetch_add(1, Ordering::SeqCst); + q.wake(); + }); + } + + let tick = scheduler.tick(); + + assert_eq!(tick.completed, 0); + assert_eq!(COMPLETED.load(Ordering::SeqCst), 0); + assert!(!tick.has_remaining); + + q.wake(); + + let tick = scheduler.tick(); + + assert_eq!(tick.completed, TASKS); + assert_eq!(COMPLETED.load(Ordering::SeqCst), TASKS); + assert!(!tick.has_remaining); + } +} + #[cfg(loom)] mod loom { use super::*; @@ -116,4 +221,75 @@ mod loom { }); }); } + + #[test] + fn wake_mixed() { + loom::model(|| { + let q = Arc::new(WaitQueue::new()); + + let thread1 = thread::spawn({ + let q = q.clone(); + move || { + q.wake_all(); + } + }); + + let thread2 = thread::spawn({ + let q = q.clone(); + move || { + q.wake(); + } + }); + + let thread3 = thread::spawn(move || { + future::block_on(q.wait()).unwrap(); + }); + + thread1.join().unwrap(); + thread2.join().unwrap(); + thread3.join().unwrap(); + }); + } + + #[test] + fn drop_wait_future() { + use futures::future::poll_fn; + use std::future::Future; + use std::task::Poll; + + loom::model(|| { + let q = Arc::new(WaitQueue::new()); + + let thread1 = thread::spawn({ + let q = q.clone(); + move || { + let mut wait = Box::pin(q.wait()); + + future::block_on(poll_fn(|cx| { + if wait.as_mut().poll(cx).is_ready() { + q.wake(); + } + Poll::Ready(()) + })); + } + }); + + let thread2 = thread::spawn({ + let q = q.clone(); + move || { + block_on(async { + q.wait().await.unwrap(); + // Trigger second notification + q.wake(); + q.wait().await.unwrap(); + }); + } + }); + + q.wake(); + + thread1.join().unwrap(); + thread2.join().unwrap(); + }); + } } From 642383585db53f06062907c29b1f89cd0b7a186c Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Sat, 4 Jun 2022 12:38:24 -0700 Subject: [PATCH 14/24] add tracing to tests Signed-off-by: Eliza Weisman --- Cargo.lock | 1 + maitake/Cargo.toml | 4 ++++ maitake/src/scheduler/tests.rs | 14 ++++++++++++++ maitake/src/util.rs | 9 +++++++++ maitake/src/wait/queue/tests.rs | 3 +++ 5 files changed, 31 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index d937fb7c..1f53f99b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -501,6 +501,7 @@ dependencies = [ "pin-project", "tracing 0.1.34", "tracing 0.2.0", + "tracing-subscriber 0.3.0", "tracing-subscriber 0.3.11", ] diff --git a/maitake/Cargo.toml b/maitake/Cargo.toml index 8f881a71..c0aa4504 100644 --- a/maitake/Cargo.toml +++ b/maitake/Cargo.toml @@ -48,6 +48,10 @@ git = "https://github.com/tokio-rs/tracing" [dev-dependencies] futures-util = "0.3" + +[target.'cfg(not(loom))'.dev-dependencies] +tracing-subscriber = { git = "https://github.com/tokio-rs/tracing", features = ["ansi", "fmt"] } + [target.'cfg(loom)'.dev-dependencies] loom = { version = "0.5.5", features = ["futures", "checkpoint"] } tracing_01 = { package = "tracing", version = "0.1", default_features = false } diff --git a/maitake/src/scheduler/tests.rs b/maitake/src/scheduler/tests.rs index 0ebbb3bb..5804e69d 100644 --- a/maitake/src/scheduler/tests.rs +++ b/maitake/src/scheduler/tests.rs @@ -49,6 +49,8 @@ mod alloc { static SCHEDULER: Lazy = Lazy::new(StaticScheduler::new); static IT_WORKED: AtomicBool = AtomicBool::new(false); + crate::util::trace_init(); + SCHEDULER.spawn(async { Yield::once().await; IT_WORKED.store(true, Ordering::Release); @@ -69,6 +71,7 @@ mod alloc { const TASKS: usize = 10; + crate::util::trace_init(); for _ in 0..TASKS { SCHEDULER.spawn(async { Yield::once().await; @@ -89,6 +92,7 @@ mod alloc { static SCHEDULER: Lazy = Lazy::new(StaticScheduler::new); static COMPLETED: AtomicUsize = AtomicUsize::new(0); + crate::util::trace_init(); let chan = Chan::new(1); SCHEDULER.spawn({ @@ -114,6 +118,7 @@ mod alloc { static SCHEDULER: Lazy = Lazy::new(StaticScheduler::new); static COMPLETED: AtomicUsize = AtomicUsize::new(0); + crate::util::trace_init(); let chan = Chan::new(1); SCHEDULER.spawn({ @@ -142,6 +147,8 @@ mod alloc { const TASKS: usize = 10; + crate::util::trace_init(); + for i in 0..TASKS { SCHEDULER.spawn(async { Yield::new(i).await; @@ -196,6 +203,8 @@ mod custom_storage { static SCHEDULER: StaticScheduler = unsafe { StaticScheduler::new_with_static_stub(&STUB) }; static IT_WORKED: AtomicBool = AtomicBool::new(false); + crate::util::trace_init(); + MyBoxTask::spawn(&SCHEDULER, async { Yield::once().await; IT_WORKED.store(true, Ordering::Release); @@ -217,6 +226,8 @@ mod custom_storage { const TASKS: usize = 10; + crate::util::trace_init(); + for _ in 0..TASKS { MyBoxTask::spawn(&SCHEDULER, async { Yield::once().await; @@ -238,6 +249,7 @@ mod custom_storage { static SCHEDULER: StaticScheduler = unsafe { StaticScheduler::new_with_static_stub(&STUB) }; static COMPLETED: AtomicUsize = AtomicUsize::new(0); + crate::util::trace_init(); let chan = Chan::new(1); MyBoxTask::spawn(&SCHEDULER, { @@ -264,6 +276,7 @@ mod custom_storage { static SCHEDULER: StaticScheduler = unsafe { StaticScheduler::new_with_static_stub(&STUB) }; static COMPLETED: AtomicUsize = AtomicUsize::new(0); + crate::util::trace_init(); let chan = Chan::new(1); MyBoxTask::spawn(&SCHEDULER, { @@ -291,6 +304,7 @@ mod custom_storage { static SCHEDULER: StaticScheduler = unsafe { StaticScheduler::new_with_static_stub(&STUB) }; static COMPLETED: AtomicUsize = AtomicUsize::new(0); + crate::util::trace_init(); const TASKS: usize = 10; for i in 0..TASKS { diff --git a/maitake/src/util.rs b/maitake/src/util.rs index 928efcd7..02a31dd1 100644 --- a/maitake/src/util.rs +++ b/maitake/src/util.rs @@ -95,3 +95,12 @@ pub(crate) unsafe fn non_null(ptr: *mut T) -> NonNull { unsafe fn non_null(ptr: *mut T) -> NonNull { NonNull::new_unchecked(ptr) } + +#[cfg(all(test, not(loom)))] +pub(crate) fn trace_init() { + use tracing_subscriber::filter::LevelFilter; + let _ = tracing_subscriber::fmt() + .with_max_level(LevelFilter::TRACE) + .with_test_writer() + .try_init(); +} diff --git a/maitake/src/wait/queue/tests.rs b/maitake/src/wait/queue/tests.rs index ac6e223e..c692f6f4 100644 --- a/maitake/src/wait/queue/tests.rs +++ b/maitake/src/wait/queue/tests.rs @@ -9,6 +9,7 @@ mod alloc { #[test] fn wake_all() { + crate::util::trace_init(); static COMPLETED: AtomicUsize = AtomicUsize::new(0); let scheduler = Scheduler::new(); @@ -41,6 +42,7 @@ mod alloc { #[test] fn close_on_drop() { + crate::util::trace_init(); static COMPLETED: AtomicUsize = AtomicUsize::new(0); let scheduler = Scheduler::new(); @@ -73,6 +75,7 @@ mod alloc { #[test] fn wake_one() { + crate::util::trace_init(); static COMPLETED: AtomicUsize = AtomicUsize::new(0); let scheduler = Scheduler::new(); From 35bce75b8b12222fb00e35551e46307594b478cd Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Sat, 4 Jun 2022 14:20:19 -0700 Subject: [PATCH 15/24] queues will have to be closed explicitly Signed-off-by: Eliza Weisman --- maitake/src/wait/queue.rs | 87 +++++++++++++++++---------------- maitake/src/wait/queue/tests.rs | 8 +-- 2 files changed, 48 insertions(+), 47 deletions(-) diff --git a/maitake/src/wait/queue.rs b/maitake/src/wait/queue.rs index f92158f7..8f835b11 100644 --- a/maitake/src/wait/queue.rs +++ b/maitake/src/wait/queue.rs @@ -298,7 +298,15 @@ impl WaitQueue { // check if any tasks are currently waiting on this queue. if there are // no waiting tasks, store the wakeup to be consumed by the next call to // `wait`. - while state.get(QueueState::STATE) != State::Waiting { + loop { + match state.get(QueueState::STATE) { + // if the queue is closed, bail. + State::Closed => return, + // if there are waiting tasks, break out of the loop and wake one. + State::Waiting => break, + _ => {} + } + let next = state.with_state(State::Woken); // advance the state to `Woken`, and return (if we did so // successfully) @@ -327,11 +335,17 @@ impl WaitQueue { let mut queue = self.queue.lock(); let state = self.load(); - // if there are no waiters in the queue, increment the number of - // `wake_all` calls and return. - if state.get(QueueState::STATE) != State::Waiting { - self.state.fetch_add(QueueState::ONE_WAKE_ALL, SeqCst); - return; + match state.get(QueueState::STATE) { + // if the queue is closed, bail. + State::Closed => return, + + // if there are no waiters in the queue, increment the number of + // `wake_all` calls and return. + State::Woken | State::Empty => { + self.state.fetch_add(QueueState::ONE_WAKE_ALL, SeqCst); + return; + } + State::Waiting => {} } // okay, we actually have to wake some stuff. @@ -352,6 +366,22 @@ impl WaitQueue { .expect("state should not have transitioned while locked"); } + /// Close the queue, indicating that it may no longer be used. + /// + /// Once a queue is closed, all [`wait`] calls (current or future) will + /// return an error. + pub fn close(&self) { + let mut queue = self.queue.lock(); + test_dbg!(self.state.fetch_or(State::Closed as u8 as usize, SeqCst)); + + // TODO(eliza): wake outside the lock using an array, a la + // https://github.com/tokio-rs/tokio/blob/4941fbf7c43566a8f491c64af5a4cd627c99e5a6/tokio/src/sync/batch_semaphore.rs#L277-L303 + while let Some(node) = queue.pop_back() { + let waker = Waiter::wake(node, &mut queue, Wakeup::Closed); + waker.wake() + } + } + /// Wait to be woken up by this queue. /// /// This returns a [`Wait`] future that will complete when the task is @@ -450,15 +480,7 @@ impl WaitQueue { impl Drop for WaitQueue { fn drop(&mut self) { - let mut queue = self.queue.lock(); - test_dbg!(self.state.fetch_or(State::Closed as u8 as usize, SeqCst)); - - // TODO(eliza): wake outside the lock using an array, a la - // https://github.com/tokio-rs/tokio/blob/4941fbf7c43566a8f491c64af5a4cd627c99e5a6/tokio/src/sync/batch_semaphore.rs#L277-L303 - while let Some(node) = queue.pop_back() { - let waker = Waiter::wake(node, &mut queue, Wakeup::Closed); - waker.wake() - } + self.close(); } } @@ -660,7 +682,7 @@ impl PinnedDrop for Wait<'_> { feature! { #![feature = "alloc"] - use alloc::sync::{Arc, Weak}; + use alloc::sync::Arc; /// Future returned from [`WaitQueue::wait_owned()`]. /// @@ -668,17 +690,13 @@ feature! { /// [`Weak`] reference to the [`WaitQueue`], allowing the returned future to /// live for the `'static` lifetime. /// - /// A `WaitOwned` future does *not* keep the [`WaitQueue`] alive; if all - /// [`Arc`] clones of the [`WaitQueue`] that this future was returned by are - /// dropped, this future will be cancelled. - /// /// This future is fused, so once it has completed, any future calls to poll /// will immediately return [`Poll::Ready`]. #[derive(Debug)] #[pin_project(PinnedDrop)] pub struct WaitOwned { - /// The `WaitQueue` being waited on from. - queue: Weak, + /// The `WaitQueue` being waited on. + queue: Arc, /// Entry in the wait queue. #[pin] @@ -694,19 +712,15 @@ feature! { /// dropped. /// /// This is identical to the [`wait`] method, except that it takes a - /// [`Weak`] reference to the [`WaitQueue`], allowing the returned future to + /// [`Arc`] reference to the [`WaitQueue`], allowing the returned future to /// live for the `'static` lifetime. /// - /// A `WaitOwned` future does *not* keep the [`WaitQueue`] alive; if all - /// [`Arc`] clones of the [`WaitQueue`] that this future was returned by are - /// dropped, this future will be cancelled. - /// /// [`wake`]: Self::wake /// [`wake_all`]: Self::wake_all /// [`wait`]: Self::wait pub fn wait_owned(self: &Arc) -> WaitOwned { let waiter = self.waiter(); - let queue = Arc::downgrade(self); + let queue = self.clone(); WaitOwned { queue, waiter } } } @@ -716,10 +730,7 @@ feature! { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.project(); - match this.queue.upgrade() { - Some(queue) => this.waiter.poll_wait(&*queue, cx), - None => wait::closed(), - } + this.waiter.poll_wait(&*this.queue, cx) } } @@ -727,17 +738,7 @@ feature! { impl PinnedDrop for WaitOwned { fn drop(mut self: Pin<&mut Self>) { let this = self.project(); - match this.queue.upgrade() { - Some(ref queue) => this.waiter.release(queue), - None => { - test_trace!("WaitOwned::drop: queue already dropped"); - debug_assert_ne!( - *this.waiter.project().state, - WaitState::Waiting, - "if the queue has been dropped, the waiter should have been notified!", - ) - } - } + this.waiter.release(&*this.queue); } } } diff --git a/maitake/src/wait/queue/tests.rs b/maitake/src/wait/queue/tests.rs index c692f6f4..544016d0 100644 --- a/maitake/src/wait/queue/tests.rs +++ b/maitake/src/wait/queue/tests.rs @@ -41,7 +41,7 @@ mod alloc { } #[test] - fn close_on_drop() { + fn close() { crate::util::trace_init(); static COMPLETED: AtomicUsize = AtomicUsize::new(0); @@ -64,7 +64,7 @@ mod alloc { assert_eq!(COMPLETED.load(Ordering::SeqCst), 0); assert!(!tick.has_remaining); - drop(q); + q.close(); let tick = scheduler.tick(); @@ -176,7 +176,7 @@ mod loom { } #[test] - fn wake_on_drop() { + fn wake_close() { use alloc::sync::Arc; loom::model(|| { @@ -189,7 +189,7 @@ mod loom { let thread2 = thread::spawn(move || future::block_on(wait2).expect_err("wait2 must be canceled")); - drop(q); + q.close(); thread1.join().unwrap(); thread2.join().unwrap(); From 87b95f026b557bda21afad8ba80283d8ebd83b19 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Sat, 4 Jun 2022 14:21:14 -0700 Subject: [PATCH 16/24] fix deadlock when dropping a future with a wakeup Signed-off-by: Eliza Weisman --- bin/loom | 2 +- maitake/Cargo.toml | 1 - maitake/src/wait/queue.rs | 69 +++++++++++++++++++++++++++------ maitake/src/wait/queue/tests.rs | 4 +- 4 files changed, 60 insertions(+), 16 deletions(-) diff --git a/bin/loom b/bin/loom index eb6ea345..0867cf85 100755 --- a/bin/loom +++ b/bin/loom @@ -3,7 +3,7 @@ set -x RUSTFLAGS="--cfg loom ${RUSTFLAGS}" \ -LOOM_LOG="${LOOM_LOG:-info}" \ +LOOM_LOG="${LOOM_LOG:-debug}" \ LOOM_LOCATION=true \ cargo test \ --profile loom \ diff --git a/maitake/Cargo.toml b/maitake/Cargo.toml index c0aa4504..27137df3 100644 --- a/maitake/Cargo.toml +++ b/maitake/Cargo.toml @@ -48,7 +48,6 @@ git = "https://github.com/tokio-rs/tracing" [dev-dependencies] futures-util = "0.3" - [target.'cfg(not(loom))'.dev-dependencies] tracing-subscriber = { git = "https://github.com/tokio-rs/tracing", features = ["ansi", "fmt"] } diff --git a/maitake/src/wait/queue.rs b/maitake/src/wait/queue.rs index 8f835b11..036ee447 100644 --- a/maitake/src/wait/queue.rs +++ b/maitake/src/wait/queue.rs @@ -370,6 +370,10 @@ impl WaitQueue { /// /// Once a queue is closed, all [`wait`] calls (current or future) will /// return an error. + /// + /// This method is generally used when implementing higher-level + /// synchronization primitives or resources: when an event makes a resource + /// permanently unavailable, the queue can be closed. pub fn close(&self) { let mut queue = self.queue.lock(); test_dbg!(self.state.fetch_or(State::Closed as u8 as usize, SeqCst)); @@ -499,17 +503,35 @@ impl Waiter { /// and currently, there is no way to ensure that... #[inline(always)] #[cfg_attr(loom, track_caller)] - fn wake(mut this: NonNull, _list: &mut List, wakeup: Wakeup) -> Waker { + fn wake(this: NonNull, list: &mut List, wakeup: Wakeup) -> Waker { + Waiter::with_node(this, list, |node| { + let waker = test_dbg!(mem::replace(&mut node.waker, wakeup)); + match waker { + Wakeup::Waiting(waker) => waker, + _ => unreachable!("tried to wake a waiter in the {:?} state!", waker), + } + }) + } + + /// # Safety + /// + /// This is only safe to call while the list is locked. The dummy `_list` + /// parameter ensures this method is only called while holding the lock, so + /// this can be safe. + /// + /// Of course, that must be the *same* list that this waiter is a member of, + /// and currently, there is no way to ensure that... + #[inline(always)] + #[cfg_attr(loom, track_caller)] + fn with_node( + mut this: NonNull, + _list: &mut List, + f: impl FnOnce(&mut Node) -> T, + ) -> T { unsafe { // safety: this is only called while holding the lock on the queue, // so it's safe to mutate the waiter. - this.as_mut().node.with_mut(|node| { - let waker = test_dbg!(mem::replace(&mut (*node).waker, wakeup)); - match waker { - Wakeup::Waiting(waker) => waker, - _ => unreachable!("tried to wake a waiter in the {:?} state!", waker), - } - }) + this.as_mut().node.with_mut(|node| f(&mut *node)) } } @@ -541,7 +563,7 @@ impl Waiter { test_trace!("poll_wait: locking..."); let mut waiters = queue.queue.lock(); test_trace!("poll_wait: -> locked"); - queue_state = test_dbg!(queue.load()); + queue_state = queue.load(); // the whole queue was woken while we were trying to acquire // the lock! @@ -632,9 +654,32 @@ impl Waiter { let state = *(self.as_mut().project().state); let ptr = NonNull::from(unsafe { Pin::into_inner_unchecked(self) }); test_trace!(self = ?fmt::ptr(ptr), ?state, ?queue, "Waiter::release"); - if state == WaitState::Waiting { - unsafe { - queue.queue.lock().remove(ptr); + + // if we're not enqueued, we don't have to do anything else. + if state != WaitState::Waiting { + return; + } + + let mut waiters = queue.queue.lock(); + let state = queue.load(); + + // remove the node + unsafe { + // safety: we have the lock on the queue, so this is safe. + waiters.remove(ptr); + }; + + // if we removed the last waiter from the queue, transition the state to + // `Empty`. + if test_dbg!(waiters.is_empty()) && state.get(QueueState::STATE) == State::Waiting { + queue.store(state.with_state(State::Empty)); + } + + // if the node has an unconsumed wakeup, it must be assigned to the next + // node in the queue. + if Waiter::with_node(ptr, &mut waiters, |node| matches!(&node.waker, Wakeup::One)) { + if let Some(waker) = queue.wake_locked(&mut waiters, state) { + waker.wake() } } } diff --git a/maitake/src/wait/queue/tests.rs b/maitake/src/wait/queue/tests.rs index 544016d0..5db6ebf3 100644 --- a/maitake/src/wait/queue/tests.rs +++ b/maitake/src/wait/queue/tests.rs @@ -256,7 +256,7 @@ mod loom { #[test] fn drop_wait_future() { - use futures::future::poll_fn; + use futures_util::future::poll_fn; use std::future::Future; use std::task::Poll; @@ -280,7 +280,7 @@ mod loom { let thread2 = thread::spawn({ let q = q.clone(); move || { - block_on(async { + future::block_on(async { q.wait().await.unwrap(); // Trigger second notification q.wake(); From d41bd98ef83fbb244723ef24dc8dee69190d8857 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Sat, 4 Jun 2022 14:25:49 -0700 Subject: [PATCH 17/24] drop impl is unneeded because q can only be dropped when waiters exist Signed-off-by: Eliza Weisman --- maitake/src/wait/queue.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/maitake/src/wait/queue.rs b/maitake/src/wait/queue.rs index 036ee447..22b71488 100644 --- a/maitake/src/wait/queue.rs +++ b/maitake/src/wait/queue.rs @@ -375,8 +375,13 @@ impl WaitQueue { /// synchronization primitives or resources: when an event makes a resource /// permanently unavailable, the queue can be closed. pub fn close(&self) { + let state = self.state.fetch_or(State::Closed as u8 as usize, SeqCst); + let state = test_dbg!(QueueState::from_bits(state)); + if state.get(QueueState::STATE) != State::Waiting { + return; + } + let mut queue = self.queue.lock(); - test_dbg!(self.state.fetch_or(State::Closed as u8 as usize, SeqCst)); // TODO(eliza): wake outside the lock using an array, a la // https://github.com/tokio-rs/tokio/blob/4941fbf7c43566a8f491c64af5a4cd627c99e5a6/tokio/src/sync/batch_semaphore.rs#L277-L303 @@ -482,12 +487,6 @@ impl WaitQueue { } } -impl Drop for WaitQueue { - fn drop(&mut self) { - self.close(); - } -} - // === impl Waiter === impl Waiter { @@ -679,6 +678,7 @@ impl Waiter { // node in the queue. if Waiter::with_node(ptr, &mut waiters, |node| matches!(&node.waker, Wakeup::One)) { if let Some(waker) = queue.wake_locked(&mut waiters, state) { + drop(waiters); waker.wake() } } From 4eda47f064a0d2246f6c123fb53fbfdb36ca70a6 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Sat, 4 Jun 2022 14:47:26 -0700 Subject: [PATCH 18/24] fix transition to Waiting instead of Woken Signed-off-by: Eliza Weisman --- maitake/src/wait/queue.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/maitake/src/wait/queue.rs b/maitake/src/wait/queue.rs index 22b71488..1c666ff4 100644 --- a/maitake/src/wait/queue.rs +++ b/maitake/src/wait/queue.rs @@ -326,6 +326,7 @@ impl WaitQueue { state = self.load(); if let Some(waker) = self.wake_locked(&mut *queue, state) { + drop(queue); waker.wake(); } } @@ -463,7 +464,7 @@ impl WaitQueue { if test_dbg!(state) != State::Waiting { // if there are no longer any queued tasks, try to store the // wakeup in the queue and bail. - if let Err(actual) = self.compare_exchange(curr, curr.with_state(State::Waiting)) { + if let Err(actual) = self.compare_exchange(curr, curr.with_state(State::Woken)) { debug_assert!(actual.get(QueueState::STATE) != State::Waiting); self.store(actual.with_state(State::Woken)); } From 332b746cd597d67f0bc4f6710b61aedaecd38bf5 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Sun, 5 Jun 2022 09:31:06 -0700 Subject: [PATCH 19/24] review feedback (thanks @jamesmunns) Signed-off-by: Eliza Weisman --- maitake/src/wait/queue.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/maitake/src/wait/queue.rs b/maitake/src/wait/queue.rs index 1c666ff4..aea3b55e 100644 --- a/maitake/src/wait/queue.rs +++ b/maitake/src/wait/queue.rs @@ -175,9 +175,6 @@ struct Node { /// The node's waker waker: Wakeup, - // /// Optional user data. - // data: Option, - // This type is !Unpin due to the heuristic from: // _pin: PhantomPinned, @@ -495,7 +492,7 @@ impl Waiter { /// /// # Safety /// - /// This is only safe to call while the list is locked. The dummy `_list` + /// This is only safe to call while the list is locked. The `list` /// parameter ensures this method is only called while holding the lock, so /// this can be safe. /// From 916f7915375b36d435449fc3d026b007eae3ae81 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Sun, 5 Jun 2022 09:47:07 -0700 Subject: [PATCH 20/24] docs improvements --- maitake/src/wait/queue.rs | 97 +++++++++++++++++++++++++++++++++++++-- 1 file changed, 93 insertions(+), 4 deletions(-) diff --git a/maitake/src/wait/queue.rs b/maitake/src/wait/queue.rs index aea3b55e..489a41f8 100644 --- a/maitake/src/wait/queue.rs +++ b/maitake/src/wait/queue.rs @@ -45,16 +45,105 @@ mod tests; /// primitive on its own: sometimes, you just need to have a bunch of tasks wait /// for something and then wake them all up. /// +/// # Examples +/// +/// Waking a single task at a time by calling [`wake`][wake]: +/// +/// ``` +/// use std::sync::Arc; +/// use maitake::{scheduler::Scheduler, wait::WaitQueue}; +/// +/// const TASKS: usize = 10; +/// +/// // In order to spawn tasks, we need a `Scheduler` instance. +/// let scheduler = Scheduler::new(); +/// +/// // Construct a new `WaitQueue`. +/// let q = Arc::new(WaitQueue::new()); +/// +/// // Spawn some tasks that will wait on the queue. +/// for _ in 0..TASKS { +/// let q = q.clone(); +/// scheduler.spawn(async move { +/// // Wait to be woken by the queue. +/// q.wait().await.expect("queue is not closed"); +/// }); +/// } +/// +/// // Tick the scheduler once. +/// let tick = scheduler.tick(); +/// +/// // No tasks should complete on this tick, as they are all waiting +/// // to be woken by the queue. +/// assert_eq!(tick.completed, 0, "no tasks have been woken"); +/// +/// let mut completed = 0; +/// for i in 1..=TASKS { +/// // Wake the next task from the queue. +/// q.wake(); +/// +/// // Tick the scheduler. +/// let tick = scheduler.tick(); +/// +/// // A single task should have completed on this tick. +/// completed += tick.completed; +/// assert_eq!(completed, i); +/// } +/// +/// assert_eq!(completed, TASKS, "all tasks should have completed"); +/// ``` +/// +/// Waking all tasks using [`wake_all`][wake_all]: +/// +/// ``` +/// use std::sync::Arc; +/// use maitake::{scheduler::Scheduler, wait::WaitQueue}; +/// +/// const TASKS: usize = 10; +/// +/// // In order to spawn tasks, we need a `Scheduler` instance. +/// let scheduler = Scheduler::new(); +/// +/// // Construct a new `WaitQueue`. +/// let q = Arc::new(WaitQueue::new()); +/// +/// // Spawn some tasks that will wait on the queue. +/// for _ in 0..TASKS { +/// let q = q.clone(); +/// scheduler.spawn(async move { +/// // Wait to be woken by the queue. +/// q.wait().await.expect("queue is not closed"); +/// }); +/// } +/// +/// // Tick the scheduler once. +/// let tick = scheduler.tick(); +/// +/// // No tasks should complete on this tick, as they are all waiting +/// // to be woken by the queue. +/// assert_eq!(tick.completed, 0, "no tasks have been woken"); +/// +/// // Wake all tasks waiting for the queue. +/// q.wake_all(); +/// +/// // Tick the scheduler again to run the woken tasks. +/// let tick = scheduler.tick(); +/// +/// // All tasks have now completed, since they were woken by the +/// // queue. +/// assert_eq!(tick.completed, TASKS, "all tasks should have completed"); +/// ``` +/// /// # Implementation Notes /// /// The *[intrusive]* aspect of this list is important, as it means that it does /// not allocate memory. Instead, nodes in the linked list are stored in the /// futures of tasks trying to wait for capacity. This means that it is not -/// necessary to allocate any heap memory for each task waiting to be notified. +/// necessary to allocate any heap memory for each task waiting to be woken. /// /// However, the intrusive linked list introduces one new danger: because /// futures can be *cancelled*, and the linked list nodes live within the -/// futures trying to wait for channel capacity, we *must* ensure that the node +/// futures trying to wait on the queue, we *must* ensure that the node /// is unlinked from the list before dropping a cancelled future. Failure to do /// so would result in the list containing dangling pointers. Therefore, we must /// use a *doubly-linked* list, so that nodes can edit both the previous and @@ -729,8 +818,8 @@ feature! { /// Future returned from [`WaitQueue::wait_owned()`]. /// - /// This is identical to the [`Wait`] future, except that it takes a - /// [`Weak`] reference to the [`WaitQueue`], allowing the returned future to + /// This is identical to the [`Wait`] future, except that it takes an + /// [`Arc`] reference to the [`WaitQueue`], allowing the returned future to /// live for the `'static` lifetime. /// /// This future is fused, so once it has completed, any future calls to poll From 5b5d716eceb6621c092a945ec044ce692ce1c312 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Sun, 5 Jun 2022 09:48:59 -0700 Subject: [PATCH 21/24] more review feedback Signed-off-by: Eliza Weisman --- maitake/src/wait/queue.rs | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/maitake/src/wait/queue.rs b/maitake/src/wait/queue.rs index 489a41f8..1f4a65b4 100644 --- a/maitake/src/wait/queue.rs +++ b/maitake/src/wait/queue.rs @@ -288,25 +288,29 @@ enum State { /// Waiting while the queue is in this state will enqueue the waiter; /// notifying while in this state will store a pending notification in the /// queue, transitioning to [`State::Woken`]. - Empty = 0, + Empty = 0b00, /// There are one or more waiters in the queue. Waiting while /// the queue is in this state will not transition the state. Waking while /// in this state will wake the first waiter in the queue; if this empties /// the queue, then the queue will transition to [`State::Empty`]. - Waiting = 1, + Waiting = 0b01, /// The queue has a stored notification. Waiting while the queue /// is in this state will consume the pending notification *without* /// enqueueing the waiter and transition the queue to [`State::Empty`]. /// Waking while in this state will leave the queue in this state. - Woken = 2, + Woken = 0b10, /// The queue is closed. Waiting while in this state will return /// [`Closed`] without transitioning the queue's state. /// + /// *Note*: This *must* correspond to all state bits being set, as it's set + /// via a [`fetch_or`]. + /// /// [`Closed`]: crate::wait::Closed - Closed = 3, + /// [`fetch_or`]: core::sync::atomic::AtomicUsize::fetch_or + Closed = 0b11, } impl QueueState { From 3fb5b736574cf545db324656ea7b4c9adc386b38 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Sun, 5 Jun 2022 09:54:37 -0700 Subject: [PATCH 22/24] review feedback Signed-off-by: Eliza Weisman --- maitake/src/wait/queue.rs | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/maitake/src/wait/queue.rs b/maitake/src/wait/queue.rs index 1f4a65b4..897500ae 100644 --- a/maitake/src/wait/queue.rs +++ b/maitake/src/wait/queue.rs @@ -340,6 +340,12 @@ impl FromBits for State { } fn into_bits(self) -> usize { + self.into_usize() + } +} + +impl State { + const fn into_usize(self) -> usize { self as u8 as usize } } @@ -361,7 +367,7 @@ impl WaitQueue { #[cfg(not(loom))] pub const fn new() -> Self { Self { - state: CachePadded::new(AtomicUsize::new(0)), + state: CachePadded::new(AtomicUsize::new(State::Empty.into_usize())), queue: Mutex::new(List::new()), } } @@ -371,7 +377,7 @@ impl WaitQueue { #[cfg(loom)] pub fn new() -> Self { Self { - state: CachePadded::new(AtomicUsize::new(0)), + state: CachePadded::new(AtomicUsize::new(State::Empty.into_usize())), queue: Mutex::new(List::new()), } } @@ -379,7 +385,9 @@ impl WaitQueue { /// Wake the next task in the queue. /// /// If the queue is empty, a wakeup is stored in the `WaitQueue`, and the - /// next call to [`Wait`] will complete immediately. + /// next call to [`wait`] will complete immediately. + /// + /// [`wait`]: WaitQueue::wait #[inline] pub fn wake(&self) { // snapshot the queue's current state. @@ -466,7 +474,7 @@ impl WaitQueue { /// synchronization primitives or resources: when an event makes a resource /// permanently unavailable, the queue can be closed. pub fn close(&self) { - let state = self.state.fetch_or(State::Closed as u8 as usize, SeqCst); + let state = self.state.fetch_or(State::Closed.into_usize(), SeqCst); let state = test_dbg!(QueueState::from_bits(state)); if state.get(QueueState::STATE) != State::Waiting { return; From 8cc540eab56064e313b9c5b533869f436c764ef1 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Sun, 5 Jun 2022 10:08:02 -0700 Subject: [PATCH 23/24] use bitfield for wait future states --- maitake/src/wait/queue.rs | 193 +++++++++++++++++++++++--------------- 1 file changed, 116 insertions(+), 77 deletions(-) diff --git a/maitake/src/wait/queue.rs b/maitake/src/wait/queue.rs index 897500ae..9dc1e21d 100644 --- a/maitake/src/wait/queue.rs +++ b/maitake/src/wait/queue.rs @@ -206,34 +206,6 @@ pub struct Wait<'a> { waiter: Waiter, } -/// The state of a [`Waiter`] node in a [`WaitQueue`]. -#[derive(Debug, Copy, Clone, Eq, PartialEq)] -enum WaitState { - /// The waiter has not yet been enqueued. - /// - /// The number of times [`WaitQueue::wake_all`] has been called is stored - /// when the node is created, in order to determine whether it was woken by - /// a stored wakeup when enqueueing. - /// - /// When in this state, the node is **not** part of the linked list, and - /// can be dropped without removing it from the list. - Start(usize), - - /// The waiter is waiting. - /// - /// When in this state, the node **is** part of the linked list. If the - /// node is dropped in this state, it **must** be removed from the list - /// before dropping it. Failure to ensure this will result in dangling - /// pointers in the linked list! - Waiting, - - /// The waiter has been woken. - /// - /// When in this state, the node is **not** part of the linked list, and - /// can be dropped without removing it from the list. - Woken, -} - /// A waiter node which may be linked into a wait queue. #[derive(Debug)] #[repr(C)] @@ -247,7 +219,7 @@ struct Waiter { node: UnsafeCell, /// The future's state. - state: WaitState, + state: WaitStateBits, } #[derive(Debug)] @@ -280,6 +252,46 @@ bitfield! { } } +bitfield! { + #[derive(Eq, PartialEq)] + struct WaitStateBits { + /// The waiter's state. + const STATE: WaitState; + + /// The number of times [`WaitQueue::wake_all`] has been called. + const WAKE_ALLS = ..; + } +} + +/// The state of a [`Waiter`] node in a [`WaitQueue`]. +#[derive(Debug, Copy, Clone, Eq, PartialEq)] +#[repr(u8)] +enum WaitState { + /// The waiter has not yet been enqueued. + /// + /// The number of times [`WaitQueue::wake_all`] has been called is stored + /// when the node is created, in order to determine whether it was woken by + /// a stored wakeup when enqueueing. + /// + /// When in this state, the node is **not** part of the linked list, and + /// can be dropped without removing it from the list. + Start, + + /// The waiter is waiting. + /// + /// When in this state, the node **is** part of the linked list. If the + /// node is dropped in this state, it **must** be removed from the list + /// before dropping it. Failure to ensure this will result in dangling + /// pointers in the linked list! + Waiting, + + /// The waiter has been woken. + /// + /// When in this state, the node is **not** part of the linked list, and + /// can be dropped without removing it from the list. + Woken, +} + /// The queue's current state. #[derive(Debug, Copy, Clone, Eq, PartialEq)] #[repr(u8)] @@ -313,43 +325,6 @@ enum State { Closed = 0b11, } -impl QueueState { - const ONE_WAKE_ALL: usize = Self::WAKE_ALLS.first_bit(); - - fn with_state(self, state: State) -> Self { - self.with(Self::STATE, state) - } -} - -impl FromBits for State { - const BITS: u32 = 2; - type Error = core::convert::Infallible; - - fn try_from_bits(bits: usize) -> Result { - Ok(match bits as u8 { - bits if bits == Self::Empty as u8 => Self::Empty, - bits if bits == Self::Waiting as u8 => Self::Waiting, - bits if bits == Self::Woken as u8 => Self::Woken, - bits if bits == Self::Closed as u8 => Self::Closed, - _ => unsafe { - mycelium_util::unreachable_unchecked!( - "all potential 2-bit patterns should be covered!" - ) - }, - }) - } - - fn into_bits(self) -> usize { - self.into_usize() - } -} - -impl State { - const fn into_usize(self) -> usize { - self as u8 as usize - } -} - #[derive(Clone, Debug)] enum Wakeup { Empty, @@ -512,8 +487,11 @@ impl WaitQueue { fn waiter(&self) -> Waiter { // how many times has `wake_all` been called when this waiter is created? let current_wake_alls = test_dbg!(self.load().get(QueueState::WAKE_ALLS)); + let state = WaitStateBits::new() + .with(WaitStateBits::WAKE_ALLS, current_wake_alls) + .with(WaitStateBits::STATE, WaitState::Start); Waiter { - state: WaitState::Start(current_wake_alls), + state, node: UnsafeCell::new(Node { links: list::Links::new(), waker: Wakeup::Empty, @@ -641,8 +619,8 @@ impl Waiter { test_trace!(ptr = ?fmt::ptr(self.as_mut()), "Waiter::poll_wait"); let mut this = self.as_mut().project(); - match test_dbg!(*this.state) { - WaitState::Start(wake_alls) => { + match test_dbg!(this.state.get(WaitStateBits::STATE)) { + WaitState::Start => { let mut queue_state = queue.load(); // can we consume a pending wakeup? @@ -653,7 +631,7 @@ impl Waiter { ) .is_ok() { - *this.state = WaitState::Woken; + this.state.set(WaitStateBits::STATE, WaitState::Woken); return Poll::Ready(Ok(())); } @@ -665,8 +643,10 @@ impl Waiter { // the whole queue was woken while we were trying to acquire // the lock! - if queue_state.get(QueueState::WAKE_ALLS) != wake_alls { - *this.state = WaitState::Woken; + if queue_state.get(QueueState::WAKE_ALLS) + != this.state.get(WaitStateBits::WAKE_ALLS) + { + this.state.set(WaitStateBits::STATE, WaitState::Woken); return Poll::Ready(Ok(())); } @@ -691,7 +671,7 @@ impl Waiter { .compare_exchange(queue_state, queue_state.with_state(State::Empty)) { Ok(_) => { - *this.state = WaitState::Woken; + this.state.set(WaitStateBits::STATE, WaitState::Woken); return Poll::Ready(Ok(())); } Err(actual) => queue_state = actual, @@ -702,7 +682,7 @@ impl Waiter { } // enqueue the node - *this.state = WaitState::Waiting; + this.state.set(WaitStateBits::STATE, WaitState::Waiting); this.node.as_mut().with_mut(|node| { unsafe { // safety: we may mutate the node because we are @@ -729,11 +709,11 @@ impl Waiter { Poll::Pending } Wakeup::All | Wakeup::One => { - *this.state = WaitState::Woken; + this.state.set(WaitStateBits::STATE, WaitState::Woken); Poll::Ready(Ok(())) } Wakeup::Closed => { - *this.state = WaitState::Woken; + this.state.set(WaitStateBits::STATE, WaitState::Woken); wait::closed() } Wakeup::Empty => unreachable!(), @@ -754,7 +734,7 @@ impl Waiter { test_trace!(self = ?fmt::ptr(ptr), ?state, ?queue, "Waiter::release"); // if we're not enqueued, we don't have to do anything else. - if state != WaitState::Waiting { + if state.get(WaitStateBits::STATE) != WaitState::Waiting { return; } @@ -821,6 +801,65 @@ impl PinnedDrop for Wait<'_> { } } +// === impl QueueState === + +impl QueueState { + const ONE_WAKE_ALL: usize = Self::WAKE_ALLS.first_bit(); + + fn with_state(self, state: State) -> Self { + self.with(Self::STATE, state) + } +} + +impl FromBits for State { + const BITS: u32 = 2; + type Error = core::convert::Infallible; + + fn try_from_bits(bits: usize) -> Result { + Ok(match bits as u8 { + bits if bits == Self::Empty as u8 => Self::Empty, + bits if bits == Self::Waiting as u8 => Self::Waiting, + bits if bits == Self::Woken as u8 => Self::Woken, + bits if bits == Self::Closed as u8 => Self::Closed, + _ => unsafe { + mycelium_util::unreachable_unchecked!( + "all potential 2-bit patterns should be covered!" + ) + }, + }) + } + + fn into_bits(self) -> usize { + self.into_usize() + } +} + +impl State { + const fn into_usize(self) -> usize { + self as u8 as usize + } +} + +// === impl WaitState === + +impl FromBits for WaitState { + const BITS: u32 = 2; + type Error = &'static str; + + fn try_from_bits(bits: usize) -> Result { + match bits as u8 { + bits if bits == Self::Start as u8 => Ok(Self::Start), + bits if bits == Self::Waiting as u8 => Ok(Self::Waiting), + bits if bits == Self::Woken as u8 => Ok(Self::Woken), + _ => Err("invalid `WaitState`; expected one of Start, Waiting, or Woken"), + } + } + + fn into_bits(self) -> usize { + self as u8 as usize + } +} + // === impl WaitOwned === feature! { From 0e9aa2d5e5e5453385e31850ca7f30e8c8eef9e7 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Sun, 5 Jun 2022 10:13:14 -0700 Subject: [PATCH 24/24] don't run doctests with alloc Signed-off-by: Eliza Weisman --- .github/workflows/ci.yml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 09112d01..8e5d4212 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -220,7 +220,9 @@ jobs: run: rustup show - uses: actions/checkout@v2 - name: run tests - run: cargo test -p maitake --no-default-features + # don't run doctests with `no-default-features`, as some of them + # require liballoc. + run: cargo test -p maitake --no-default-features --tests --lib # run loom tests maitake_loom: