From 7a5e062927474ca4b27e807bbb80cf4bcef04215 Mon Sep 17 00:00:00 2001
From: Eliza Weisman
Date: Wed, 10 May 2023 14:36:26 -0700
Subject: [PATCH] feat(cordyceps): add `Stack` and `TransferStack` (#434)

This branch adds an implementation of a singly-linked mutable LIFO
`Stack` and a singly-linked lock-free LIFO `TransferStack` in
`cordyceps::stack`.

A `Stack` is just your traditional LIFO stack structure. `TransferStack`
is a bit more interesting --- it's a simplification of the traditional
lock-free Treiber stack where all elements are popped into a new mutable
`Stack` in a single atomic operation. This avoids the ABA problems
common to naive Treiber stack implementations. It's intended primarily
for use in a `mimalloc`-style sharded per-core memory allocator.

Closes #137

Signed-off-by: Eliza Weisman
---
 cordyceps/Cargo.toml   |   2 +-
 cordyceps/src/lib.rs   |  78 ++++-
 cordyceps/src/list.rs  |   7 +-
 cordyceps/src/loom.rs  | 222 +++++++++++++-
 cordyceps/src/stack.rs | 662 +++++++++++++++++++++++++++++++++++++++++
 cordyceps/src/util.rs  |  22 ++
 6 files changed, 968 insertions(+), 25 deletions(-)
 create mode 100644 cordyceps/src/stack.rs

diff --git a/cordyceps/Cargo.toml b/cordyceps/Cargo.toml
index 86921ded..a76a23e8 100644
--- a/cordyceps/Cargo.toml
+++ b/cordyceps/Cargo.toml
@@ -35,4 +35,4 @@ tracing = { version = "0.1" }
 
 [package.metadata.docs.rs]
 all-features = true
-rustdoc-args = ["--cfg", "docsrs"]
+rustdoc-args = ["--cfg", "docsrs"]
\ No newline at end of file
diff --git a/cordyceps/src/lib.rs b/cordyceps/src/lib.rs
index 40a2b33e..fdd8f3df 100644
--- a/cordyceps/src/lib.rs
+++ b/cordyceps/src/lib.rs
@@ -3,33 +3,83 @@
 #![cfg_attr(docsrs, deny(missing_docs))]
 #![cfg_attr(not(any(feature = "std", test)), no_std)]
 #![allow(unused_unsafe)]
-
+//!
+//! ## data structures
+//!
+//! `cordyceps` provides implementations of the following data structures:
+//!
+//! - **[`List`]: a mutable, doubly-linked list.**
+//!
+//!   A [`List`] provides *O*(1) insertion and removal at both the head and
+//!   tail of the list. In addition, parts of a [`List`] may be split off to
+//!   form new [`List`]s, and two [`List`]s may be spliced together to form a
+//!   single [`List`], all in *O*(1) time. The [`list`] module also provides
+//!   [`list::Cursor`] and [`list::CursorMut`] types, which allow traversal and
+//!   modification of elements in a list. Finally, elements can remove themselves
+//!   from arbitrary positions in a [`List`], provided that they have mutable
+//!   access to the [`List`] itself. This makes the [`List`] type suitable for
+//!   use in cases where elements must be able to drop themselves while linked
+//!   into a list.
+//!
+//!   The [`List`] type is **not** a lock-free data structure, and can only be
+//!   modified through `&mut` references.
+//!
+//! - **[`MpscQueue`]: a multi-producer, single-consumer (MPSC) lock-free
+//!   first-in, first-out (FIFO) queue.**
+//!
+//!   A [`MpscQueue`] is a *lock-free* concurrent data structure that allows
+//!   multiple producers to concurrently push elements onto the queue, and a
+//!   single consumer to dequeue elements in the order that they were pushed.
+//!
+//!   [`MpscQueue`]s can be used to efficiently share data from multiple
+//!   concurrent producers with a consumer.
+//!
+//! - **[`Stack`]: a mutable, singly-linked last-in, first-out (LIFO)
+//!   stack.**
+//!
+//!   This is a simple, singly-linked stack with *O*(1) push and pop
+//!   operations. The pop operation returns the *last* element pushed to the
+//!   stack. A [`Stack`] also implements the [`Iterator`] trait; iterating over
+//!   a stack pops elements from the end of the stack.
+//!
+//!   The [`Stack`] type is **not** a lock-free data structure, and can only be
+//!   modified through `&mut` references.
+//!
+//! - **[`TransferStack`]: a lock-free, multi-producer LIFO stack, where
+//!   all elements currently in the stack are popped in a single atomic operation.**
+//!
+//!   A [`TransferStack`] is a lock-free data structure where multiple producers
+//!   can [concurrently push elements](stack::TransferStack::push) to the end of
+//!   the stack through immutable `&` references. A consumer can [pop all
+//!   elements currently in the `TransferStack`](stack::TransferStack::take_all)
+//!   in a single atomic operation, returning a new [`Stack`]. Pushing an
+//!   element and taking all elements in the [`TransferStack`] are both *O*(1)
+//!   operations.
+//!
+//!   A [`TransferStack`] can be used to efficiently transfer ownership of
+//!   resources from multiple producers to a consumer, such as for reuse or
+//!   cleanup; the example below sketches this handoff.
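+//!
+//!   Producers push resources onto the [`TransferStack`] through `&`
+//!   references, and a consumer drains it into a mutable [`Stack`]. This is a
+//!   minimal sketch, assuming an `Entry` type that implements [`Linked`] for
+//!   [`stack::Links`] (such as the one sketched in the [`stack`] module docs)
+//!   and has an illustrative `val` field:
+//!
+//!   ```ignore
+//!   use cordyceps::TransferStack;
+//!
+//!   let shared = TransferStack::<Entry>::new();
+//!   shared.push(Entry::new(1)); // note that `push` takes `&self`
+//!   shared.push(Entry::new(2));
+//!
+//!   // move *all* pushed entries into an unsynchronized `Stack` in a single
+//!   // atomic operation; entries then pop in LIFO order.
+//!   let mut local = shared.take_all();
+//!   assert_eq!(local.pop().unwrap().val, 2);
+//!   assert_eq!(local.pop().unwrap().val, 1);
+//!   ```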
 
 #[cfg(feature = "alloc")]
 extern crate alloc;
 #[cfg(test)]
 extern crate std;
 
-macro_rules! feature {
-    (
-        #![$meta:meta]
-        $($item:item)*
-    ) => {
-        $(
-            #[cfg($meta)]
-            $item
-        )*
-    }
-}
+#[macro_use]
+pub(crate) mod util;
 
 pub mod list;
+pub mod mpsc_queue;
+pub mod stack;
+
 #[doc(inline)]
 pub use list::List;
-pub mod mpsc_queue;
 #[doc(inline)]
 pub use mpsc_queue::MpscQueue;
+#[doc(inline)]
+pub use stack::{Stack, TransferStack};
+
 pub(crate) mod loom;
-pub(crate) mod util;
 
 use core::ptr::NonNull;
diff --git a/cordyceps/src/list.rs b/cordyceps/src/list.rs
index c73a7033..100de0bf 100644
--- a/cordyceps/src/list.rs
+++ b/cordyceps/src/list.rs
@@ -24,11 +24,16 @@ pub use self::cursor::{Cursor, CursorMut};
 ///
 /// This data structure may be used as a first-in, first-out queue by using the
 /// [`List::push_front`] and [`List::pop_back`] methods. It also supports
-/// random-access removals using the [`List::remove`] method.
+/// random-access removals using the [`List::remove`] method. This makes the
+/// [`List`] type suitable for use in cases where elements must be able to drop
+/// themselves while linked into a list.
 ///
 /// This data structure can also be used as a stack by pushing and popping
 /// elements at the same end of the list, e.g. with the [`List::push_back`]
 /// and [`List::pop_back`] methods.
 ///
+/// The [`List`] type is **not** a lock-free data structure, and can only be
+/// modified through `&mut` references.
+///
 /// In order to be part of a `List`, a type `T` must implement [`Linked`] for
 /// [`list::Links`].
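+///
+/// For example, used as a first-in, first-out queue (a minimal sketch,
+/// assuming an `Entry` type that implements [`Linked`] for [`list::Links`];
+/// the implementation and the `val` field are hypothetical, and elided here):
+///
+/// ```ignore
+/// let mut list = List::<Entry>::new();
+/// list.push_front(Entry::new(1));
+/// list.push_front(Entry::new(2));
+///
+/// // elements are dequeued in the order they were enqueued:
+/// assert_eq!(list.pop_back().unwrap().val, 1);
+/// assert_eq!(list.pop_back().unwrap().val, 2);
+/// ```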
 ///
diff --git a/cordyceps/src/loom.rs b/cordyceps/src/loom.rs
index 1c46b791..09c18681 100644
--- a/cordyceps/src/loom.rs
+++ b/cordyceps/src/loom.rs
@@ -67,15 +67,30 @@ mod inner {
     pub(crate) mod sync {
         pub use core::sync::*;
 
-        #[cfg(feature = "alloc")]
+        #[cfg(all(feature = "alloc", not(test)))]
         pub use alloc::sync::*;
+
+        #[cfg(test)]
+        pub use std::sync::*;
     }
 
     pub(crate) use core::sync::atomic;
 
-    #[cfg(feature = "std")]
-    pub use std::thread;
-
+    #[cfg(test)]
+    pub(crate) mod thread {
+        pub(crate) use std::thread::{yield_now, JoinHandle};
+
+        pub(crate) fn spawn<F, T>(f: F) -> JoinHandle<T>
+        where
+            F: FnOnce() -> T + Send + 'static,
+            T: Send + 'static,
+        {
+            let track = super::alloc::track::Registry::current();
+            std::thread::spawn(move || {
+                let _tracking = track.map(|track| track.set_default());
+                f()
+            })
+        }
+    }
     pub(crate) mod hint {
        #[inline(always)]
        pub(crate) fn spin_loop() {
@@ -138,23 +153,212 @@ mod inner {
             }
         }
     }
+
+    #[cfg(test)]
+    pub(crate) mod model {
+        #[non_exhaustive]
+        #[derive(Default)]
+        pub(crate) struct Builder {
+            pub(crate) max_threads: usize,
+            pub(crate) max_branches: usize,
+            pub(crate) max_permutations: Option<usize>,
+            // pub(crate) max_duration: Option<Duration>,
+            pub(crate) preemption_bound: Option<usize>,
+            // pub(crate) checkpoint_file: Option<PathBuf>,
+            pub(crate) checkpoint_interval: usize,
+            pub(crate) location: bool,
+            pub(crate) log: bool,
+        }
+
+        impl Builder {
+            pub(crate) fn new() -> Self {
+                Self::default()
+            }
+
+            pub(crate) fn check(&self, f: impl FnOnce()) {
+                let registry = super::alloc::track::Registry::default();
+                let _tracking = registry.set_default();
+                f();
+                registry.check();
+            }
+        }
+    }
+
+    #[cfg(test)]
+    pub(crate) fn model(f: impl FnOnce()) {
+        let collector = tracing_subscriber::fmt()
+            .with_max_level(tracing::Level::TRACE)
+            .with_test_writer()
+            .without_time()
+            .with_thread_ids(true)
+            .with_thread_names(false)
+            .finish();
+        let _ = tracing::subscriber::set_global_default(collector);
+        model::Builder::new().check(f)
+    }
+
     pub(crate) mod alloc {
+        #[cfg(test)]
+        use std::sync::Arc;
+
+        #[cfg(test)]
+        pub(in crate::loom) mod track {
+            use std::{
+                cell::RefCell,
+                sync::{
+                    atomic::{AtomicBool, Ordering},
+                    Arc, Mutex, Weak,
+                },
+            };
+
+            #[derive(Clone, Debug, Default)]
+            pub(crate) struct Registry(Arc<Mutex<RegistryInner>>);
+
+            #[derive(Debug, Default)]
+            struct RegistryInner {
+                tracks: Vec<Weak<TrackData>>,
+                next_id: usize,
+            }
+
+            #[derive(Debug)]
+            pub(super) struct TrackData {
+                was_leaked: AtomicBool,
+                type_name: &'static str,
+                location: &'static core::panic::Location<'static>,
+                id: usize,
+            }
+
+            thread_local! {
+                static REGISTRY: RefCell<Option<Registry>> = RefCell::new(None);
+            }
+
+            impl Registry {
+                pub(in crate::loom) fn current() -> Option<Registry> {
+                    REGISTRY.with(|current| current.borrow().clone())
+                }
+
+                pub(in crate::loom) fn set_default(&self) -> impl Drop {
+                    struct Unset(Option<Registry>);
+                    impl Drop for Unset {
+                        fn drop(&mut self) {
+                            let _ =
+                                REGISTRY.try_with(|current| *current.borrow_mut() = self.0.take());
+                        }
+                    }
+
+                    REGISTRY.with(|current| {
+                        let mut current = current.borrow_mut();
+                        let unset = Unset(current.clone());
+                        *current = Some(self.clone());
+                        unset
+                    })
+                }
+
+                #[track_caller]
+                pub(super) fn start_tracking<T>() -> Option<Arc<TrackData>> {
+                    // we don't use `Option::map` here because it creates a
+                    // closure, which breaks `#[track_caller]`, since the caller
+                    // of `insert` becomes the closure, which cannot have a
+                    // `#[track_caller]` attribute on it.
+                    #[allow(clippy::manual_map)]
+                    match Self::current() {
+                        Some(registry) => Some(registry.insert::<T>()),
+                        _ => None,
+                    }
+                }
+
+                #[track_caller]
+                pub(super) fn insert<T>(&self) -> Arc<TrackData> {
+                    let mut inner = self.0.lock().unwrap();
+                    let id = inner.next_id;
+                    inner.next_id += 1;
+                    let location = core::panic::Location::caller();
+                    let type_name = std::any::type_name::<T>();
+                    let data = Arc::new(TrackData {
+                        type_name,
+                        location,
+                        id,
+                        was_leaked: AtomicBool::new(false),
+                    });
+                    let weak = Arc::downgrade(&data);
+                    test_trace!(
+                        target: "cordyceps::alloc",
+                        id,
+                        "type" = %type_name,
+                        %location,
+                        "started tracking allocation",
+                    );
+                    inner.tracks.push(weak);
+                    data
+                }
+
+                pub(in crate::loom) fn check(&self) {
+                    let leaked = self
+                        .0
+                        .lock()
+                        .unwrap()
+                        .tracks
+                        .iter()
+                        .filter_map(|weak| {
+                            let data = weak.upgrade()?;
+                            data.was_leaked.store(true, Ordering::SeqCst);
+                            Some(format!(
+                                " - id {}, {} allocated at {}",
+                                data.id, data.type_name, data.location
+                            ))
+                        })
+                        .collect::<Vec<_>>();
+                    if !leaked.is_empty() {
+                        let leaked = leaked.join("\n ");
+                        panic!("the following allocations were leaked:\n {leaked}");
+                    }
+                }
+            }
+
+            impl Drop for TrackData {
+                fn drop(&mut self) {
+                    if !self.was_leaked.load(Ordering::SeqCst) {
+                        test_trace!(
+                            target: "cordyceps::alloc",
+                            id = self.id,
+                            "type" = %self.type_name,
+                            location = %self.location,
+                            "dropped all references to a tracked allocation",
+                        );
+                    }
+                }
+            }
+        }
+
         /// Track allocations, detecting leaks
         #[derive(Debug, Default)]
         pub struct Track<T> {
             value: T,
+
+            #[cfg(test)]
+            track: Option<Arc<track::TrackData>>,
         }
 
         impl<T> Track<T> {
+            pub const fn new_const(value: T) -> Track<T> {
+                Track {
+                    value,
+
+                    #[cfg(test)]
+                    track: None,
+                }
+            }
+
             /// Track a value for leaks
             #[inline(always)]
+            #[track_caller]
             pub fn new(value: T) -> Track<T> {
-                Track { value }
-            }
-
-            #[inline(always)]
-            pub const fn new_const(value: T) -> Track<T> {
-                Track { value }
+                Track {
+                    value,
+
+                    #[cfg(test)]
+                    track: track::Registry::start_tracking::<T>(),
+                }
             }
 
             /// Get a reference to the value
diff --git a/cordyceps/src/stack.rs b/cordyceps/src/stack.rs
new file mode 100644
index 00000000..14bf6465
--- /dev/null
+++ b/cordyceps/src/stack.rs
@@ -0,0 +1,662 @@
+//! [Intrusive], singly-linked last-in, first-out (LIFO) stacks.
+//!
+//! See the documentation for the [`Stack`] and [`TransferStack`] types for
+//! details.
+//!
+//! [intrusive]: crate#intrusive-data-structures
+#![warn(missing_debug_implementations)]
+
+use crate::{
+    loom::{
+        cell::UnsafeCell,
+        sync::atomic::{AtomicPtr, Ordering::*},
+    },
+    Linked,
+};
+use core::{
+    fmt,
+    marker::PhantomPinned,
+    ptr::{self, NonNull},
+};
+
+/// An [intrusive] lock-free singly-linked LIFO stack, where all entries
+/// currently in the stack are consumed in a single atomic operation.
+///
+/// A transfer stack is perhaps the world's simplest lock-free concurrent data
+/// structure. It provides two primary operations:
+///
+/// - [`TransferStack::push`], which appends an element to the end of the
+///   transfer stack,
+///
+/// - [`TransferStack::take_all`], which atomically takes all elements currently
+///   on the transfer stack and returns them as a new mutable [`Stack`].
+///
+/// These are both *O*(1) operations, although `push` performs a
+/// compare-and-swap loop that may be retried if another producer concurrently
+/// pushed an element.
+///
+/// In order to be part of a `TransferStack`, a type `T` must implement
+/// the [`Linked`] trait for [`stack::Links`](Links).
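+///
+/// A minimal sketch of such an implementation, modeled on the heap-allocated
+/// entry type used by this module's own tests (the `Entry` type, its `val`
+/// field, and its `new` constructor are illustrative, not part of this crate):
+///
+/// ```
+/// use cordyceps::{stack::Links, Linked};
+/// use core::{pin::Pin, ptr::{self, NonNull}};
+///
+/// struct Entry {
+///     links: Links<Entry>,
+///     val: i32,
+/// }
+///
+/// impl Entry {
+///     fn new(val: i32) -> Pin<Box<Entry>> {
+///         Box::pin(Entry { links: Links::new(), val })
+///     }
+/// }
+///
+/// unsafe impl Linked<Links<Entry>> for Entry {
+///     // An owning handle: dropping it drops the entry.
+///     type Handle = Pin<Box<Entry>>;
+///
+///     fn into_ptr(handle: Self::Handle) -> NonNull<Entry> {
+///         // Safety: the entry is not moved out of the box; it is only
+///         // "re-boxed" by `from_ptr` when it leaves the stack.
+///         unsafe { NonNull::from(Box::leak(Pin::into_inner_unchecked(handle))) }
+///     }
+///
+///     unsafe fn from_ptr(ptr: NonNull<Entry>) -> Self::Handle {
+///         // Safety: `ptr` was produced by `into_ptr` from a pinned box.
+///         Pin::new_unchecked(Box::from_raw(ptr.as_ptr()))
+///     }
+///
+///     unsafe fn links(target: NonNull<Entry>) -> NonNull<Links<Entry>> {
+///         // Safety: the `links` field of a non-null `Entry` pointer cannot
+///         // itself be null.
+///         NonNull::new_unchecked(ptr::addr_of_mut!((*target.as_ptr()).links))
+///     }
+/// }
+///
+/// let stack = cordyceps::TransferStack::<Entry>::new();
+/// stack.push(Entry::new(1));
+///
+/// let mut taken = stack.take_all();
+/// assert_eq!(taken.pop().unwrap().val, 1);
+/// ```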
+///
+/// Pushing elements into a `TransferStack` takes ownership of those elements
+/// through an owning [`Handle` type](Linked::Handle). Dropping a
+/// [`TransferStack`] drops all elements currently linked into the stack.
+///
+/// A transfer stack is often useful in cases where a large number of resources
+/// must be efficiently transferred from several producers to a consumer, such
+/// as for reuse or cleanup. For example, a [`TransferStack`] can be used as the
+/// "thread" (shared) free list in a [`mimalloc`-style sharded
+/// allocator][mimalloc], with a mutable [`Stack`] used as the local
+/// (unsynchronized) free list. When an allocation is freed from the same CPU
+/// core that it was allocated on, it is pushed to the local free list, using an
+/// unsynchronized mutable [`Stack::push`] operation. If an allocation is freed
+/// from a different core, it is instead pushed to that core's shared free
+/// list, a [`TransferStack`], using an atomic [`TransferStack::push`]
+/// operation. New allocations are popped from the local unsynchronized free
+/// list, and if the local free list is empty, the entire shared free list is
+/// moved onto the local free list. This allows objects which do not leave the
+/// CPU core they were allocated on to be both allocated and deallocated using
+/// unsynchronized operations, and new allocations only perform an atomic
+/// operation when the local free list is empty.
+///
+/// [intrusive]: crate#intrusive-data-structures
+/// [mimalloc]: https://www.microsoft.com/en-us/research/uploads/prod/2019/06/mimalloc-tr-v1.pdf
+pub struct TransferStack<T: Linked<Links<T>>> {
+    head: AtomicPtr<T>,
+}
+
+/// An [intrusive] singly-linked mutable LIFO stack.
+///
+/// This is a very simple implementation of a linked `Stack`, which provides
+/// *O*(1) [`push`](Self::push) and [`pop`](Self::pop) operations. Items are
+/// popped from the stack in the opposite order that they were pushed in.
+///
+/// A [`Stack`] also implements the [`Iterator`] trait, with the
+/// [`Iterator::next`] method popping elements from the end of the stack.
+///
+/// In order to be part of a `Stack`, a type `T` must implement
+/// the [`Linked`] trait for [`stack::Links`](Links).
+///
+/// Pushing elements into a `Stack` takes ownership of those elements
+/// through an owning [`Handle` type](Linked::Handle). Dropping a
+/// `Stack` drops all elements currently linked into the stack.
+///
+/// [intrusive]: crate#intrusive-data-structures
+pub struct Stack<T: Linked<Links<T>>> {
+    head: Option<NonNull<T>>,
+}
+
+/// Links to other nodes in a [`TransferStack`] or [`Stack`].
+///
+/// In order to be part of a [`Stack`] or [`TransferStack`], a type must contain
+/// an instance of this type, and must implement the [`Linked`] trait for
+/// `Links`.
+pub struct Links<T> {
+    /// The next node in the stack.
+    next: UnsafeCell<Option<NonNull<T>>>,
+
+    /// Linked list links must always be `!Unpin`, in order to ensure that they
+    /// never receive LLVM `noalias` annotations; see also
+    /// <https://github.com/rust-lang/rust/issues/63818>.
+    _unpin: PhantomPinned,
+}
+
+// === impl TransferStack ===
+
+impl<T> TransferStack<T>
+where
+    T: Linked<Links<T>>,
+{
+    /// Returns a new `TransferStack` with no elements.
+    #[cfg(not(loom))]
+    #[must_use]
+    pub const fn new() -> Self {
+        Self {
+            head: AtomicPtr::new(ptr::null_mut()),
+        }
+    }
+
+    /// Returns a new `TransferStack` with no elements.
+    #[cfg(loom)]
+    #[must_use]
+    pub fn new() -> Self {
+        Self {
+            head: AtomicPtr::new(ptr::null_mut()),
+        }
+    }
+
+    /// Pushes `element` onto the end of this `TransferStack`, taking ownership
+    /// of it.
+    ///
+    /// This is an *O*(1) operation, although it performs a compare-and-swap
+    /// loop that may repeat if another producer is concurrently calling `push`
+    /// on the same `TransferStack`.
+    ///
+    /// This takes ownership over `element` through its [owning `Handle`
+    /// type](Linked::Handle). If the `TransferStack` is dropped before the
+    /// pushed `element` is removed from the stack, the `element` will be dropped.
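+    ///
+    /// # Example
+    ///
+    /// Because `push` takes `&self`, multiple producers may push concurrently
+    /// through shared references. A minimal sketch, assuming the illustrative
+    /// `Entry` type from the type-level docs above:
+    ///
+    /// ```ignore
+    /// use cordyceps::TransferStack;
+    /// use std::sync::Arc;
+    ///
+    /// let stack = Arc::new(TransferStack::<Entry>::new());
+    ///
+    /// let producer = {
+    ///     let stack = stack.clone();
+    ///     std::thread::spawn(move || stack.push(Entry::new(1)))
+    /// };
+    /// stack.push(Entry::new(2));
+    /// producer.join().unwrap();
+    ///
+    /// // a consumer drains everything pushed so far in one atomic swap:
+    /// assert_eq!(stack.take_all().count(), 2);
+    /// ```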
+    pub fn push(&self, element: T::Handle) {
+        let ptr = T::into_ptr(element);
+        test_trace!(?ptr, "TransferStack::push");
+        let links = unsafe { T::links(ptr).as_mut() };
+        debug_assert!(links.next.with(|next| unsafe { (*next).is_none() }));
+
+        let mut head = self.head.load(Relaxed);
+        loop {
+            test_trace!(?ptr, ?head, "TransferStack::push");
+            links.next.with_mut(|next| unsafe {
+                *next = NonNull::new(head);
+            });
+
+            match self
+                .head
+                .compare_exchange_weak(head, ptr.as_ptr(), AcqRel, Acquire)
+            {
+                Ok(_) => {
+                    test_trace!(?ptr, ?head, "TransferStack::push -> pushed");
+                    return;
+                }
+                Err(actual) => head = actual,
+            }
+        }
+    }
+
+    /// Takes all elements *currently* in this `TransferStack`, returning a new
+    /// mutable [`Stack`] containing those elements.
+    ///
+    /// This is an *O*(1) operation which does not allocate memory. It will
+    /// never loop and does not spin.
+    #[must_use]
+    pub fn take_all(&self) -> Stack<T> {
+        let head = self.head.swap(ptr::null_mut(), AcqRel);
+        let head = NonNull::new(head);
+        Stack { head }
+    }
+}
+
+impl<T> Drop for TransferStack<T>
+where
+    T: Linked<Links<T>>,
+{
+    fn drop(&mut self) {
+        // The stack owns any entries that are still in the stack; ensure they
+        // are dropped before dropping the stack.
+        for entry in self.take_all() {
+            drop(entry);
+        }
+    }
+}
+
+impl<T> fmt::Debug for TransferStack<T>
+where
+    T: Linked<Links<T>>,
+{
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        let Self { head } = self;
+        f.debug_struct("TransferStack").field("head", head).finish()
+    }
+}
+
+// === impl Stack ===
+
+impl<T> Stack<T>
+where
+    T: Linked<Links<T>>,
+{
+    /// Returns a new `Stack` with no elements in it.
+    #[must_use]
+    pub const fn new() -> Self {
+        Self { head: None }
+    }
+
+    /// Pushes `element` onto the end of this `Stack`, taking ownership
+    /// of it.
+    ///
+    /// This is an *O*(1) operation that does not allocate memory. It will never
+    /// loop.
+    ///
+    /// This takes ownership over `element` through its [owning `Handle`
+    /// type](Linked::Handle). If the `Stack` is dropped before the
+    /// pushed `element` is [`pop`](Self::pop)ped from the stack, the `element`
+    /// will be dropped.
+    pub fn push(&mut self, element: T::Handle) {
+        let ptr = T::into_ptr(element);
+        test_trace!(?ptr, ?self.head, "Stack::push");
+        unsafe {
+            // Safety: we have exclusive mutable access to the stack, and
+            // therefore can also mutate the stack's entries.
+            let links = T::links(ptr).as_mut();
+            links.next.with_mut(|next| {
+                debug_assert!((*next).is_none());
+                *next = self.head.replace(ptr);
+            })
+        }
+    }
+
+    /// Returns the element most recently [push](Self::push)ed to this `Stack`,
+    /// or `None` if the stack is empty.
+    ///
+    /// This is an *O*(1) operation which does not allocate memory. It will
+    /// never loop and does not spin.
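+    ///
+    /// # Example
+    ///
+    /// A minimal sketch of the LIFO ordering, again assuming the illustrative
+    /// `Entry` type from the type-level docs above:
+    ///
+    /// ```ignore
+    /// let mut stack = Stack::<Entry>::new();
+    /// stack.push(Entry::new(1));
+    /// stack.push(Entry::new(2));
+    ///
+    /// // `pop` returns entries in the reverse of the order they were pushed:
+    /// assert_eq!(stack.pop().unwrap().val, 2);
+    /// assert_eq!(stack.pop().unwrap().val, 1);
+    /// assert!(stack.pop().is_none());
+    /// ```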
+    #[must_use]
+    pub fn pop(&mut self) -> Option<T::Handle> {
+        test_trace!(?self.head, "Stack::pop");
+        let head = self.head.take()?;
+        unsafe {
+            // Safety: we have exclusive ownership over this chunk of stack.
+
+            // advance the head link to the next node after the current one (if
+            // there is one).
+            self.head = T::links(head).as_mut().next.with_mut(|next| (*next).take());
+
+            test_trace!(?self.head, "Stack::pop -> popped");
+
+            // return the current node
+            Some(T::from_ptr(head))
+        }
+    }
+
+    /// Takes all elements *currently* in this `Stack`, returning a new
+    /// mutable `Stack` containing those elements.
+    ///
+    /// This is an *O*(1) operation which does not allocate memory. It will
+    /// never loop and does not spin.
+    #[must_use]
+    pub fn take_all(&mut self) -> Self {
+        Self {
+            head: self.head.take(),
+        }
+    }
+
+    /// Returns `true` if this `Stack` is empty.
+    #[inline]
+    #[must_use]
+    pub fn is_empty(&self) -> bool {
+        self.head.is_none()
+    }
+}
+
+impl<T> Drop for Stack<T>
+where
+    T: Linked<Links<T>>,
+{
+    fn drop(&mut self) {
+        // The stack owns any entries that are still in the stack; ensure they
+        // are dropped before dropping the stack.
+        for entry in self {
+            drop(entry);
+        }
+    }
+}
+
+impl<T> fmt::Debug for Stack<T>
+where
+    T: Linked<Links<T>>,
+{
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        let Self { head } = self;
+        f.debug_struct("Stack").field("head", head).finish()
+    }
+}
+
+impl<T> Iterator for Stack<T>
+where
+    T: Linked<Links<T>>,
+{
+    type Item = T::Handle;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        self.pop()
+    }
+}
+
+/// # Safety
+///
+/// A `Stack` is `Send` if `T` is `Send`, because moving it across threads
+/// also implicitly moves any `T`s in the stack.
+unsafe impl<T> Send for Stack<T>
+where
+    T: Send,
+    T: Linked<Links<T>>,
+{
+}
+
+unsafe impl<T> Sync for Stack<T>
+where
+    T: Sync,
+    T: Linked<Links<T>>,
+{
+}
+
+// === impl Links ===
+
+impl<T> Links<T> {
+    /// Returns new links for a [`Stack`] or [`TransferStack`].
+    #[cfg(not(loom))]
+    #[must_use]
+    pub const fn new() -> Self {
+        Self {
+            next: UnsafeCell::new(None),
+            _unpin: PhantomPinned,
+        }
+    }
+
+    /// Returns new links for a [`Stack`] or [`TransferStack`].
+    #[cfg(loom)]
+    #[must_use]
+    pub fn new() -> Self {
+        Self {
+            next: UnsafeCell::new(None),
+            _unpin: PhantomPinned,
+        }
+    }
+}
+
+/// # Safety
+///
+/// Types containing [`Links`] may be `Send`: the pointers within the `Links` may
+/// mutably alias another value, but the links can only be _accessed_ by the
+/// owner of the [`TransferStack`] itself, because the pointers are private. As
+/// long as [`TransferStack`] upholds its own invariants, `Links` should not
+/// make a type `!Send`.
+unsafe impl<T: Send> Send for Links<T> {}
+
+/// # Safety
+///
+/// Types containing [`Links`] may be `Sync`: the pointers within the `Links` may
+/// mutably alias another value, but the links can only be _accessed_ by the
+/// owner of the [`TransferStack`] itself, because the pointers are private. As
+/// long as [`TransferStack`] upholds its own invariants, `Links` should not
+/// make a type `!Sync`.
+unsafe impl<T: Sync> Sync for Links<T> {}
+
+impl<T> fmt::Debug for Links<T> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.write_str("stack::Links { ... }")
+    }
+}
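+
+// A sketch of the `mimalloc`-style free list pattern described in the
+// `TransferStack` documentation above: a per-core cache owns a private
+// `Stack` for unsynchronized reuse, plus a shared `TransferStack` that other
+// cores push freed entries onto. The `LocalCache` type is illustrative only
+// (it reuses the `Entry` test type and is not part of the public API).
+#[cfg(test)]
+#[allow(dead_code)]
+mod free_list_sketch {
+    use super::{test_util::Entry, *};
+    use core::pin::Pin;
+
+    struct LocalCache {
+        local: Stack<Entry>,          // unsynchronized free list
+        shared: TransferStack<Entry>, // other cores push freed entries here
+    }
+
+    impl LocalCache {
+        fn alloc(&mut self) -> Option<Pin<Box<Entry>>> {
+            // Fast path: pop from the local free list without synchronization.
+            self.local.pop().or_else(|| {
+                // Slow path: the local list is empty, so move *everything* on
+                // the shared list over to it in a single atomic swap, then
+                // retry the pop.
+                self.local = self.shared.take_all();
+                self.local.pop()
+            })
+        }
+    }
+}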
}") + } +} + + +#[cfg(test)] +mod loom { + use super::*; + use crate::loom::{ + self, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, + thread, + }; + use test_util::Entry; + + #[test] + fn multithreaded_push() { + const PUSHES: i32 = 2; + loom::model(|| { + let stack = Arc::new(TransferStack::new()); + let threads = Arc::new(AtomicUsize::new(2)); + let thread1 = thread::spawn({ + let stack = stack.clone(); + let threads = threads.clone(); + move || { + Entry::push_all(&stack, 1, PUSHES); + threads.fetch_sub(1, Ordering::Relaxed); + } + }); + + let thread2 = thread::spawn({ + let stack = stack.clone(); + let threads = threads.clone(); + move || { + Entry::push_all(&stack, 2, PUSHES); + threads.fetch_sub(1, Ordering::Relaxed); + } + }); + + let mut seen = Vec::new(); + + loop { + seen.extend(stack.take_all().map(|entry| entry.val)); + + if threads.load(Ordering::Relaxed) == 0 { + break; + } + + thread::yield_now(); + } + + seen.extend(stack.take_all().map(|entry| entry.val)); + + seen.sort(); + assert_eq!(seen, vec![10, 11, 20, 21]); + + thread1.join().unwrap(); + thread2.join().unwrap(); + }) + } + + #[test] + fn multithreaded_pop() { + const PUSHES: i32 = 2; + loom::model(|| { + let stack = Arc::new(TransferStack::new()); + let thread1 = thread::spawn({ + let stack = stack.clone(); + move || Entry::push_all(&stack, 1, PUSHES) + }); + + let thread2 = thread::spawn({ + let stack = stack.clone(); + move || Entry::push_all(&stack, 2, PUSHES) + }); + + let thread3 = thread::spawn({ + let stack = stack.clone(); + move || stack.take_all().map(|entry| entry.val).collect::>() + }); + + let seen_thread0 = stack.take_all().map(|entry| entry.val).collect::>(); + let seen_thread3 = thread3.join().unwrap(); + + thread1.join().unwrap(); + thread2.join().unwrap(); + + let seen_thread0_final = stack.take_all().map(|entry| entry.val).collect::>(); + + let mut all = dbg!(seen_thread0); + all.extend(dbg!(seen_thread3)); + all.extend(dbg!(seen_thread0_final)); + + all.sort(); + assert_eq!(all, vec![10, 11, 20, 21]); + }) + } + + #[test] + fn doesnt_leak() { + const PUSHES: i32 = 2; + loom::model(|| { + let stack = Arc::new(TransferStack::new()); + let thread1 = thread::spawn({ + let stack = stack.clone(); + move || Entry::push_all(&stack, 1, PUSHES) + }); + + let thread2 = thread::spawn({ + let stack = stack.clone(); + move || Entry::push_all(&stack, 2, PUSHES) + }); + + tracing::info!("dropping stack"); + drop(stack); + + thread1.join().unwrap(); + thread2.join().unwrap(); + }) + } + + #[test] + fn take_all_doesnt_leak() { + const PUSHES: i32 = 2; + loom::model(|| { + let stack = Arc::new(TransferStack::new()); + let thread1 = thread::spawn({ + let stack = stack.clone(); + move || Entry::push_all(&stack, 1, PUSHES) + }); + + let thread2 = thread::spawn({ + let stack = stack.clone(); + move || Entry::push_all(&stack, 2, PUSHES) + }); + + thread1.join().unwrap(); + thread2.join().unwrap(); + + let take_all = stack.take_all(); + + tracing::info!("dropping stack"); + drop(stack); + + tracing::info!("dropping take_all"); + drop(take_all); + }) + } + + #[test] + fn take_all_doesnt_leak_racy() { + const PUSHES: i32 = 2; + loom::model(|| { + let stack = Arc::new(TransferStack::new()); + let thread1 = thread::spawn({ + let stack = stack.clone(); + move || Entry::push_all(&stack, 1, PUSHES) + }); + + let thread2 = thread::spawn({ + let stack = stack.clone(); + move || Entry::push_all(&stack, 2, PUSHES) + }); + + let take_all = stack.take_all(); + + thread1.join().unwrap(); + thread2.join().unwrap(); + + 
tracing::info!("dropping stack"); + drop(stack); + + tracing::info!("dropping take_all"); + drop(take_all); + }) + } + + + #[test] + fn unsync() { + loom::model(|| { + let mut stack = Stack::::new(); + stack.push(Entry::new(1)); + stack.push(Entry::new(2)); + stack.push(Entry::new(3)); + let mut take_all = stack.take_all(); + + for i in (1..=3).rev() { + assert_eq!(take_all.next().unwrap().val, i); + stack.push(Entry::new(10 + i)); + } + + let mut i = 11; + for entry in stack.take_all() { + assert_eq!(entry.val, i); + i += 1; + } + + }) + } + + #[test] + fn unsync_doesnt_leak() { + loom::model(|| { + let mut stack = Stack::::new(); + stack.push(Entry::new(1)); + stack.push(Entry::new(2)); + stack.push(Entry::new(3)); + }) + } + +} + +#[cfg(test)] +mod test { + use super::{*, test_util::Entry}; + + #[test] + fn stack_is_send_sync() { + crate::util::assert_send_sync::>() + } + + #[test] + fn links_are_send_sync() { + crate::util::assert_send_sync::>() + } +} + +#[cfg(test)] +mod test_util { + use super::*; + use core::pin::Pin; + use crate::loom::alloc; + + #[pin_project::pin_project] + pub(super) struct Entry { + #[pin] + links: Links, + pub(super) val: i32, + track: alloc::Track<()>, + } + + unsafe impl Linked> for Entry { + type Handle = Pin>; + + fn into_ptr(handle: Pin>) -> NonNull { + unsafe { NonNull::from(Box::leak(Pin::into_inner_unchecked(handle))) } + } + + unsafe fn from_ptr(ptr: NonNull) -> Self::Handle { + // Safety: if this function is only called by the linked list + // implementation (and it is not intended for external use), we can + // expect that the `NonNull` was constructed from a reference which + // was pinned. + // + // If other callers besides `List`'s internals were to call this on + // some random `NonNull`, this would not be the case, and + // this could be constructing an erroneous `Pin` from a referent + // that may not be pinned! + Pin::new_unchecked(Box::from_raw(ptr.as_ptr())) + } + + unsafe fn links(target: NonNull) -> NonNull> { + let links = ptr::addr_of_mut!((*target.as_ptr()).links); + // Safety: it's fine to use `new_unchecked` here; if the pointer that we + // offset to the `links` field is not null (which it shouldn't be, as we + // received it as a `NonNull`), the offset pointer should therefore also + // not be null. + NonNull::new_unchecked(links) + } + } + + impl Entry { + pub(super) fn new(val: i32) -> Pin> { + Box::pin(Entry { + links: Links::new(), + val, + track: alloc::Track::new(()), + }) + } + + pub(super) fn push_all(stack: &TransferStack, thread: i32, n: i32) { + for i in 0..n { + let entry = Self::new((thread * 10) + i); + stack.push(entry); + } + } + } +} diff --git a/cordyceps/src/util.rs b/cordyceps/src/util.rs index 9e3ce2ac..e28f4f0b 100644 --- a/cordyceps/src/util.rs +++ b/cordyceps/src/util.rs @@ -4,6 +4,25 @@ use core::{ ops::{Deref, DerefMut}, }; +macro_rules! feature { + ( + #![$meta:meta] + $($item:item)* + ) => { + $( + #[cfg($meta)] + $item + )* + } +} + +macro_rules! test_trace { + ($($tt:tt)*) => { + #[cfg(test)] + tracing::trace!($($tt)*) + } +} + /// An exponential backoff for spin loops #[derive(Debug, Clone)] pub(crate) struct Backoff { @@ -131,3 +150,6 @@ impl fmt::Display for FmtOption<'_, T> { } } } + +#[cfg(test)] +pub(crate) fn assert_send_sync() {} \ No newline at end of file