From 09101629930bfd23a3c78b1fa7fa9f02d4c0d727 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Thu, 16 Jun 2022 10:27:48 -0700 Subject: [PATCH 01/12] wip transfer stack thing Signed-off-by: Eliza Weisman --- cordyceps/src/lib.rs | 4 +- cordyceps/src/transfer_stack.rs | 87 +++++++++++++++++++++++++++++++++ 2 files changed, 90 insertions(+), 1 deletion(-) create mode 100644 cordyceps/src/transfer_stack.rs diff --git a/cordyceps/src/lib.rs b/cordyceps/src/lib.rs index 2c591077..135352f5 100644 --- a/cordyceps/src/lib.rs +++ b/cordyceps/src/lib.rs @@ -21,9 +21,11 @@ macro_rules! feature { } pub mod list; +pub mod mpsc_queue; +pub mod transfer_stack; + #[doc(inline)] pub use list::List; -pub mod mpsc_queue; #[doc(inline)] pub use mpsc_queue::MpscQueue; diff --git a/cordyceps/src/transfer_stack.rs b/cordyceps/src/transfer_stack.rs new file mode 100644 index 00000000..0f6d684f --- /dev/null +++ b/cordyceps/src/transfer_stack.rs @@ -0,0 +1,87 @@ +use crate::{ + loom::{ + cell::UnsafeCell, + sync::atomic::{AtomicPtr, Ordering::*}, + }, + Linked, +}; +use core::{ + fmt, + marker::PhantomPinned, + ptr::{self, NonNull}, +}; + +pub struct TransferStack { + head: AtomicPtr, +} + +pub struct Drain { + next: Option>, +} + +/// Links to other nodes in a [`TransferStack`]. +/// +/// In order to be part of a [`TransferStack`], a type must contain an instance of this +/// type, and must implement the [`Linked`] trait for `Links`. +pub struct Links { + /// The next node in the queue. + next: UnsafeCell>>, + + /// Linked list links must always be `!Unpin`, in order to ensure that they + /// never recieve LLVM `noalias` annotations; see also + /// . + _unpin: PhantomPinned, +} + +impl TransferStack +where + T: Linked>, +{ + pub fn push(&self, element: T::Handle) { + let ptr = T::into_ptr(element); + let links = unsafe { T::links(ptr).as_mut() }; + debug_assert!(links.next.with(|next| unsafe { (*next).is_none() })); + + let mut head = self.head.load(Relaxed); + loop { + links.next.with_mut(|next| unsafe { + *next = NonNull::new(head); + }); + + match self + .head + .compare_exchange_weak(head, ptr.as_ptr(), AcqRel, Acquire) + { + Ok(_) => return, + Err(actual) => head = actual, + } + } + } + + pub fn pop_all(&self) -> Drain { + let head = self.head.swap(ptr::null_mut(), AcqRel); + let next = NonNull::new(head); + Drain { next } + } +} + +// === impl Drain === + +impl Iterator for Drain +where + T: Linked>, +{ + type Item = T::Handle; + + fn next(&mut self) -> Option { + let curr = self.next.take()?; + unsafe { + // advance the iterator to the next node after the current one (if + // there is one). + self.next = T::links(curr).as_mut().next.with_mut(|next| (*next).take()); + + // return the current node + Some(T::from_ptr(curr)) + } + } +} From 59ccfc393b80df43f07cffb5b9b9031a3694461d Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Sat, 18 Jun 2022 10:18:39 -0700 Subject: [PATCH 02/12] wip --- Cargo.lock | 6 +- cordyceps/Cargo.toml | 15 ++- cordyceps/src/lib.rs | 14 +-- cordyceps/src/mpsc_queue.rs | 1 + cordyceps/src/transfer_stack.rs | 211 +++++++++++++++++++++++++++++++- cordyceps/src/util.rs | 73 +++++++++++ 6 files changed, 300 insertions(+), 20 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4c68d76b..fe41bc8f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -192,6 +192,8 @@ dependencies = [ "pin-project", "proptest", "tracing 0.1.34", + "tracing 0.2.0", + "tracing-subscriber 0.3.0", "tracing-subscriber 0.3.11", ] @@ -527,9 +529,9 @@ dependencies = [ [[package]] name = "loom" -version = "0.5.5" +version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85eb735cf3c8ebac6cc3655c5da2f4a088b6a19133aa482471a21ba0eb5d83ab" +checksum = "ff50ecb28bb86013e935fb6683ab1f6d3a20016f123c76fd4c27470076ac30f5" dependencies = [ "cfg-if", "generator", diff --git a/cordyceps/Cargo.toml b/cordyceps/Cargo.toml index 5d840ddb..a68923d9 100644 --- a/cordyceps/Cargo.toml +++ b/cordyceps/Cargo.toml @@ -25,13 +25,24 @@ no-cache-pad = [] [dev-dependencies] proptest = "1" -tracing = { version = "0.1" } -tracing-subscriber = { version = "0.3", features = ["fmt"] } pin-project = "1" +[dev-dependencies.tracing_02] +package = "tracing" +default_features = false +git = "https://github.com/tokio-rs/tracing" + +[target.'cfg(not(loom))'.dev-dependencies] +tracing-subscriber = { git = "https://github.com/tokio-rs/tracing", features = ["ansi", "fmt"] } + [target.'cfg(loom)'.dependencies] loom = "0.5.5" +[target.'cfg(loom)'.dev-dependencies] +loom = { version = "0.5.5", features = ["futures", "checkpoint"] } +tracing_01 = { package = "tracing", version = "0.1", default_features = false } +tracing_subscriber_03 = { package = "tracing-subscriber", version = "0.3.11", features = ["fmt"] } + [package.metadata.docs.rs] all-features = true rustdoc-args = ["--cfg", "docsrs"] diff --git a/cordyceps/src/lib.rs b/cordyceps/src/lib.rs index 135352f5..65ad7923 100644 --- a/cordyceps/src/lib.rs +++ b/cordyceps/src/lib.rs @@ -8,17 +8,8 @@ extern crate alloc; #[cfg(test)] extern crate std; -macro_rules! feature { - ( - #![$meta:meta] - $($item:item)* - ) => { - $( - #[cfg($meta)] - $item - )* - } -} +#[macro_use] +pub(crate) mod util; pub mod list; pub mod mpsc_queue; @@ -30,7 +21,6 @@ pub use list::List; pub use mpsc_queue::MpscQueue; pub(crate) mod loom; -pub(crate) mod util; use core::ptr::NonNull; diff --git a/cordyceps/src/mpsc_queue.rs b/cordyceps/src/mpsc_queue.rs index deb064e6..eb31e663 100644 --- a/cordyceps/src/mpsc_queue.rs +++ b/cordyceps/src/mpsc_queue.rs @@ -1243,6 +1243,7 @@ unsafe fn non_null(ptr: *mut T) -> NonNull { mod loom { use super::*; use crate::loom::{self, sync::Arc, thread}; + use crate::util::tracing; use test_util::*; #[test] diff --git a/cordyceps/src/transfer_stack.rs b/cordyceps/src/transfer_stack.rs index 0f6d684f..e187f403 100644 --- a/cordyceps/src/transfer_stack.rs +++ b/cordyceps/src/transfer_stack.rs @@ -14,7 +14,6 @@ use core::{ pub struct TransferStack { head: AtomicPtr, } - pub struct Drain { next: Option>, } @@ -37,13 +36,34 @@ impl TransferStack where T: Linked>, { + /// Returns a new `TransferStack`. + #[cfg(not(loom))] + #[must_use] + pub const fn new() -> Self { + Self { + head: AtomicPtr::new(ptr::null_mut()), + } + } + + /// Returns a new `TransferStack`. + #[cfg(loom)] + #[must_use] + pub fn new() -> Self { + Self { + head: AtomicPtr::new(ptr::null_mut()), + } + } + pub fn push(&self, element: T::Handle) { let ptr = T::into_ptr(element); + test_trace!(?ptr, "TransferStack::push"); + let links = unsafe { T::links(ptr).as_mut() }; debug_assert!(links.next.with(|next| unsafe { (*next).is_none() })); let mut head = self.head.load(Relaxed); loop { + test_trace!(?ptr, ?head, "TransferStack::push"); links.next.with_mut(|next| unsafe { *next = NonNull::new(head); }); @@ -52,19 +72,44 @@ where .head .compare_exchange_weak(head, ptr.as_ptr(), AcqRel, Acquire) { - Ok(_) => return, + Ok(_) => { + test_trace!(?ptr, ?head, "TransferStack::push -> pushed"); + return; + } Err(actual) => head = actual, } } } - pub fn pop_all(&self) -> Drain { + pub fn drain(&self) -> Drain { let head = self.head.swap(ptr::null_mut(), AcqRel); let next = NonNull::new(head); Drain { next } } } +impl Links { + /// Returns new [`TransferStack`] links. + #[cfg(not(loom))] + #[must_use] + pub const fn new() -> Self { + Self { + next: UnsafeCell::new(None), + _unpin: PhantomPinned, + } + } + + /// Returns new [`TransferStack`] links. + #[cfg(loom)] + #[must_use] + pub fn new() -> Self { + Self { + next: UnsafeCell::new(None), + _unpin: PhantomPinned, + } + } +} + // === impl Drain === impl Iterator for Drain @@ -74,14 +119,172 @@ where type Item = T::Handle; fn next(&mut self) -> Option { - let curr = self.next.take()?; + let curr = self.next.take(); + test_trace!(?curr, "Drain::next"); + let curr = curr?; unsafe { // advance the iterator to the next node after the current one (if // there is one). self.next = T::links(curr).as_mut().next.with_mut(|next| (*next).take()); + test_trace!(?self.next, "Drain::next"); + // return the current node Some(T::from_ptr(curr)) } } } + +#[cfg(all(loom, test))] +mod loom { + use super::*; + use crate::loom::{ + self, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, + thread, + }; + use test_util::Entry; + + #[test] + fn multithreaded_push() { + const PUSHES: i32 = 2; + loom::model(|| { + let stack = Arc::new(TransferStack::new()); + let threads = Arc::new(AtomicUsize::new(2)); + let thread1 = thread::spawn({ + let stack = stack.clone(); + let threads = threads.clone(); + move || { + Entry::push_all(&stack, 1, PUSHES); + threads.fetch_sub(1, Ordering::Relaxed); + } + }); + + let thread2 = thread::spawn({ + let stack = stack.clone(); + let threads = threads.clone(); + move || { + Entry::push_all(&stack, 2, PUSHES); + threads.fetch_sub(1, Ordering::Relaxed); + } + }); + + let mut seen = Vec::new(); + + loop { + seen.extend(stack.drain().map(|entry| entry.val)); + + if threads.load(Ordering::Relaxed) == 0 { + break; + } + + thread::yield_now(); + } + + seen.extend(stack.drain().map(|entry| entry.val)); + + seen.sort(); + assert_eq!(seen, vec![10, 11, 20, 21]); + + thread1.join().unwrap(); + thread2.join().unwrap(); + }) + } + + #[test] + fn multithreaded_pop() { + const PUSHES: i32 = 2; + loom::model(|| { + let stack = Arc::new(TransferStack::new()); + let thread1 = thread::spawn({ + let stack = stack.clone(); + move || Entry::push_all(&stack, 1, PUSHES) + }); + + let thread2 = thread::spawn({ + let stack = stack.clone(); + move || Entry::push_all(&stack, 2, PUSHES) + }); + + let thread3 = thread::spawn({ + let stack = stack.clone(); + move || stack.drain().map(|entry| entry.val).collect::>() + }); + + let seen_thread0 = stack.drain().map(|entry| entry.val).collect::>(); + let seen_thread3 = thread3.join().unwrap(); + + thread1.join().unwrap(); + thread2.join().unwrap(); + + let seen_thread0_final = stack.drain().map(|entry| entry.val).collect::>(); + + let mut all = dbg!(seen_thread0); + all.extend(dbg!(seen_thread3)); + all.extend(dbg!(seen_thread0_final)); + + all.sort(); + assert_eq!(all, vec![10, 11, 20, 21]); + }) + } +} + +#[cfg(test)] +mod test_util { + use super::*; + use core::pin::Pin; + + #[pin_project::pin_project] + #[repr(C)] + pub(super) struct Entry { + #[pin] + links: Links, + pub(super) val: i32, + } + + unsafe impl Linked> for Entry { + type Handle = Pin>; + + fn into_ptr(handle: Pin>) -> NonNull { + unsafe { NonNull::from(Box::leak(Pin::into_inner_unchecked(handle))) } + } + + unsafe fn from_ptr(ptr: NonNull) -> Self::Handle { + // Safety: if this function is only called by the linked list + // implementation (and it is not intended for external use), we can + // expect that the `NonNull` was constructed from a reference which + // was pinned. + // + // If other callers besides `List`'s internals were to call this on + // some random `NonNull`, this would not be the case, and + // this could be constructing an erroneous `Pin` from a referent + // that may not be pinned! + Pin::new_unchecked(Box::from_raw(ptr.as_ptr())) + } + + unsafe fn links(target: NonNull) -> NonNull> { + // Safety: this is safe because the `links` are the first field of + // `Entry`, and `Entry` is `repr(C)`. + target.cast() + } + } + + impl Entry { + pub(super) fn new(val: i32) -> Pin> { + Box::pin(Entry { + links: Links::new(), + val, + }) + } + + pub(super) fn push_all(stack: &TransferStack, thread: i32, n: i32) { + for i in 0..n { + let entry = Self::new((thread * 10) + i); + stack.push(entry); + } + } + } +} diff --git a/cordyceps/src/util.rs b/cordyceps/src/util.rs index 9e3ce2ac..3e28c57b 100644 --- a/cordyceps/src/util.rs +++ b/cordyceps/src/util.rs @@ -4,6 +4,79 @@ use core::{ ops::{Deref, DerefMut}, }; +#[cfg(all(test, not(loom)))] +pub(crate) use tracing_02 as tracing; + +#[cfg(all(test, loom))] +pub(crate) use tracing_01 as tracing; + +#[cfg(not(test))] +macro_rules! test_dbg { + ($e:expr) => { + $e + }; +} + +#[cfg(test)] +macro_rules! test_dbg { + ($e:expr) => { + match $e { + e => { + crate::util::tracing::debug!( + location = %core::panic::Location::caller(), + "{} = {:?}", + stringify!($e), + &e + ); + e + } + } + }; +} + +#[cfg(not(test))] +macro_rules! test_trace { + ($($args:tt)+) => {}; +} + +#[cfg(test)] +macro_rules! test_trace { + ($($args:tt)+) => { + crate::util::tracing::debug!( + location = %core::panic::Location::caller(), + $($args)+ + ); + }; +} + +macro_rules! feature { + ( + #![$meta:meta] + $($item:item)* + ) => { + $( + #[cfg($meta)] + #[cfg_attr(docsrs, doc(cfg($meta)))] + $item + )* + } +} + +macro_rules! loom_const_fn { + ( + $(#[$meta:meta])* + $vis:vis fn $name:ident($($arg:ident: $T:ty),*) -> $Ret:ty $body:block + ) => { + $(#[$meta])* + #[cfg(not(loom))] + $vis const fn $name($($arg: $T),*) -> $Ret $body + + $(#[$meta])* + #[cfg(loom)] + $vis fn $name($($arg: $T),*) -> $Ret $body + } +} + /// An exponential backoff for spin loops #[derive(Debug, Clone)] pub(crate) struct Backoff { From 5f087aac86496d820428ffc435ddec2361faefc7 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Wed, 10 May 2023 10:06:35 -0700 Subject: [PATCH 03/12] fixy --- cordyceps/Cargo.toml | 3 +-- cordyceps/src/transfer_stack.rs | 1 + 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cordyceps/Cargo.toml b/cordyceps/Cargo.toml index 7ea33f3a..3ffebc22 100644 --- a/cordyceps/Cargo.toml +++ b/cordyceps/Cargo.toml @@ -26,6 +26,7 @@ no-cache-pad = [] [dev-dependencies] proptest = "1" pin-project = "1" +tracing_01 = { package = "tracing", version = "0.1", default_features = false } [dev-dependencies.tracing_02] package = "tracing" @@ -37,11 +38,9 @@ tracing-subscriber = { git = "https://github.com/tokio-rs/tracing", features = [ [target.'cfg(loom)'.dependencies] loom = "0.5.5" -tracing = { version = "0.1" } [target.'cfg(loom)'.dev-dependencies] loom = { version = "0.5.5", features = ["futures", "checkpoint"] } -tracing_01 = { package = "tracing", version = "0.1", default_features = false } tracing_subscriber_03 = { package = "tracing-subscriber", version = "0.3.11", features = ["fmt"] } [package.metadata.docs.rs] diff --git a/cordyceps/src/transfer_stack.rs b/cordyceps/src/transfer_stack.rs index e187f403..62114305 100644 --- a/cordyceps/src/transfer_stack.rs +++ b/cordyceps/src/transfer_stack.rs @@ -14,6 +14,7 @@ use core::{ pub struct TransferStack { head: AtomicPtr, } + pub struct Drain { next: Option>, } From f88c12186a03f5b7fc429e5e3f590ff229f0c47b Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Wed, 10 May 2023 10:28:01 -0700 Subject: [PATCH 04/12] run tests outside of loom --- Cargo.lock | 2 - cordyceps/Cargo.toml | 18 +-- cordyceps/src/loom.rs | 264 +++++++++++++++++++++++++++++++- cordyceps/src/transfer_stack.rs | 2 +- cordyceps/src/util.rs | 62 +------- 5 files changed, 266 insertions(+), 82 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a8b94ca3..0c046372 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -473,8 +473,6 @@ dependencies = [ "pin-project", "proptest", "tracing 0.1.37", - "tracing 0.2.0", - "tracing-subscriber 0.3.0", "tracing-subscriber 0.3.16", ] diff --git a/cordyceps/Cargo.toml b/cordyceps/Cargo.toml index 3ffebc22..a76a23e8 100644 --- a/cordyceps/Cargo.toml +++ b/cordyceps/Cargo.toml @@ -25,24 +25,14 @@ no-cache-pad = [] [dev-dependencies] proptest = "1" +tracing = { version = "0.1" } +tracing-subscriber = { version = "0.3", features = ["fmt"] } pin-project = "1" -tracing_01 = { package = "tracing", version = "0.1", default_features = false } - -[dev-dependencies.tracing_02] -package = "tracing" -default_features = false -git = "https://github.com/tokio-rs/tracing" - -[target.'cfg(not(loom))'.dev-dependencies] -tracing-subscriber = { git = "https://github.com/tokio-rs/tracing", features = ["ansi", "fmt"] } [target.'cfg(loom)'.dependencies] loom = "0.5.5" - -[target.'cfg(loom)'.dev-dependencies] -loom = { version = "0.5.5", features = ["futures", "checkpoint"] } -tracing_subscriber_03 = { package = "tracing-subscriber", version = "0.3.11", features = ["fmt"] } +tracing = { version = "0.1" } [package.metadata.docs.rs] all-features = true -rustdoc-args = ["--cfg", "docsrs"] +rustdoc-args = ["--cfg", "docsrs"] \ No newline at end of file diff --git a/cordyceps/src/loom.rs b/cordyceps/src/loom.rs index 1c46b791..cf008372 100644 --- a/cordyceps/src/loom.rs +++ b/cordyceps/src/loom.rs @@ -67,13 +67,16 @@ mod inner { pub(crate) mod sync { pub use core::sync::*; - #[cfg(feature = "alloc")] + #[cfg(all(feature = "alloc", not(test)))] pub use alloc::sync::*; + + #[cfg(test)] + pub use std::sync::*; } pub(crate) use core::sync::atomic; - #[cfg(feature = "std")] + #[cfg(test)] pub use std::thread; pub(crate) mod hint { @@ -138,23 +141,270 @@ mod inner { } } } + + #[cfg(test)] + pub(crate) mod model { + #[non_exhaustive] + #[derive(Default)] + pub(crate) struct Builder { + pub(crate) max_threads: usize, + pub(crate) max_branches: usize, + pub(crate) max_permutations: Option, + // pub(crate) max_duration: Option, + pub(crate) preemption_bound: Option, + // pub(crate) checkpoint_file: Option, + pub(crate) checkpoint_interval: usize, + pub(crate) location: bool, + pub(crate) log: bool, + } + + impl Builder { + pub(crate) fn new() -> Self { + Self::default() + } + + pub(crate) fn check(&self, f: impl FnOnce()) { + let registry = super::alloc::track::Registry::default(); + let _tracking = registry.set_default(); + f(); + registry.check(); + } + } + } + + #[cfg(test)] + pub(crate) fn model(f: impl FnOnce()) { + let collector = tracing_subscriber::fmt() + .with_max_level(tracing::Level::TRACE) + .with_test_writer() + .without_time() + .with_thread_ids(true) + .with_thread_names(false) + .finish(); + let _ = tracing::subscriber::set_global_default(collector); + model::Builder::new().check(f) + } + pub(crate) mod alloc { + #[cfg(test)] + use core::{ + future::Future, + pin::Pin, + task::{Context, Poll}, + }; + + #[cfg(test)] + use std::sync::Arc; + + #[cfg(test)] + pub(in crate::loom) mod track { + use std::{ + cell::RefCell, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, Mutex, Weak, + }, + }; + + #[derive(Clone, Debug, Default)] + pub(crate) struct Registry(Arc>); + + #[derive(Debug, Default)] + struct RegistryInner { + tracks: Vec>, + next_id: usize, + } + + #[derive(Debug)] + pub(super) struct TrackData { + was_leaked: AtomicBool, + type_name: &'static str, + location: &'static core::panic::Location<'static>, + id: usize, + } + + thread_local! { + static REGISTRY: RefCell> = RefCell::new(None); + } + + impl Registry { + pub(in crate::loom) fn current() -> Option { + REGISTRY.with(|current| current.borrow().clone()) + } + + pub(in crate::loom) fn set_default(&self) -> impl Drop { + struct Unset(Option); + impl Drop for Unset { + fn drop(&mut self) { + let _ = + REGISTRY.try_with(|current| *current.borrow_mut() = self.0.take()); + } + } + + REGISTRY.with(|current| { + let mut current = current.borrow_mut(); + let unset = Unset(current.clone()); + *current = Some(self.clone()); + unset + }) + } + + #[track_caller] + pub(super) fn start_tracking() -> Option> { + // we don't use `Option::map` here because it creates a + // closure, which breaks `#[track_caller]`, since the caller + // of `insert` becomes the closure, which cannot have a + // `#[track_caller]` attribute on it. + #[allow(clippy::manual_map)] + match Self::current() { + Some(registry) => Some(registry.insert::()), + _ => None, + } + } + + #[track_caller] + pub(super) fn insert(&self) -> Arc { + let mut inner = self.0.lock().unwrap(); + let id = inner.next_id; + inner.next_id += 1; + let location = core::panic::Location::caller(); + let type_name = std::any::type_name::(); + let data = Arc::new(TrackData { + type_name, + location, + id, + was_leaked: AtomicBool::new(false), + }); + let weak = Arc::downgrade(&data); + test_trace!( + target: "maitake::alloc", + id, + "type" = %type_name, + %location, + "started tracking allocation", + ); + inner.tracks.push(weak); + data + } + + pub(in crate::loom) fn check(&self) { + let leaked = self + .0 + .lock() + .unwrap() + .tracks + .iter() + .filter_map(|weak| { + let data = weak.upgrade()?; + data.was_leaked.store(true, Ordering::SeqCst); + Some(format!( + " - id {}, {} allocated at {}", + data.id, data.type_name, data.location + )) + }) + .collect::>(); + if !leaked.is_empty() { + let leaked = leaked.join("\n "); + panic!("the following allocations were leaked:\n {leaked}"); + } + } + } + + impl Drop for TrackData { + fn drop(&mut self) { + if !self.was_leaked.load(Ordering::SeqCst) { + test_trace!( + target: "maitake::alloc", + id = self.id, + "type" = %self.type_name, + location = %self.location, + "dropped all references to a tracked allocation", + ); + } + } + } + } + + #[cfg(test)] + #[derive(Debug)] + #[pin_project::pin_project] + pub(crate) struct TrackFuture { + #[pin] + inner: F, + track: Option>, + } + + #[cfg(test)] + impl Future for TrackFuture { + type Output = TrackFuture; + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + this.inner.poll(cx).map(|inner| TrackFuture { + inner, + track: this.track.clone(), + }) + } + } + + #[cfg(test)] + impl TrackFuture { + /// Wrap a `Future` in a `TrackFuture` that participates in Loom's + /// leak checking. + #[track_caller] + pub(crate) fn new(inner: F) -> Self { + let track = track::Registry::start_tracking::(); + Self { inner, track } + } + + /// Stop tracking this future, and return the inner value. + pub(crate) fn into_inner(self) -> F { + self.inner + } + } + + #[cfg(test)] + #[track_caller] + pub(crate) fn track_future(inner: F) -> TrackFuture { + TrackFuture::new(inner) + } + + // PartialEq impl so that `assert_eq!(..., Ok(...))` works + #[cfg(test)] + impl PartialEq for TrackFuture { + fn eq(&self, other: &Self) -> bool { + self.inner == other.inner + } + } + /// Track allocations, detecting leaks #[derive(Debug, Default)] pub struct Track { value: T, + + #[cfg(test)] + track: Option>, } impl Track { + pub const fn new_const(value: T) -> Track { + Track { + value, + + #[cfg(test)] + track: None, + } + } + /// Track a value for leaks #[inline(always)] + #[track_caller] pub fn new(value: T) -> Track { - Track { value } - } + Track { + value, - #[inline(always)] - pub const fn new_const(value: T) -> Track { - Track { value } + #[cfg(test)] + track: track::Registry::start_tracking::(), + } } /// Get a reference to the value diff --git a/cordyceps/src/transfer_stack.rs b/cordyceps/src/transfer_stack.rs index 62114305..f6290e36 100644 --- a/cordyceps/src/transfer_stack.rs +++ b/cordyceps/src/transfer_stack.rs @@ -136,7 +136,7 @@ where } } -#[cfg(all(loom, test))] +#[cfg(test)] mod loom { use super::*; use crate::loom::{ diff --git a/cordyceps/src/util.rs b/cordyceps/src/util.rs index 3e28c57b..88eb558b 100644 --- a/cordyceps/src/util.rs +++ b/cordyceps/src/util.rs @@ -4,51 +4,6 @@ use core::{ ops::{Deref, DerefMut}, }; -#[cfg(all(test, not(loom)))] -pub(crate) use tracing_02 as tracing; - -#[cfg(all(test, loom))] -pub(crate) use tracing_01 as tracing; - -#[cfg(not(test))] -macro_rules! test_dbg { - ($e:expr) => { - $e - }; -} - -#[cfg(test)] -macro_rules! test_dbg { - ($e:expr) => { - match $e { - e => { - crate::util::tracing::debug!( - location = %core::panic::Location::caller(), - "{} = {:?}", - stringify!($e), - &e - ); - e - } - } - }; -} - -#[cfg(not(test))] -macro_rules! test_trace { - ($($args:tt)+) => {}; -} - -#[cfg(test)] -macro_rules! test_trace { - ($($args:tt)+) => { - crate::util::tracing::debug!( - location = %core::panic::Location::caller(), - $($args)+ - ); - }; -} - macro_rules! feature { ( #![$meta:meta] @@ -56,24 +11,15 @@ macro_rules! feature { ) => { $( #[cfg($meta)] - #[cfg_attr(docsrs, doc(cfg($meta)))] $item )* } } -macro_rules! loom_const_fn { - ( - $(#[$meta:meta])* - $vis:vis fn $name:ident($($arg:ident: $T:ty),*) -> $Ret:ty $body:block - ) => { - $(#[$meta])* - #[cfg(not(loom))] - $vis const fn $name($($arg: $T),*) -> $Ret $body - - $(#[$meta])* - #[cfg(loom)] - $vis fn $name($($arg: $T),*) -> $Ret $body +macro_rules! test_trace { + ($($tt:tt)*) => { + #[cfg(test)] + tracing::trace!($($tt)*) } } From e51c821c563ae0b9a23bd8c349af45c6689d84b6 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Wed, 10 May 2023 10:41:12 -0700 Subject: [PATCH 05/12] add leak tracking tests --- cordyceps/src/loom.rs | 74 +++++++-------------------------- cordyceps/src/transfer_stack.rs | 70 ++++++++++++++++++++++++++++--- 2 files changed, 79 insertions(+), 65 deletions(-) diff --git a/cordyceps/src/loom.rs b/cordyceps/src/loom.rs index cf008372..09c18681 100644 --- a/cordyceps/src/loom.rs +++ b/cordyceps/src/loom.rs @@ -77,8 +77,20 @@ mod inner { pub(crate) use core::sync::atomic; #[cfg(test)] - pub use std::thread; - + pub(crate) mod thread { + pub(crate) use std::thread::{yield_now, JoinHandle}; + pub(crate) fn spawn(f: F) -> JoinHandle + where + F: FnOnce() -> T + Send + 'static, + T: Send + 'static, + { + let track = super::alloc::track::Registry::current(); + std::thread::spawn(move || { + let _tracking = track.map(|track| track.set_default()); + f() + }) + } + } pub(crate) mod hint { #[inline(always)] pub(crate) fn spin_loop() { @@ -186,13 +198,6 @@ mod inner { } pub(crate) mod alloc { - #[cfg(test)] - use core::{ - future::Future, - pin::Pin, - task::{Context, Poll}, - }; - #[cfg(test)] use std::sync::Arc; @@ -325,57 +330,6 @@ mod inner { } } - #[cfg(test)] - #[derive(Debug)] - #[pin_project::pin_project] - pub(crate) struct TrackFuture { - #[pin] - inner: F, - track: Option>, - } - - #[cfg(test)] - impl Future for TrackFuture { - type Output = TrackFuture; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - this.inner.poll(cx).map(|inner| TrackFuture { - inner, - track: this.track.clone(), - }) - } - } - - #[cfg(test)] - impl TrackFuture { - /// Wrap a `Future` in a `TrackFuture` that participates in Loom's - /// leak checking. - #[track_caller] - pub(crate) fn new(inner: F) -> Self { - let track = track::Registry::start_tracking::(); - Self { inner, track } - } - - /// Stop tracking this future, and return the inner value. - pub(crate) fn into_inner(self) -> F { - self.inner - } - } - - #[cfg(test)] - #[track_caller] - pub(crate) fn track_future(inner: F) -> TrackFuture { - TrackFuture::new(inner) - } - - // PartialEq impl so that `assert_eq!(..., Ok(...))` works - #[cfg(test)] - impl PartialEq for TrackFuture { - fn eq(&self, other: &Self) -> bool { - self.inner == other.inner - } - } - /// Track allocations, detecting leaks #[derive(Debug, Default)] pub struct Track { diff --git a/cordyceps/src/transfer_stack.rs b/cordyceps/src/transfer_stack.rs index f6290e36..c20a26c2 100644 --- a/cordyceps/src/transfer_stack.rs +++ b/cordyceps/src/transfer_stack.rs @@ -1,7 +1,7 @@ use crate::{ loom::{ cell::UnsafeCell, - sync::atomic::{AtomicPtr, Ordering::*}, + sync::atomic::{AtomicPtr, AtomicUsize, Ordering::*}, }, Linked, }; @@ -89,6 +89,8 @@ where } } +// === impl Links === + impl Links { /// Returns new [`TransferStack`] links. #[cfg(not(loom))] @@ -124,6 +126,8 @@ where test_trace!(?curr, "Drain::next"); let curr = curr?; unsafe { + // Safety: we have exclusive ownership over this chunk of stack. + // advance the iterator to the next node after the current one (if // there is one). self.next = T::links(curr).as_mut().next.with_mut(|next| (*next).take()); @@ -231,19 +235,71 @@ mod loom { assert_eq!(all, vec![10, 11, 20, 21]); }) } + + #[test] + fn doesnt_leak() { + const PUSHES: i32 = 2; + loom::model(|| { + let stack = Arc::new(TransferStack::new()); + let thread1 = thread::spawn({ + let stack = stack.clone(); + move || Entry::push_all(&stack, 1, PUSHES) + }); + + let thread2 = thread::spawn({ + let stack = stack.clone(); + move || Entry::push_all(&stack, 2, PUSHES) + }); + + tracing::info!("dropping stack"); + drop(stack); + + thread1.join().unwrap(); + thread2.join().unwrap(); + }) + } + + #[test] + fn drain_doesnt_leak() { + const PUSHES: i32 = 2; + loom::model(|| { + let stack = Arc::new(TransferStack::new()); + let thread1 = thread::spawn({ + let stack = stack.clone(); + move || Entry::push_all(&stack, 1, PUSHES) + }); + + let thread2 = thread::spawn({ + let stack = stack.clone(); + move || Entry::push_all(&stack, 2, PUSHES) + }); + + let drain = stack.drain(); + + tracing::info!("dropping stack"); + drop(stack); + + tracing::info!("dropping drain"); + drop(drain); + + thread1.join().unwrap(); + thread2.join().unwrap(); + }) + } } #[cfg(test)] mod test_util { use super::*; use core::pin::Pin; + use crate::loom::alloc; #[pin_project::pin_project] - #[repr(C)] pub(super) struct Entry { #[pin] links: Links, pub(super) val: i32, + track: alloc::Track<()>, } unsafe impl Linked> for Entry { @@ -267,9 +323,12 @@ mod test_util { } unsafe fn links(target: NonNull) -> NonNull> { - // Safety: this is safe because the `links` are the first field of - // `Entry`, and `Entry` is `repr(C)`. - target.cast() + let links = ptr::addr_of_mut!((*target.as_ptr()).links); + // Safety: it's fine to use `new_unchecked` here; if the pointer that we + // offset to the `links` field is not null (which it shouldn't be, as we + // received it as a `NonNull`), the offset pointer should therefore also + // not be null. + NonNull::new_unchecked(links) } } @@ -278,6 +337,7 @@ mod test_util { Box::pin(Entry { links: Links::new(), val, + track: alloc::Track::new(()), }) } From 6308ada872ac8e131f54ba52814fda5fb8745870 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Wed, 10 May 2023 10:46:37 -0700 Subject: [PATCH 06/12] drop owned entries when dropping a `TransferStack` --- cordyceps/src/transfer_stack.rs | 58 +++++++++++++++++++++++++++++++-- 1 file changed, 56 insertions(+), 2 deletions(-) diff --git a/cordyceps/src/transfer_stack.rs b/cordyceps/src/transfer_stack.rs index c20a26c2..df60c84f 100644 --- a/cordyceps/src/transfer_stack.rs +++ b/cordyceps/src/transfer_stack.rs @@ -11,11 +11,11 @@ use core::{ ptr::{self, NonNull}, }; -pub struct TransferStack { +pub struct TransferStack>> { head: AtomicPtr, } -pub struct Drain { +pub struct Drain>> { next: Option>, } @@ -89,6 +89,19 @@ where } } +impl Drop for TransferStack +where + T: Linked>, +{ + fn drop(&mut self) { + // The stack owns any entries that are still in the stack; ensure they + // are dropped before dropping the stack. + for entry in self.drain() { + drop(entry); + } + } +} + // === impl Links === impl Links { @@ -140,6 +153,19 @@ where } } +impl Drop for Drain +where + T: Linked>, +{ + fn drop(&mut self) { + // The `Drain` iterator *owns* all entries popped from the stack. Ensure + // that they are all dropped prior to dropping the iterator. + for entry in self { + drop(entry); + } + } +} + #[cfg(test)] mod loom { use super::*; @@ -274,6 +300,9 @@ mod loom { move || Entry::push_all(&stack, 2, PUSHES) }); + thread1.join().unwrap(); + thread2.join().unwrap(); + let drain = stack.drain(); tracing::info!("dropping stack"); @@ -281,9 +310,34 @@ mod loom { tracing::info!("dropping drain"); drop(drain); + }) + } + + #[test] + fn drain_doesnt_leak_racy() { + const PUSHES: i32 = 2; + loom::model(|| { + let stack = Arc::new(TransferStack::new()); + let thread1 = thread::spawn({ + let stack = stack.clone(); + move || Entry::push_all(&stack, 1, PUSHES) + }); + + let thread2 = thread::spawn({ + let stack = stack.clone(); + move || Entry::push_all(&stack, 2, PUSHES) + }); + + let drain = stack.drain(); thread1.join().unwrap(); thread2.join().unwrap(); + + tracing::info!("dropping stack"); + drop(stack); + + tracing::info!("dropping drain"); + drop(drain); }) } } From b84129e4f274ec5d61f88f7f1d5683bfcdd858f4 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Wed, 10 May 2023 10:54:39 -0700 Subject: [PATCH 07/12] add send/sync impls --- cordyceps/src/lib.rs | 1 + cordyceps/src/transfer_stack.rs | 34 +++++++++++++++++++++++++++++++++ cordyceps/src/util.rs | 3 +++ 3 files changed, 38 insertions(+) diff --git a/cordyceps/src/lib.rs b/cordyceps/src/lib.rs index 93072477..afd2f03a 100644 --- a/cordyceps/src/lib.rs +++ b/cordyceps/src/lib.rs @@ -2,6 +2,7 @@ #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg, doc_cfg_hide))] #![cfg_attr(docsrs, deny(missing_docs))] #![cfg_attr(not(any(feature = "std", test)), no_std)] +#![warn(missing_debug_implementations)] #![allow(unused_unsafe)] #[cfg(feature = "alloc")] diff --git a/cordyceps/src/transfer_stack.rs b/cordyceps/src/transfer_stack.rs index df60c84f..444d0ff4 100644 --- a/cordyceps/src/transfer_stack.rs +++ b/cordyceps/src/transfer_stack.rs @@ -82,6 +82,7 @@ where } } + #[must_use] pub fn drain(&self) -> Drain { let head = self.head.swap(ptr::null_mut(), AcqRel); let next = NonNull::new(head); @@ -126,6 +127,24 @@ impl Links { } } +/// # Safety +/// +/// Types containing [`Links`] may be `Send`: the pointers within the `Links` may +/// mutably alias another value, but the links can only be _accessed_ by the +/// owner of the [`TransferStack`] itself, because the pointers are private. As +/// long as [`TransferStack`] upholds its own invariants, `Links` should not +/// make a type `!Send`. +unsafe impl Send for Links {} + +/// # Safety +/// +/// Types containing [`Links`] may be `Send`: the pointers within the `Links` may +/// mutably alias another value, but the links can only be _accessed_ by the +/// owner of the [`TransferStack`] itself, because the pointers are private. As +/// long as [`TransferStack`] upholds its own invariants, `Links` should not +/// make a type `!Send`. +unsafe impl Sync for Links {} + // === impl Drain === impl Iterator for Drain @@ -342,6 +361,21 @@ mod loom { } } +#[cfg(test)] +mod test { + use super::{*, test_util::Entry}; + + #[test] + fn stack_is_send_sync() { + crate::util::assert_send_sync::>() + } + + #[test] + fn links_are_send_sync() { + crate::util::assert_send_sync::>() + } +} + #[cfg(test)] mod test_util { use super::*; diff --git a/cordyceps/src/util.rs b/cordyceps/src/util.rs index 88eb558b..e28f4f0b 100644 --- a/cordyceps/src/util.rs +++ b/cordyceps/src/util.rs @@ -150,3 +150,6 @@ impl fmt::Display for FmtOption<'_, T> { } } } + +#[cfg(test)] +pub(crate) fn assert_send_sync() {} \ No newline at end of file From 6c05bc81811c20d8326f07751d004df90373ff8f Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Wed, 10 May 2023 11:56:31 -0700 Subject: [PATCH 08/12] redo apis --- cordyceps/src/lib.rs | 1 - cordyceps/src/transfer_stack.rs | 253 ++++++++++++++++++++++++-------- 2 files changed, 190 insertions(+), 64 deletions(-) diff --git a/cordyceps/src/lib.rs b/cordyceps/src/lib.rs index afd2f03a..93072477 100644 --- a/cordyceps/src/lib.rs +++ b/cordyceps/src/lib.rs @@ -2,7 +2,6 @@ #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg, doc_cfg_hide))] #![cfg_attr(docsrs, deny(missing_docs))] #![cfg_attr(not(any(feature = "std", test)), no_std)] -#![warn(missing_debug_implementations)] #![allow(unused_unsafe)] #[cfg(feature = "alloc")] diff --git a/cordyceps/src/transfer_stack.rs b/cordyceps/src/transfer_stack.rs index 444d0ff4..05014d81 100644 --- a/cordyceps/src/transfer_stack.rs +++ b/cordyceps/src/transfer_stack.rs @@ -1,7 +1,15 @@ +//! [Intrusive] stacks. +//! +//! See the documentation for the [`Stack`] and [`TransferStack`] types for +//! details. +//! +//! [intrusive]: crate#intrusive-data-structures +#![warn(missing_debug_implementations)] + use crate::{ loom::{ cell::UnsafeCell, - sync::atomic::{AtomicPtr, AtomicUsize, Ordering::*}, + sync::atomic::{AtomicPtr, Ordering::*}, }, Linked, }; @@ -11,15 +19,22 @@ use core::{ ptr::{self, NonNull}, }; +/// An [intrusive], lock-free singly-linked stack, where all entries currently in +/// the list are consumed in a single atomic operation. +/// +/// A transfer stack is perhaps the world's simplest lock-free concurrent data +/// structure. +/// +/// [intrusive]: crate#intrusive-data-structures pub struct TransferStack>> { head: AtomicPtr, } -pub struct Drain>> { - next: Option>, +pub struct Stack>> { + head: Option>, } -/// Links to other nodes in a [`TransferStack`]. +/// Links to other nodes in a [`TransferStack`] or [`Stack`]. /// /// In order to be part of a [`TransferStack`], a type must contain an instance of this /// type, and must implement the [`Linked`] trait for `Links`. @@ -33,11 +48,13 @@ pub struct Links { _unpin: PhantomPinned, } +// === impl AtomicStack === + impl TransferStack where T: Linked>, { - /// Returns a new `TransferStack`. + /// Returns a new `AtomicStack`. #[cfg(not(loom))] #[must_use] pub const fn new() -> Self { @@ -46,7 +63,7 @@ where } } - /// Returns a new `TransferStack`. + /// Returns a new `AtomicStack`. #[cfg(loom)] #[must_use] pub fn new() -> Self { @@ -57,14 +74,13 @@ where pub fn push(&self, element: T::Handle) { let ptr = T::into_ptr(element); - test_trace!(?ptr, "TransferStack::push"); - + test_trace!(?ptr, "AtomicStack::push"); let links = unsafe { T::links(ptr).as_mut() }; debug_assert!(links.next.with(|next| unsafe { (*next).is_none() })); let mut head = self.head.load(Relaxed); loop { - test_trace!(?ptr, ?head, "TransferStack::push"); + test_trace!(?ptr, ?head, "AtomicStack::push"); links.next.with_mut(|next| unsafe { *next = NonNull::new(head); }); @@ -74,7 +90,7 @@ where .compare_exchange_weak(head, ptr.as_ptr(), AcqRel, Acquire) { Ok(_) => { - test_trace!(?ptr, ?head, "TransferStack::push -> pushed"); + test_trace!(?ptr, ?head, "AtomicStack::push -> pushed"); return; } Err(actual) => head = actual, @@ -83,10 +99,10 @@ where } #[must_use] - pub fn drain(&self) -> Drain { + pub fn take_all(&self) -> Stack { let head = self.head.swap(ptr::null_mut(), AcqRel); - let next = NonNull::new(head); - Drain { next } + let head = NonNull::new(head); + Stack { head } } } @@ -97,12 +113,121 @@ where fn drop(&mut self) { // The stack owns any entries that are still in the stack; ensure they // are dropped before dropping the stack. - for entry in self.drain() { + for entry in self.take_all() { drop(entry); } } } +impl fmt::Debug for TransferStack +where + T: Linked>, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let Self { head } = self; + f.debug_struct("AtomicStack").field("head", head).finish() + } +} + + +// === impl UnsyncStack === + +impl Stack +where + T: Linked>, +{ + /// Returns a new `UnsyncStack`. + #[must_use] + pub const fn new() -> Self { + Self { + head: None, + } + } + + pub fn push(&mut self, element: T::Handle) { + let ptr = T::into_ptr(element); + test_trace!(?ptr, ?self.head, "UnsyncStack::push"); + unsafe { + // Safety: we have exclusive mutable access to the stack, and + // therefore can also mutate the stack's entries. + let links = T::links(ptr).as_mut(); + links.next.with_mut(|next| { + debug_assert!((*next).is_none()); + *next = self.head.replace(ptr); + }) + } + } + + #[must_use] + pub fn pop(&mut self) -> Option { + test_trace!(?self.head, "Stack::pop"); + let head = self.head.take()?; + unsafe { + // Safety: we have exclusive ownership over this chunk of stack. + + // advance the iterator to the next node after the current one (if + // there is one). + self.head = T::links(head).as_mut().next.with_mut(|next| (*next).take()); + + test_trace!(?self.head, "Stack::pop -> popped"); + + // return the current node + Some(T::from_ptr(head)) + } + } + + #[must_use] + pub fn take_all(&mut self) -> Self { + Self { + head: self.head.take(), + } + } +} + +impl Drop for Stack +where + T: Linked>, +{ + fn drop(&mut self) { + // The stack owns any entries that are still in the stack; ensure they + // are dropped before dropping the stack. + for entry in self { + drop(entry); + } + } +} + +impl fmt::Debug for Stack +where + T: Linked>, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let Self { head } = self; + f.debug_struct("Stack").field("head", head).finish() + } +} + +impl Iterator for Stack +where + T: Linked>, +{ + type Item = T::Handle; + + fn next(&mut self) -> Option { + self.pop() + } +} + +/// # Safety +/// +/// A `Stack` is `Send` if `T` is send, because moving it across threads +/// also implicitly moves any `T`s in the stack. +unsafe impl Send for Stack +where T: Send, T: Linked> {} + +unsafe impl Sync for Stack +where T: Sync, T: Linked> {} + // === impl Links === impl Links { @@ -145,45 +270,12 @@ unsafe impl Send for Links {} /// make a type `!Send`. unsafe impl Sync for Links {} -// === impl Drain === - -impl Iterator for Drain -where - T: Linked>, -{ - type Item = T::Handle; - - fn next(&mut self) -> Option { - let curr = self.next.take(); - test_trace!(?curr, "Drain::next"); - let curr = curr?; - unsafe { - // Safety: we have exclusive ownership over this chunk of stack. - - // advance the iterator to the next node after the current one (if - // there is one). - self.next = T::links(curr).as_mut().next.with_mut(|next| (*next).take()); - - test_trace!(?self.next, "Drain::next"); - - // return the current node - Some(T::from_ptr(curr)) - } +impl fmt::Debug for Links { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("transfer_stack::Links { ... }") } } -impl Drop for Drain -where - T: Linked>, -{ - fn drop(&mut self) { - // The `Drain` iterator *owns* all entries popped from the stack. Ensure - // that they are all dropped prior to dropping the iterator. - for entry in self { - drop(entry); - } - } -} #[cfg(test)] mod loom { @@ -225,7 +317,7 @@ mod loom { let mut seen = Vec::new(); loop { - seen.extend(stack.drain().map(|entry| entry.val)); + seen.extend(stack.take_all().map(|entry| entry.val)); if threads.load(Ordering::Relaxed) == 0 { break; @@ -234,7 +326,7 @@ mod loom { thread::yield_now(); } - seen.extend(stack.drain().map(|entry| entry.val)); + seen.extend(stack.take_all().map(|entry| entry.val)); seen.sort(); assert_eq!(seen, vec![10, 11, 20, 21]); @@ -261,16 +353,16 @@ mod loom { let thread3 = thread::spawn({ let stack = stack.clone(); - move || stack.drain().map(|entry| entry.val).collect::>() + move || stack.take_all().map(|entry| entry.val).collect::>() }); - let seen_thread0 = stack.drain().map(|entry| entry.val).collect::>(); + let seen_thread0 = stack.take_all().map(|entry| entry.val).collect::>(); let seen_thread3 = thread3.join().unwrap(); thread1.join().unwrap(); thread2.join().unwrap(); - let seen_thread0_final = stack.drain().map(|entry| entry.val).collect::>(); + let seen_thread0_final = stack.take_all().map(|entry| entry.val).collect::>(); let mut all = dbg!(seen_thread0); all.extend(dbg!(seen_thread3)); @@ -305,7 +397,7 @@ mod loom { } #[test] - fn drain_doesnt_leak() { + fn take_all_doesnt_leak() { const PUSHES: i32 = 2; loom::model(|| { let stack = Arc::new(TransferStack::new()); @@ -322,18 +414,18 @@ mod loom { thread1.join().unwrap(); thread2.join().unwrap(); - let drain = stack.drain(); + let take_all = stack.take_all(); tracing::info!("dropping stack"); drop(stack); - tracing::info!("dropping drain"); - drop(drain); + tracing::info!("dropping take_all"); + drop(take_all); }) } #[test] - fn drain_doesnt_leak_racy() { + fn take_all_doesnt_leak_racy() { const PUSHES: i32 = 2; loom::model(|| { let stack = Arc::new(TransferStack::new()); @@ -347,7 +439,7 @@ mod loom { move || Entry::push_all(&stack, 2, PUSHES) }); - let drain = stack.drain(); + let take_all = stack.take_all(); thread1.join().unwrap(); thread2.join().unwrap(); @@ -355,10 +447,45 @@ mod loom { tracing::info!("dropping stack"); drop(stack); - tracing::info!("dropping drain"); - drop(drain); + tracing::info!("dropping take_all"); + drop(take_all); }) } + + + #[test] + fn unsync() { + loom::model(|| { + let mut stack = Stack::::new(); + stack.push(Entry::new(1)); + stack.push(Entry::new(2)); + stack.push(Entry::new(3)); + let mut take_all = stack.take_all(); + + for i in (1..=3).rev() { + assert_eq!(take_all.next().unwrap().val, i); + stack.push(Entry::new(10 + i)); + } + + let mut i = 11; + for entry in stack.take_all() { + assert_eq!(entry.val, i); + i += 1; + } + + }) + } + + #[test] + fn unsync_doesnt_leak() { + loom::model(|| { + let mut stack = Stack::::new(); + stack.push(Entry::new(1)); + stack.push(Entry::new(2)); + stack.push(Entry::new(3)); + }) + } + } #[cfg(test)] From d2886b358346ae92485df4decd9a50af01fc89ff Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Wed, 10 May 2023 11:57:18 -0700 Subject: [PATCH 09/12] move --- cordyceps/src/lib.rs | 4 +++- cordyceps/src/{transfer_stack.rs => stack.rs} | 4 ++-- 2 files changed, 5 insertions(+), 3 deletions(-) rename cordyceps/src/{transfer_stack.rs => stack.rs} (99%) diff --git a/cordyceps/src/lib.rs b/cordyceps/src/lib.rs index 93072477..ba8b27d4 100644 --- a/cordyceps/src/lib.rs +++ b/cordyceps/src/lib.rs @@ -14,12 +14,14 @@ pub(crate) mod util; pub mod list; pub mod mpsc_queue; -pub mod transfer_stack; +pub mod stack; #[doc(inline)] pub use list::List; #[doc(inline)] pub use mpsc_queue::MpscQueue; +#[doc(inline)] +pub use stack::{TransferStack, Stack}; pub(crate) mod loom; diff --git a/cordyceps/src/transfer_stack.rs b/cordyceps/src/stack.rs similarity index 99% rename from cordyceps/src/transfer_stack.rs rename to cordyceps/src/stack.rs index 05014d81..fa7e30e9 100644 --- a/cordyceps/src/transfer_stack.rs +++ b/cordyceps/src/stack.rs @@ -510,10 +510,10 @@ mod test_util { use crate::loom::alloc; #[pin_project::pin_project] - pub(super) struct Entry { + pub(stack) struct Entry { #[pin] links: Links, - pub(super) val: i32, + pub(stack) val: i32, track: alloc::Track<()>, } From 6384b6cdf4a578ccc31acf8b0177e21c90923b66 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Wed, 10 May 2023 14:11:38 -0700 Subject: [PATCH 10/12] docs --- cordyceps/src/lib.rs | 62 ++++++++++++++++++- cordyceps/src/list.rs | 7 ++- cordyceps/src/stack.rs | 132 +++++++++++++++++++++++++++++++++++------ 3 files changed, 179 insertions(+), 22 deletions(-) diff --git a/cordyceps/src/lib.rs b/cordyceps/src/lib.rs index ba8b27d4..83dd9c07 100644 --- a/cordyceps/src/lib.rs +++ b/cordyceps/src/lib.rs @@ -3,7 +3,65 @@ #![cfg_attr(docsrs, deny(missing_docs))] #![cfg_attr(not(any(feature = "std", test)), no_std)] #![allow(unused_unsafe)] - +//! +//! ## data structures +//! +//! `cordyceps` provides implementations of the following data structures: +//! +//! - **[`List`]: a mutable, doubly-linked list.** +//! +//! A [`List`] provides *O*(1) insertion and removal at both the head and +//! tail of the list. In addition, parts of a [`List`] may be split off to +//! form new [`List`]s, and two [`List`]s may be spliced together to form a +//! single [`List`], all in *O*(1) time. The [`list`] module also provides +//! [`list::Cursor`] and [`list::CursorMut`] types, which allow traversal and +//! modification of elements in a list. Finally, elements can remove themselves +//! from arbitrary positions in a [`List`], provided that they have mutable +//! access to the [`List`] itself. This makes the [`List`] type suitable for +//! use in cases where elements must be able to drop themselves while linked +//! into a list. +//! +//! The [`List`] type is **not** a lock-free data structure, and can only be +//! modified through `&mut` references. +//! +//! - **[`MpscQueue`]: a multi-producer, single-consumer (MPSC) lock-free +//! last-in, first-out (LIFO) queue.** +//! +//! A [`MpscQueue`] is a *lock-free* concurrent data structure that allows +//! multiple producers to concurrently push elements onto the queue, and a +//! single consumer to dequeue elements in the order that they were pushed. +//! +//! [`MpscQueue`]s can be used to efficiently share data from multiple +//! concurrent producers with a consumer. +//! +//! - **[`stack::Stack`]: a mutable, singly-linked first-in, first-out (FIFO) +//! stack.** +//! +//! This is a simple, singly-linked stack with *O*(1) push and pop +//! operations. The pop operation returns the *last* element pushed to the +//! stack. A [`Stack`] also implements the [`Iterator`] trait; iterating over +//! a stack pops elements from the end of the list. +//! +//! The [`Stack`] type is **not** a lock-free data structure, and can only be +//! modified through `&mut` references. +//! +//! - **[`stack::TransferStack`]: a lock-free, multi-producer FIFO stack, where +//! all elements currently in the stack are popped in a single atomic operation.** +//! +//! A [`TransferStack`] is a lock-free data structure where multiple producers +//! can [concurrently push elements](stack::TransferStack::push) to the end of +//! the stack through immutable `&` references. A consumer can [pop all +//! elements currently in the `TransferStack`](stack::TransferStack::take_all) +//! in a single atomic operation, returning a new [`Stack`]. Pushing an +//! element, and taking all elements in the [`TransferStack`] are both *O*(1) +//! operations. +//! +//! A [`TransferStack`] can be used to efficiently transfer ownership of +//! resources from multiple producers to a consumer, such as for reuse or +//! cleanup. +//! +//! [`Stack`]: stack::Stack +//! [`TransferStack`]: stack::TransferStack #[cfg(feature = "alloc")] extern crate alloc; #[cfg(test)] @@ -20,8 +78,6 @@ pub mod stack; pub use list::List; #[doc(inline)] pub use mpsc_queue::MpscQueue; -#[doc(inline)] -pub use stack::{TransferStack, Stack}; pub(crate) mod loom; diff --git a/cordyceps/src/list.rs b/cordyceps/src/list.rs index c73a7033..100de0bf 100644 --- a/cordyceps/src/list.rs +++ b/cordyceps/src/list.rs @@ -24,11 +24,16 @@ pub use self::cursor::{Cursor, CursorMut}; /// /// This data structure may be used as a first-in, first-out queue by using the /// [`List::push_front`] and [`List::pop_back`] methods. It also supports -/// random-access removals using the [`List::remove`] method. +/// random-access removals using the [`List::remove`] method. This makes the +/// [`List`] type suitable for use in cases where elements must be able to drop +/// themselves while linked into a list. /// /// This data structure can also be used as a stack or doubly-linked list by using /// the [`List::pop_front`] and [`List::push_back`] methods. /// +/// The [`List`] type is **not** a lock-free data structure, and can only be +/// modified through `&mut` references. +/// /// In order to be part of a `List`, a type `T` must implement [`Linked`] for /// [`list::Links`]. /// diff --git a/cordyceps/src/stack.rs b/cordyceps/src/stack.rs index fa7e30e9..14bf6465 100644 --- a/cordyceps/src/stack.rs +++ b/cordyceps/src/stack.rs @@ -1,4 +1,4 @@ -//! [Intrusive] stacks. +//! [Intrusive], singly-linked first-in, first-out (FIFO) stacks. //! //! See the documentation for the [`Stack`] and [`TransferStack`] types for //! details. @@ -19,25 +19,78 @@ use core::{ ptr::{self, NonNull}, }; -/// An [intrusive], lock-free singly-linked stack, where all entries currently in -/// the list are consumed in a single atomic operation. +/// An [intrusive] lock-free singly-linked FIFO stack, where all entries +/// currently in the stack are consumed in a single atomic operation. /// /// A transfer stack is perhaps the world's simplest lock-free concurrent data -/// structure. +/// structure. It provides two primary operations: +/// +/// - [`TransferStack::push`], which appends an element to the end of the +/// transfer stack, +/// +/// - [`TransferStack::take_all`], which atomically takes all elements currently +/// on the transfer stack and returns them as a new mutable [`Stack`]. +/// +/// These are both *O*(1) operations, although `push` performs a +/// compare-and-swap loop that may be retried if another producer concurrently +/// pushed an element. +/// +/// In order to be part of a `TransferStack`, a type `T` must implement +/// the [`Linked`] trait for [`stack::Links`](Links). +/// +/// Pushing elements into a `TransferStack` takes ownership of those elements +/// through an owning [`Handle` type](Linked::Handle). Dropping a +/// [`TransferStack`] drops all elements currently linked into the stack. +/// +/// A transfer stack is often useful in cases where a large number of resources +/// must be efficiently transferred from several producers to a consumer, such +/// as for reuse or cleanup. For example, a [`TransferStack`] can be used as the +/// "thread" (shared) free list in a [`mimalloc`-style sharded +/// allocator][mimalloc], with a mutable [`Stack`] used as the local +/// (unsynchronized) free list. When an allocation is freed from the same CPU +/// core that it was allocated on, it is pushed to the local free list, using an +/// unsynchronized mutable [`Stack::push`] operation. If an allocation is freed +/// from a different thread, it is instead pushed to that thread's shared free +/// list, a [`TransferStack`], using an atomic [`TransferStack::push`] +/// operation. New allocations are popped from the local unsynchronized free +/// list, and if the local free list is empty, the entire shared free list is +/// moved onto the local free list. This allows objects which do not leave the +/// CPU core they were allocated on to be both allocated and deallocated using +/// unsynchronized operations, and new allocations only perform an atomic +/// operation when the local free list is empty. /// /// [intrusive]: crate#intrusive-data-structures +/// [mimalloc]: https://www.microsoft.com/en-us/research/uploads/prod/2019/06/mimalloc-tr-v1.pdf pub struct TransferStack>> { head: AtomicPtr, } +/// An [intrusive] singly-linked mutable FIFO stack. +/// +/// This is a very simple implementation of a linked `Stack`, which provides +/// *O*(1) [`push`](Self::push) and [`pop`](Self::pop) operations. Items are +/// popped from the stack in the opposite order that they were pushed in. +/// +/// A [`Stack`] also implements the [`Iterator`] trait, with the +/// [`Iterator::next`] method popping elements from the end of the stack. +/// +/// In order to be part of a `Stack`, a type `T` must implement +/// the [`Linked`] trait for [`stack::Links`](Links). +/// +/// Pushing elements into a `Stack` takes ownership of those elements +/// through an owning [`Handle` type](Linked::Handle). Dropping a +/// `Stack` drops all elements currently linked into the stack. +/// +/// [intrusive]: crate#intrusive-data-structures pub struct Stack>> { head: Option>, } /// Links to other nodes in a [`TransferStack`] or [`Stack`]. /// -/// In order to be part of a [`TransferStack`], a type must contain an instance of this -/// type, and must implement the [`Linked`] trait for `Links`. +/// In order to be part of a [`Stack`] or [`TransferStack`], a type must contain +/// an instance of this type, and must implement the [`Linked`] trait for +/// `Links`. pub struct Links { /// The next node in the queue. next: UnsafeCell>>, @@ -54,7 +107,7 @@ impl TransferStack where T: Linked>, { - /// Returns a new `AtomicStack`. + /// Returns a new `TransferStack` with no elements. #[cfg(not(loom))] #[must_use] pub const fn new() -> Self { @@ -63,7 +116,7 @@ where } } - /// Returns a new `AtomicStack`. + /// Returns a new `TransferStack` with no elements. #[cfg(loom)] #[must_use] pub fn new() -> Self { @@ -72,15 +125,25 @@ where } } + /// Pushes `element` onto the end of this `TransferStack`, taking ownership + /// of it. + /// + /// This is an *O*(1) operation, although it performs a compare-and-swap + /// loop that may repeat if another producer is concurrently calling `push` + /// on the same `TransferStack`. + /// + /// This takes ownership over `element` through its [owning `Handle` + /// type](Linked::Handle). If the `TransferStack` is dropped before the + /// pushed `element` is removed from the stack, the `element` will be dropped. pub fn push(&self, element: T::Handle) { let ptr = T::into_ptr(element); - test_trace!(?ptr, "AtomicStack::push"); + test_trace!(?ptr, "TransferStack::push"); let links = unsafe { T::links(ptr).as_mut() }; debug_assert!(links.next.with(|next| unsafe { (*next).is_none() })); let mut head = self.head.load(Relaxed); loop { - test_trace!(?ptr, ?head, "AtomicStack::push"); + test_trace!(?ptr, ?head, "TransferStack::push"); links.next.with_mut(|next| unsafe { *next = NonNull::new(head); }); @@ -90,7 +153,7 @@ where .compare_exchange_weak(head, ptr.as_ptr(), AcqRel, Acquire) { Ok(_) => { - test_trace!(?ptr, ?head, "AtomicStack::push -> pushed"); + test_trace!(?ptr, ?head, "TransferStack::push -> pushed"); return; } Err(actual) => head = actual, @@ -98,6 +161,11 @@ where } } + /// Takes all elements *currently* in this `TransferStack`, returning a new + /// mutable [`Stack`] containing those elements. + /// + /// This is an *O*(1) operation which does not allocate memory. It will + /// never loop and does not spin. #[must_use] pub fn take_all(&self) -> Stack { let head = self.head.swap(ptr::null_mut(), AcqRel); @@ -125,18 +193,18 @@ where { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let Self { head } = self; - f.debug_struct("AtomicStack").field("head", head).finish() + f.debug_struct("TransferStack").field("head", head).finish() } } -// === impl UnsyncStack === +// === impl Stack === impl Stack where T: Linked>, { - /// Returns a new `UnsyncStack`. + /// Returns a new `Stack` with no elements in it. #[must_use] pub const fn new() -> Self { Self { @@ -144,9 +212,19 @@ where } } + /// Pushes `element` onto the end of this `Stack`, taking ownership + /// of it. + /// + /// This is an *O*(1) operation that does not allocate memory. It will never + /// loop. + /// + /// This takes ownership over `element` through its [owning `Handle` + /// type](Linked::Handle). If the `Stack` is dropped before the + /// pushed `element` is [`pop`](Self::pop)pped from the stack, the `element` + /// will be dropped. pub fn push(&mut self, element: T::Handle) { let ptr = T::into_ptr(element); - test_trace!(?ptr, ?self.head, "UnsyncStack::push"); + test_trace!(?ptr, ?self.head, "Stack::push"); unsafe { // Safety: we have exclusive mutable access to the stack, and // therefore can also mutate the stack's entries. @@ -158,6 +236,12 @@ where } } + + /// Returns the element most recently [push](Self::push)ed to this `Stack`, + /// or `None` if the stack is empty. + /// + /// This is an *O*(1) operation which does not allocate memory. It will + /// never loop and does not spin. #[must_use] pub fn pop(&mut self) -> Option { test_trace!(?self.head, "Stack::pop"); @@ -165,7 +249,7 @@ where unsafe { // Safety: we have exclusive ownership over this chunk of stack. - // advance the iterator to the next node after the current one (if + // advance the head link to the next node after the current one (if // there is one). self.head = T::links(head).as_mut().next.with_mut(|next| (*next).take()); @@ -176,12 +260,24 @@ where } } + /// Takes all elements *currently* in this `Stack`, returning a new + /// mutable `Stack` containing those elements. + /// + /// This is an *O*(1) operation which does not allocate memory. It will + /// never loop and does not spin. #[must_use] pub fn take_all(&mut self) -> Self { Self { head: self.head.take(), } } + + /// Returns `true` if this `Stack` is empty. + #[inline] + #[must_use] + pub fn is_empty(&self) -> bool { + self.head.is_none() + } } impl Drop for Stack @@ -510,10 +606,10 @@ mod test_util { use crate::loom::alloc; #[pin_project::pin_project] - pub(stack) struct Entry { + pub(super) struct Entry { #[pin] links: Links, - pub(stack) val: i32, + pub(super) val: i32, track: alloc::Track<()>, } From e4274a2f64ebcaee797a0f30641eef83dfac8cdb Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Wed, 10 May 2023 14:18:56 -0700 Subject: [PATCH 11/12] reexport stack types --- cordyceps/src/lib.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/cordyceps/src/lib.rs b/cordyceps/src/lib.rs index 83dd9c07..fdd8f3df 100644 --- a/cordyceps/src/lib.rs +++ b/cordyceps/src/lib.rs @@ -34,7 +34,7 @@ //! [`MpscQueue`]s can be used to efficiently share data from multiple //! concurrent producers with a consumer. //! -//! - **[`stack::Stack`]: a mutable, singly-linked first-in, first-out (FIFO) +//! - **[`Stack`]: a mutable, singly-linked first-in, first-out (FIFO) //! stack.** //! //! This is a simple, singly-linked stack with *O*(1) push and pop @@ -45,7 +45,7 @@ //! The [`Stack`] type is **not** a lock-free data structure, and can only be //! modified through `&mut` references. //! -//! - **[`stack::TransferStack`]: a lock-free, multi-producer FIFO stack, where +//! - **[`TransferStack`]: a lock-free, multi-producer FIFO stack, where //! all elements currently in the stack are popped in a single atomic operation.** //! //! A [`TransferStack`] is a lock-free data structure where multiple producers @@ -59,9 +59,6 @@ //! A [`TransferStack`] can be used to efficiently transfer ownership of //! resources from multiple producers to a consumer, such as for reuse or //! cleanup. -//! -//! [`Stack`]: stack::Stack -//! [`TransferStack`]: stack::TransferStack #[cfg(feature = "alloc")] extern crate alloc; #[cfg(test)] @@ -78,6 +75,9 @@ pub mod stack; pub use list::List; #[doc(inline)] pub use mpsc_queue::MpscQueue; +#[doc(inline)] +pub use stack::{Stack, TransferStack}; + pub(crate) mod loom; From e9d3ad9121b5f598d0b7eba1b73c35b370f219de Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Wed, 10 May 2023 14:24:00 -0700 Subject: [PATCH 12/12] unbreak import --- cordyceps/src/mpsc_queue.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/cordyceps/src/mpsc_queue.rs b/cordyceps/src/mpsc_queue.rs index 24d63677..82e11ede 100644 --- a/cordyceps/src/mpsc_queue.rs +++ b/cordyceps/src/mpsc_queue.rs @@ -1252,7 +1252,6 @@ unsafe fn non_null(ptr: *mut T) -> NonNull { mod loom { use super::*; use crate::loom::{self, sync::Arc, thread}; - use crate::util::tracing; use test_util::*; #[test]