Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cordyceps): add Stack and TransferStack #434

Merged
merged 14 commits into from
May 10, 2023
2 changes: 1 addition & 1 deletion cordyceps/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,4 @@ tracing = { version = "0.1" }

[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]
rustdoc-args = ["--cfg", "docsrs"]
78 changes: 64 additions & 14 deletions cordyceps/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,33 +3,83 @@
#![cfg_attr(docsrs, deny(missing_docs))]
#![cfg_attr(not(any(feature = "std", test)), no_std)]
#![allow(unused_unsafe)]

//!
//! ## data structures
//!
//! `cordyceps` provides implementations of the following data structures:
//!
//! - **[`List`]: a mutable, doubly-linked list.**
//!
//! A [`List`] provides *O*(1) insertion and removal at both the head and
//! tail of the list. In addition, parts of a [`List`] may be split off to
//! form new [`List`]s, and two [`List`]s may be spliced together to form a
//! single [`List`], all in *O*(1) time. The [`list`] module also provides
//! [`list::Cursor`] and [`list::CursorMut`] types, which allow traversal and
//! modification of elements in a list. Finally, elements can remove themselves
//! from arbitrary positions in a [`List`], provided that they have mutable
//! access to the [`List`] itself. This makes the [`List`] type suitable for
//! use in cases where elements must be able to drop themselves while linked
//! into a list.
//!
//! The [`List`] type is **not** a lock-free data structure, and can only be
//! modified through `&mut` references.
//!
//! - **[`MpscQueue`]: a multi-producer, single-consumer (MPSC) lock-free
//! last-in, first-out (LIFO) queue.**
//!
//! A [`MpscQueue`] is a *lock-free* concurrent data structure that allows
//! multiple producers to concurrently push elements onto the queue, and a
//! single consumer to dequeue elements in the order that they were pushed.
//!
//! [`MpscQueue`]s can be used to efficiently share data from multiple
//! concurrent producers with a consumer.
//!
//! - **[`Stack`]: a mutable, singly-linked first-in, first-out (FIFO)
//! stack.**
//!
//! This is a simple, singly-linked stack with *O*(1) push and pop
//! operations. The pop operation returns the *last* element pushed to the
//! stack. A [`Stack`] also implements the [`Iterator`] trait; iterating over
//! a stack pops elements from the end of the list.
//!
//! The [`Stack`] type is **not** a lock-free data structure, and can only be
//! modified through `&mut` references.
//!
//! - **[`TransferStack`]: a lock-free, multi-producer FIFO stack, where
//! all elements currently in the stack are popped in a single atomic operation.**
//!
//! A [`TransferStack`] is a lock-free data structure where multiple producers
//! can [concurrently push elements](stack::TransferStack::push) to the end of
//! the stack through immutable `&` references. A consumer can [pop all
//! elements currently in the `TransferStack`](stack::TransferStack::take_all)
//! in a single atomic operation, returning a new [`Stack`]. Pushing an
//! element, and taking all elements in the [`TransferStack`] are both *O*(1)
//! operations.
//!
//! A [`TransferStack`] can be used to efficiently transfer ownership of
//! resources from multiple producers to a consumer, such as for reuse or
//! cleanup.
#[cfg(feature = "alloc")]
extern crate alloc;
#[cfg(test)]
extern crate std;

macro_rules! feature {
(
#![$meta:meta]
$($item:item)*
) => {
$(
#[cfg($meta)]
$item
)*
}
}
#[macro_use]
pub(crate) mod util;

pub mod list;
pub mod mpsc_queue;
pub mod stack;

#[doc(inline)]
pub use list::List;
pub mod mpsc_queue;
#[doc(inline)]
pub use mpsc_queue::MpscQueue;
#[doc(inline)]
pub use stack::{Stack, TransferStack};


pub(crate) mod loom;
pub(crate) mod util;

use core::ptr::NonNull;

