Skip to content

Commit

Permalink
feat(cordyceps): add Stack and TransferStack (#434)
Browse files Browse the repository at this point in the history
This branch adds an implementation of a singly-linked mutable FIFO
`Stack`and a singly-linked lock-free FIFO `TransferStack` in
`cordyceps::stack`.

A `Stack` is just your traditional FIFO stack structure.
`TransferStack` is a bit more interesting --- it's a simplification of
the traditional lock-free Treiber stack wher all elements are popped
into a new mutable `Stack` in a single atomic operation. This avoids the
ABA problem issues common with naive Treiber stack implementations. It's
intended primarily for use in a `mimalloc`-style sharded per-core memory
allocator.

Closes #137

Signed-off-by: Eliza Weisman <eliza@elizaswebsite>
  • Loading branch information
hawkw authored May 10, 2023
1 parent c65965a commit 507b993
Show file tree
Hide file tree
Showing 6 changed files with 968 additions and 25 deletions.
2 changes: 1 addition & 1 deletion cordyceps/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,4 @@ tracing = { version = "0.1" }

[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]
rustdoc-args = ["--cfg", "docsrs"]
78 changes: 64 additions & 14 deletions cordyceps/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,33 +3,83 @@
#![cfg_attr(docsrs, deny(missing_docs))]
#![cfg_attr(not(any(feature = "std", test)), no_std)]
#![allow(unused_unsafe)]

//!
//! ## data structures
//!
//! `cordyceps` provides implementations of the following data structures:
//!
//! - **[`List`]: a mutable, doubly-linked list.**
//!
//! A [`List`] provides *O*(1) insertion and removal at both the head and
//! tail of the list. In addition, parts of a [`List`] may be split off to
//! form new [`List`]s, and two [`List`]s may be spliced together to form a
//! single [`List`], all in *O*(1) time. The [`list`] module also provides
//! [`list::Cursor`] and [`list::CursorMut`] types, which allow traversal and
//! modification of elements in a list. Finally, elements can remove themselves
//! from arbitrary positions in a [`List`], provided that they have mutable
//! access to the [`List`] itself. This makes the [`List`] type suitable for
//! use in cases where elements must be able to drop themselves while linked
//! into a list.
//!
//! The [`List`] type is **not** a lock-free data structure, and can only be
//! modified through `&mut` references.
//!
//! - **[`MpscQueue`]: a multi-producer, single-consumer (MPSC) lock-free
//! last-in, first-out (LIFO) queue.**
//!
//! A [`MpscQueue`] is a *lock-free* concurrent data structure that allows
//! multiple producers to concurrently push elements onto the queue, and a
//! single consumer to dequeue elements in the order that they were pushed.
//!
//! [`MpscQueue`]s can be used to efficiently share data from multiple
//! concurrent producers with a consumer.
//!
//! - **[`Stack`]: a mutable, singly-linked first-in, first-out (FIFO)
//! stack.**
//!
//! This is a simple, singly-linked stack with *O*(1) push and pop
//! operations. The pop operation returns the *last* element pushed to the
//! stack. A [`Stack`] also implements the [`Iterator`] trait; iterating over
//! a stack pops elements from the end of the list.
//!
//! The [`Stack`] type is **not** a lock-free data structure, and can only be
//! modified through `&mut` references.
//!
//! - **[`TransferStack`]: a lock-free, multi-producer FIFO stack, where
//! all elements currently in the stack are popped in a single atomic operation.**
//!
//! A [`TransferStack`] is a lock-free data structure where multiple producers
//! can [concurrently push elements](stack::TransferStack::push) to the end of
//! the stack through immutable `&` references. A consumer can [pop all
//! elements currently in the `TransferStack`](stack::TransferStack::take_all)
//! in a single atomic operation, returning a new [`Stack`]. Pushing an
//! element, and taking all elements in the [`TransferStack`] are both *O*(1)
//! operations.
//!
//! A [`TransferStack`] can be used to efficiently transfer ownership of
//! resources from multiple producers to a consumer, such as for reuse or
//! cleanup.
#[cfg(feature = "alloc")]
extern crate alloc;
#[cfg(test)]
extern crate std;

macro_rules! feature {
(
#![$meta:meta]
$($item:item)*
) => {
$(
#[cfg($meta)]
$item
)*
}
}
#[macro_use]
pub(crate) mod util;

pub mod list;
pub mod mpsc_queue;
pub mod stack;

#[doc(inline)]
pub use list::List;
pub mod mpsc_queue;
#[doc(inline)]
pub use mpsc_queue::MpscQueue;
#[doc(inline)]
pub use stack::{Stack, TransferStack};


pub(crate) mod loom;
pub(crate) mod util;

use core::ptr::NonNull;

Expand Down
7 changes: 6 additions & 1 deletion cordyceps/src/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,16 @@ pub use self::cursor::{Cursor, CursorMut};
///
/// This data structure may be used as a first-in, first-out queue by using the
/// [`List::push_front`] and [`List::pop_back`] methods. It also supports
/// random-access removals using the [`List::remove`] method.
/// random-access removals using the [`List::remove`] method. This makes the
/// [`List`] type suitable for use in cases where elements must be able to drop
/// themselves while linked into a list.
///
/// This data structure can also be used as a stack or doubly-linked list by using
/// the [`List::pop_front`] and [`List::push_back`] methods.
///
/// The [`List`] type is **not** a lock-free data structure, and can only be
/// modified through `&mut` references.
///
/// In order to be part of a `List`, a type `T` must implement [`Linked`] for
/// [`list::Links<T>`].
///
Expand Down
222 changes: 213 additions & 9 deletions cordyceps/src/loom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,30 @@ mod inner {
pub(crate) mod sync {
pub use core::sync::*;

#[cfg(feature = "alloc")]
#[cfg(all(feature = "alloc", not(test)))]
pub use alloc::sync::*;

#[cfg(test)]
pub use std::sync::*;
}

pub(crate) use core::sync::atomic;

#[cfg(feature = "std")]
pub use std::thread;

#[cfg(test)]
pub(crate) mod thread {
pub(crate) use std::thread::{yield_now, JoinHandle};
pub(crate) fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static,
{
let track = super::alloc::track::Registry::current();
std::thread::spawn(move || {
let _tracking = track.map(|track| track.set_default());
f()
})
}
}
pub(crate) mod hint {
#[inline(always)]
pub(crate) fn spin_loop() {
Expand Down Expand Up @@ -138,23 +153,212 @@ mod inner {
}
}
}

#[cfg(test)]
pub(crate) mod model {
#[non_exhaustive]
#[derive(Default)]
pub(crate) struct Builder {
pub(crate) max_threads: usize,
pub(crate) max_branches: usize,
pub(crate) max_permutations: Option<usize>,
// pub(crate) max_duration: Option<Duration>,
pub(crate) preemption_bound: Option<usize>,
// pub(crate) checkpoint_file: Option<PathBuf>,
pub(crate) checkpoint_interval: usize,
pub(crate) location: bool,
pub(crate) log: bool,
}

impl Builder {
pub(crate) fn new() -> Self {
Self::default()
}

pub(crate) fn check(&self, f: impl FnOnce()) {
let registry = super::alloc::track::Registry::default();
let _tracking = registry.set_default();
f();
registry.check();
}
}
}

#[cfg(test)]
pub(crate) fn model(f: impl FnOnce()) {
let collector = tracing_subscriber::fmt()
.with_max_level(tracing::Level::TRACE)
.with_test_writer()
.without_time()
.with_thread_ids(true)
.with_thread_names(false)
.finish();
let _ = tracing::subscriber::set_global_default(collector);
model::Builder::new().check(f)
}

pub(crate) mod alloc {
#[cfg(test)]
use std::sync::Arc;

#[cfg(test)]
pub(in crate::loom) mod track {
use std::{
cell::RefCell,
sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex, Weak,
},
};

#[derive(Clone, Debug, Default)]
pub(crate) struct Registry(Arc<Mutex<RegistryInner>>);

#[derive(Debug, Default)]
struct RegistryInner {
tracks: Vec<Weak<TrackData>>,
next_id: usize,
}

#[derive(Debug)]
pub(super) struct TrackData {
was_leaked: AtomicBool,
type_name: &'static str,
location: &'static core::panic::Location<'static>,
id: usize,
}

thread_local! {
static REGISTRY: RefCell<Option<Registry>> = RefCell::new(None);
}

impl Registry {
pub(in crate::loom) fn current() -> Option<Registry> {
REGISTRY.with(|current| current.borrow().clone())
}

pub(in crate::loom) fn set_default(&self) -> impl Drop {
struct Unset(Option<Registry>);
impl Drop for Unset {
fn drop(&mut self) {
let _ =
REGISTRY.try_with(|current| *current.borrow_mut() = self.0.take());
}
}

REGISTRY.with(|current| {
let mut current = current.borrow_mut();
let unset = Unset(current.clone());
*current = Some(self.clone());
unset
})
}

#[track_caller]
pub(super) fn start_tracking<T>() -> Option<Arc<TrackData>> {
// we don't use `Option::map` here because it creates a
// closure, which breaks `#[track_caller]`, since the caller
// of `insert` becomes the closure, which cannot have a
// `#[track_caller]` attribute on it.
#[allow(clippy::manual_map)]
match Self::current() {
Some(registry) => Some(registry.insert::<T>()),
_ => None,
}
}

#[track_caller]
pub(super) fn insert<T>(&self) -> Arc<TrackData> {
let mut inner = self.0.lock().unwrap();
let id = inner.next_id;
inner.next_id += 1;
let location = core::panic::Location::caller();
let type_name = std::any::type_name::<T>();
let data = Arc::new(TrackData {
type_name,
location,
id,
was_leaked: AtomicBool::new(false),
});
let weak = Arc::downgrade(&data);
test_trace!(
target: "maitake::alloc",
id,
"type" = %type_name,
%location,
"started tracking allocation",
);
inner.tracks.push(weak);
data
}

pub(in crate::loom) fn check(&self) {
let leaked = self
.0
.lock()
.unwrap()
.tracks
.iter()
.filter_map(|weak| {
let data = weak.upgrade()?;
data.was_leaked.store(true, Ordering::SeqCst);
Some(format!(
" - id {}, {} allocated at {}",
data.id, data.type_name, data.location
))
})
.collect::<Vec<_>>();
if !leaked.is_empty() {
let leaked = leaked.join("\n ");
panic!("the following allocations were leaked:\n {leaked}");
}
}
}

impl Drop for TrackData {
fn drop(&mut self) {
if !self.was_leaked.load(Ordering::SeqCst) {
test_trace!(
target: "maitake::alloc",
id = self.id,
"type" = %self.type_name,
location = %self.location,
"dropped all references to a tracked allocation",
);
}
}
}
}

/// Track allocations, detecting leaks
#[derive(Debug, Default)]
pub struct Track<T> {
value: T,

#[cfg(test)]
track: Option<Arc<track::TrackData>>,
}

impl<T> Track<T> {
pub const fn new_const(value: T) -> Track<T> {
Track {
value,

#[cfg(test)]
track: None,
}
}

/// Track a value for leaks
#[inline(always)]
#[track_caller]
pub fn new(value: T) -> Track<T> {
Track { value }
}
Track {
value,

#[inline(always)]
pub const fn new_const(value: T) -> Track<T> {
Track { value }
#[cfg(test)]
track: track::Registry::start_tracking::<T>(),
}
}

/// Get a reference to the value
Expand Down
Loading

0 comments on commit 507b993

Please sign in to comment.