Expand Down
7 changes: 6 additions & 1 deletion cordyceps/src/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,16 @@ pub use self::cursor::{Cursor, CursorMut};
///
/// This data structure may be used as a first-in, first-out queue by using the
/// [`List::push_front`] and [`List::pop_back`] methods. It also supports
/// random-access removals using the [`List::remove`] method.
/// random-access removals using the [`List::remove`] method. This makes the
/// [`List`] type suitable for use in cases where elements must be able to drop
/// themselves while linked into a list.
///
/// This data structure can also be used as a stack or doubly-linked list by using
/// the [`List::pop_front`] and [`List::push_back`] methods.
///
/// The [`List`] type is **not** a lock-free data structure, and can only be
/// modified through `&mut` references.
///
/// In order to be part of a `List`, a type `T` must implement [`Linked`] for
/// [`list::Links<T>`].
///
Expand Down
222 changes: 213 additions & 9 deletions cordyceps/src/loom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,30 @@ mod inner {
pub(crate) mod sync {
pub use core::sync::*;

#[cfg(feature = "alloc")]
#[cfg(all(feature = "alloc", not(test)))]
pub use alloc::sync::*;

#[cfg(test)]
pub use std::sync::*;
}

pub(crate) use core::sync::atomic;

#[cfg(feature = "std")]
pub use std::thread;

#[cfg(test)]
pub(crate) mod thread {
pub(crate) use std::thread::{yield_now, JoinHandle};
pub(crate) fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static,
{
let track = super::alloc::track::Registry::current();
std::thread::spawn(move || {
let _tracking = track.map(|track| track.set_default());
f()
})
}
}
pub(crate) mod hint {
#[inline(always)]
pub(crate) fn spin_loop() {
Expand Down Expand Up @@ -138,23 +153,212 @@ mod inner {
}
}
}

#[cfg(test)]
pub(crate) mod model {
#[non_exhaustive]
#[derive(Default)]
pub(crate) struct Builder {
pub(crate) max_threads: usize,
pub(crate) max_branches: usize,
pub(crate) max_permutations: Option<usize>,
// pub(crate) max_duration: Option<Duration>,
pub(crate) preemption_bound: Option<usize>,
// pub(crate) checkpoint_file: Option<PathBuf>,
pub(crate) checkpoint_interval: usize,
pub(crate) location: bool,
pub(crate) log: bool,
}

impl Builder {
pub(crate) fn new() -> Self {
Self::default()
}

pub(crate) fn check(&self, f: impl FnOnce()) {
let registry = super::alloc::track::Registry::default();
let _tracking = registry.set_default();
f();
registry.check();
}
}
}

#[cfg(test)]
pub(crate) fn model(f: impl FnOnce()) {
let collector = tracing_subscriber::fmt()
.with_max_level(tracing::Level::TRACE)
.with_test_writer()
.without_time()
.with_thread_ids(true)
.with_thread_names(false)
.finish();
let _ = tracing::subscriber::set_global_default(collector);
model::Builder::new().check(f)
}

pub(crate) mod alloc {
#[cfg(test)]
use std::sync::Arc;

#[cfg(test)]
pub(in crate::loom) mod track {
use std::{
cell::RefCell,
sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex, Weak,
},
};

#[derive(Clone, Debug, Default)]
pub(crate) struct Registry(Arc<Mutex<RegistryInner>>);

#[derive(Debug, Default)]
struct RegistryInner {
tracks: Vec<Weak<TrackData>>,
next_id: usize,
}

#[derive(Debug)]
pub(super) struct TrackData {
was_leaked: AtomicBool,
type_name: &'static str,
location: &'static core::panic::Location<'static>,
id: usize,
}

thread_local! {
static REGISTRY: RefCell<Option<Registry>> = RefCell::new(None);
}

impl Registry {
pub(in crate::loom) fn current() -> Option<Registry> {
REGISTRY.with(|current| current.borrow().clone())
}

pub(in crate::loom) fn set_default(&self) -> impl Drop {
struct Unset(Option<Registry>);
impl Drop for Unset {
fn drop(&mut self) {
let _ =
REGISTRY.try_with(|current| *current.borrow_mut() = self.0.take());
}
}

REGISTRY.with(|current| {
let mut current = current.borrow_mut();
let unset = Unset(current.clone());
*current = Some(self.clone());
unset
})
}

#[track_caller]
pub(super) fn start_tracking<T>() -> Option<Arc<TrackData>> {
// we don't use `Option::map` here because it creates a
// closure, which breaks `#[track_caller]`, since the caller
// of `insert` becomes the closure, which cannot have a
// `#[track_caller]` attribute on it.
#[allow(clippy::manual_map)]
match Self::current() {
Some(registry) => Some(registry.insert::<T>()),
_ => None,
}
}

#[track_caller]
pub(super) fn insert<T>(&self) -> Arc<TrackData> {
let mut inner = self.0.lock().unwrap();
let id = inner.next_id;
inner.next_id += 1;
let location = core::panic::Location::caller();
let type_name = std::any::type_name::<T>();
let data = Arc::new(TrackData {
type_name,
location,
id,
was_leaked: AtomicBool::new(false),
});
let weak = Arc::downgrade(&data);
test_trace!(
target: "maitake::alloc",
id,
"type" = %type_name,
%location,
"started tracking allocation",
);
inner.tracks.push(weak);
data
}

pub(in crate::loom) fn check(&self) {
let leaked = self
.0
.lock()
.unwrap()
.tracks
.iter()
.filter_map(|weak| {
let data = weak.upgrade()?;
data.was_leaked.store(true, Ordering::SeqCst);
Some(format!(
" - id {}, {} allocated at {}",
data.id, data.type_name, data.location
))
})
.collect::<Vec<_>>();
if !leaked.is_empty() {
let leaked = leaked.join("\n ");
panic!("the following allocations were leaked:\n {leaked}");
}
}
}

impl Drop for TrackData {
fn drop(&mut self) {
if !self.was_leaked.load(Ordering::SeqCst) {
test_trace!(
target: "maitake::alloc",
id = self.id,
"type" = %self.type_name,
location = %self.location,
"dropped all references to a tracked allocation",
);
}
}
}
}

/// Track allocations, detecting leaks
#[derive(Debug, Default)]
pub struct Track<T> {
value: T,

#[cfg(test)]
track: Option<Arc<track::TrackData>>,
}

impl<T> Track<T> {
pub const fn new_const(value: T) -> Track<T> {
Track {
value,

#[cfg(test)]
track: None,
}
}

/// Track a value for leaks
#[inline(always)]
#[track_caller]
pub fn new(value: T) -> Track<T> {
Track { value }
}
Track {
value,

#[inline(always)]
pub const fn new_const(value: T) -> Track<T> {
Track { value }
#[cfg(test)]
track: track::Registry::start_tracking::<T>(),
}
}

/// Get a reference to the value
Expand Down
Loading