diff --git a/maitake-sync/src/loom.rs b/maitake-sync/src/loom.rs
index e3465eb1..d972adae 100644
--- a/maitake-sync/src/loom.rs
+++ b/maitake-sync/src/loom.rs
@@ -120,16 +120,33 @@ mod inner {
 
     #[cfg(test)]
     pub(crate) mod thread {
+        pub(crate) use std::thread::{yield_now, JoinHandle};
+
         pub(crate) fn spawn<F, T>(f: F) -> JoinHandle<T>
         where
             F: FnOnce() -> T + Send + 'static,
             T: Send + 'static,
         {
+            use super::atomic::{AtomicUsize, Ordering::Relaxed};
+            thread_local! {
+                static CHILDREN: AtomicUsize = const { AtomicUsize::new(1) };
+            }
+
             let track = super::alloc::track::Registry::current();
+            let subscriber = tracing::Dispatch::default();
+            let span = tracing::Span::current();
+            let num = CHILDREN.with(|children| children.fetch_add(1, Relaxed));
             std::thread::spawn(move || {
+                let _tracing = tracing::dispatcher::set_default(&subscriber);
+                let _span = tracing::info_span!(parent: span, "thread", message = num).entered();
+
+                tracing::info!(num, "spawned child thread");
                 let _tracking = track.map(|track| track.set_default());
-                f()
+                let res = f();
+                tracing::info!(num, "child thread completed");
+
+                res
             })
         }
     }
@@ -156,9 +173,19 @@ mod inner {
         }
 
         pub(crate) fn check(&self, f: impl FnOnce()) {
+            let _trace = crate::util::test::trace_init();
+            let _span = tracing::info_span!(
+                "test",
+                message = std::thread::current().name().unwrap_or("<unnamed>")
+            )
+            .entered();
             let registry = super::alloc::track::Registry::default();
             let _tracking = registry.set_default();
+
+            tracing::info!("started test...");
             f();
+            tracing::info!("test completed successfully!");
+
             registry.check();
         }
     }
@@ -166,7 +193,6 @@
 
 #[cfg(test)]
 pub(crate) fn model(f: impl FnOnce()) {
-    let _trace = crate::util::test::trace_init();
     model::Builder::new().check(f)
 }
 
diff --git a/maitake-sync/src/spin.rs b/maitake-sync/src/spin.rs
index 66ef22dd..dda7276d 100644
--- a/maitake-sync/src/spin.rs
+++ b/maitake-sync/src/spin.rs
@@ -21,6 +21,7 @@
 //! This module provides the following APIs:
 //!
 //! - [`Mutex`]: a synchronous [mutual exclusion] spinlock.
+//! - [`RwLock`]: a synchronous [reader-writer lock] spinlock.
 //! - [`InitOnce`]: a cell storing a [`MaybeUninit`](core::mem::MaybeUninit)
 //!   value which must be manually initialized prior to use.
 //! - [`Lazy`]: an [`InitOnce`] cell coupled with an initializer function. The
@@ -28,10 +29,13 @@
 //!   value the first time it is accessed.
 //!
 //! [mutual exclusion lock]: https://en.wikipedia.org/wiki/Mutual_exclusion
+//! [reader-writer lock]: https://en.wikipedia.org/wiki/Readers%E2%80%93writer_lock
 mod mutex;
 pub mod once;
+mod rwlock;
 
 pub use self::{
     mutex::*,
     once::{InitOnce, Lazy},
+    rwlock::*,
 };
diff --git a/maitake-sync/src/spin/rwlock.rs b/maitake-sync/src/spin/rwlock.rs
new file mode 100644
index 00000000..8ca35ab0
--- /dev/null
+++ b/maitake-sync/src/spin/rwlock.rs
@@ -0,0 +1,424 @@
+use crate::{
+    loom::{
+        cell::{ConstPtr, MutPtr, UnsafeCell},
+        sync::atomic::{AtomicUsize, Ordering::*},
+    },
+    util::Backoff,
+};
+use core::{
+    fmt,
+    ops::{Deref, DerefMut},
+};
+
+/// A spinlock-based [readers-writer lock].
+///
+/// This type of lock allows a number of readers or at most one writer at any
+/// point in time. The write portion of this lock typically allows modification
+/// of the underlying data (exclusive access) and the read portion of this lock
+/// typically allows for read-only access (shared access).
+///
+/// In comparison, a [`spin::Mutex`] does not distinguish between readers and
+/// writers that acquire the lock, and therefore blocks any threads waiting for
+/// the lock to become available. An `RwLock` will allow any number of readers
+/// to acquire the lock as long as a writer is not holding the lock.
+///
+/// # Fairness
+///
+/// This is *not* a fair reader-writer lock.
+///
+/// # Loom-specific behavior
+///
+/// When `cfg(loom)` is enabled, this lock will use Loom's simulated atomics,
+/// checked `UnsafeCell`, and simulated spin loop hints.
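+///
+/// # Examples
+///
+/// A minimal usage sketch, showing shared (`read`) and exclusive (`write`)
+/// locking:
+///
+/// ```
+/// use maitake_sync::spin::RwLock;
+///
+/// let lock = RwLock::new(5);
+///
+/// // any number of read guards may be held at the same time.
+/// {
+///     let r1 = lock.read();
+///     let r2 = lock.read();
+///     assert_eq!(*r1, 5);
+///     assert_eq!(*r2, 5);
+/// } // both read guards are released here.
+///
+/// // only one write guard may be held at a time.
+/// {
+///     let mut w = lock.write();
+///     *w += 1;
+///     assert_eq!(*w, 6);
+/// } // the write guard is released here.
+/// ```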
+///
+/// [`spin::Mutex`]: crate::spin::Mutex
+/// [readers-writer lock]: https://en.wikipedia.org/wiki/Readers%E2%80%93writer_lock
+pub struct RwLock<T: ?Sized> {
+    state: AtomicUsize,
+    data: UnsafeCell<T>,
+}
+
+/// An RAII implementation of a "scoped read lock" of a [`RwLock`]. When this
+/// structure is dropped (falls out of scope), the lock will be unlocked.
+///
+/// The data protected by the [`RwLock`] can be immutably accessed through this
+/// guard via its [`Deref`] implementation.
+///
+/// This structure is created by the [`read`] and [`try_read`] methods on
+/// [`RwLock`].
+///
+/// [`read`]: RwLock::read
+/// [`try_read`]: RwLock::try_read
+#[must_use]
+pub struct RwLockReadGuard<'lock, T: ?Sized> {
+    ptr: ConstPtr<T>,
+    state: &'lock AtomicUsize,
+}
+
+/// An RAII implementation of a "scoped write lock" of a [`RwLock`]. When this
+/// structure is dropped (falls out of scope), the lock will be unlocked.
+///
+/// The data protected by the [`RwLock`] can be mutably accessed through this
+/// guard via its [`Deref`] and [`DerefMut`] implementations.
+///
+/// This structure is created by the [`write`] and [`try_write`] methods on
+/// [`RwLock`].
+///
+/// [`write`]: RwLock::write
+/// [`try_write`]: RwLock::try_write
+#[must_use = "if unused the RwLock will immediately unlock"]
+pub struct RwLockWriteGuard<'lock, T: ?Sized> {
+    ptr: MutPtr<T>,
+    state: &'lock AtomicUsize,
+}
+
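+// The `state` field encodes the lock's ownership: bit 0 (`WRITER`) is set
+// while a writer holds the lock, and the remaining bits count active readers.
+// Each reader adds `READER` (`1 << 1`) to the state, so `state >> 1` is the
+// number of read locks currently held.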
+const UNLOCKED: usize = 0;
+const WRITER: usize = 1 << 0;
+const READER: usize = 1 << 1;
+
+impl<T> RwLock<T> {
+    loom_const_fn! {
+        /// Creates a new, unlocked `RwLock` protecting the provided `data`.
+        ///
+        /// # Examples
+        ///
+        /// ```
+        /// use maitake_sync::spin::RwLock;
+        ///
+        /// let lock = RwLock::new(5);
+        /// # drop(lock);
+        /// ```
+        #[must_use]
+        pub fn new(data: T) -> Self {
+            Self {
+                state: AtomicUsize::new(0),
+                data: UnsafeCell::new(data),
+            }
+        }
+    }
+}
+
+impl<T: ?Sized> RwLock<T> {
+    /// Locks this `RwLock` for shared read access, spinning until it can be
+    /// acquired.
+    ///
+    /// The calling CPU core will spin (with an exponential backoff) until
+    /// there are no more writers which hold the lock. There may be other
+    /// readers currently inside the lock when this method returns. This
+    /// method does not provide any guarantees with respect to the ordering
+    /// of whether contentious readers or writers will acquire the lock first.
+    ///
+    /// Returns an RAII guard which will release this thread's shared access
+    /// once it is dropped.
+    pub fn read(&self) -> RwLockReadGuard<'_, T> {
+        let mut backoff = Backoff::new();
+        loop {
+            if let Some(guard) = self.try_read() {
+                return guard;
+            }
+            backoff.spin();
+        }
+    }
+
+    /// Attempts to acquire this `RwLock` for shared read access.
+    ///
+    /// If the access could not be granted at this time, this method returns
+    /// [`None`]. Otherwise, [`Some`]`(`[`RwLockReadGuard`]`)` containing a
+    /// RAII guard is returned. The shared access is released when it is
+    /// dropped.
+    ///
+    /// This function does not spin.
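+    ///
+    /// # Examples
+    ///
+    /// A brief usage sketch:
+    ///
+    /// ```
+    /// use maitake_sync::spin::RwLock;
+    ///
+    /// let lock = RwLock::new(1);
+    ///
+    /// // multiple read locks may be held at once...
+    /// let r1 = lock.try_read().expect("no writer holds the lock");
+    /// let r2 = lock.try_read().expect("readers do not exclude other readers");
+    /// assert_eq!(*r1 + *r2, 2);
+    ///
+    /// // ...but the lock cannot be write-locked while read locks are held.
+    /// assert!(lock.try_write().is_none());
+    /// ```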
+    #[cfg_attr(test, track_caller)]
+    pub fn try_read(&self) -> Option<RwLockReadGuard<'_, T>> {
+        // Add a reader.
+        let state = test_dbg!(self.state.fetch_add(READER, Acquire));
+
+        // Ensure we don't overflow the reader count and clobber the lock's
+        // state.
+        assert!(
+            state < usize::MAX - (READER * 2),
+            "read lock counter overflow! this is very bad"
+        );
+
+        // Is the write lock held? If so, undo the increment and bail.
+        if state & WRITER == 1 {
+            test_dbg!(self.state.fetch_sub(READER, Release));
+            return None;
+        }
+
+        Some(RwLockReadGuard {
+            ptr: self.data.get(),
+            state: &self.state,
+        })
+    }
+
+    /// Locks this `RwLock` for exclusive write access, spinning until write
+    /// access can be acquired.
+    ///
+    /// This function will not return while other writers or other readers
+    /// currently have access to the lock.
+    ///
+    /// Returns an RAII guard which will drop the write access of this
+    /// `RwLock` when dropped.
+    pub fn write(&self) -> RwLockWriteGuard<'_, T> {
+        let mut backoff = Backoff::new();
+
+        // Wait for the lock to become available and set the `WRITER` bit.
+        //
+        // Note that, unlike the `read` method, we don't use the `try_write`
+        // method here, as we would like to use `compare_exchange_weak` to
+        // allow spurious failures for improved performance. The `try_write`
+        // method cannot use `compare_exchange_weak`, because it will never
+        // retry, and a spurious failure means we would incorrectly fail to
+        // lock the RwLock when we should have successfully locked it.
+        while test_dbg!(self
+            .state
+            .compare_exchange_weak(UNLOCKED, WRITER, Acquire, Relaxed))
+        .is_err()
+        {
+            test_dbg!(backoff.spin());
+        }
+
+        RwLockWriteGuard {
+            ptr: self.data.get_mut(),
+            state: &self.state,
+        }
+    }
+
+    /// Attempts to acquire this `RwLock` for exclusive write access.
+    ///
+    /// If the access could not be granted at this time, this method returns
+    /// [`None`]. Otherwise, [`Some`]`(`[`RwLockWriteGuard`]`)` containing a
+    /// RAII guard is returned. The write access is released when it is
+    /// dropped.
+    ///
+    /// This function does not spin.
+    pub fn try_write(&self) -> Option<RwLockWriteGuard<'_, T>> {
+        if test_dbg!(self
+            .state
+            .compare_exchange(UNLOCKED, WRITER, Acquire, Relaxed))
+        .is_ok()
+        {
+            return Some(RwLockWriteGuard {
+                ptr: self.data.get_mut(),
+                state: &self.state,
+            });
+        }
+
+        None
+    }
+}
+
+impl<T: ?Sized + fmt::Debug> fmt::Debug for RwLock<T> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        let mut s = f.debug_struct("RwLock");
+        s.field("state", &self.state.load(Relaxed));
+        match self.try_read() {
+            Some(read) => s.field("data", &read),
+            None => s.field("data", &format_args!("<locked>")),
+        };
+        s.finish()
+    }
+}
+
+impl<T: Default> Default for RwLock<T> {
+    /// Creates a new `RwLock`, with the `Default` value for `T`.
+    fn default() -> RwLock<T> {
+        RwLock::new(Default::default())
+    }
+}
+
+impl<T> From<T> for RwLock<T> {
+    /// Creates a new instance of an `RwLock` which is unlocked.
+    /// This is equivalent to [`RwLock::new`].
+    fn from(t: T) -> Self {
+        RwLock::new(t)
+    }
+}
+
+unsafe impl<T: ?Sized + Send> Send for RwLock<T> {}
+unsafe impl<T: ?Sized + Send + Sync> Sync for RwLock<T> {}
+
+// === impl RwLockReadGuard ===
+
+impl<T: ?Sized> Deref for RwLockReadGuard<'_, T> {
+    type Target = T;
+    #[inline]
+    fn deref(&self) -> &Self::Target {
+        unsafe {
+            // Safety: we are holding a read lock, so it is okay to dereference
+            // the const pointer immutably.
+            self.ptr.deref()
+        }
+    }
+}
+
+impl<T: ?Sized, R: ?Sized> AsRef<R> for RwLockReadGuard<'_, T>
+where
+    T: AsRef<R>,
+{
+    #[inline]
+    fn as_ref(&self) -> &R {
+        self.deref().as_ref()
+    }
+}
+
+impl<T: ?Sized> Drop for RwLockReadGuard<'_, T> {
+    fn drop(&mut self) {
+        let _val = test_dbg!(self.state.fetch_sub(READER, Release));
+        debug_assert_eq!(
+            _val & WRITER,
+            0,
+            "tried to drop a read guard while write locked, something is Very Wrong!"
+        )
+    }
+}
+
+impl<T: ?Sized + fmt::Debug> fmt::Debug for RwLockReadGuard<'_, T> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        self.deref().fmt(f)
+    }
+}
+
+impl<T: ?Sized + fmt::Display> fmt::Display for RwLockReadGuard<'_, T> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        self.deref().fmt(f)
+    }
+}
+
+/// A [`RwLockReadGuard`] only allows immutable (`&T`) access to a `T`.
+/// Therefore, it is [`Send`] and [`Sync`] as long as `T` is [`Sync`], because
+/// it can be used to *share* references to a `T` across multiple threads
+/// (requiring `T: Sync`), but it *cannot* be used to move ownership of a `T`
+/// across thread boundaries, as the `T` cannot be taken out of the lock
+/// through a `RwLockReadGuard`.
+unsafe impl<T: ?Sized + Sync> Send for RwLockReadGuard<'_, T> {}
+unsafe impl<T: ?Sized + Sync> Sync for RwLockReadGuard<'_, T> {}
+
+// === impl RwLockWriteGuard ===
+
+impl<T: ?Sized> Deref for RwLockWriteGuard<'_, T> {
+    type Target = T;
+    #[inline]
+    fn deref(&self) -> &Self::Target {
+        unsafe {
+            // Safety: we are holding the lock, so it is okay to dereference
+            // the mut pointer.
+            &*self.ptr.deref()
+        }
+    }
+}
+
+impl<T: ?Sized> DerefMut for RwLockWriteGuard<'_, T> {
+    #[inline]
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        unsafe {
+            // Safety: we are holding the lock, so it is okay to dereference
+            // the mut pointer.
+            self.ptr.deref()
+        }
+    }
+}
+
+impl<T: ?Sized, R: ?Sized> AsRef<R> for RwLockWriteGuard<'_, T>
+where
+    T: AsRef<R>,
+{
+    #[inline]
+    fn as_ref(&self) -> &R {
+        self.deref().as_ref()
+    }
+}
+
+impl<T: ?Sized> Drop for RwLockWriteGuard<'_, T> {
+    fn drop(&mut self) {
+        let _val = test_dbg!(self.state.swap(UNLOCKED, Release));
+    }
+}
+
+impl<T: ?Sized + fmt::Debug> fmt::Debug for RwLockWriteGuard<'_, T> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        self.deref().fmt(f)
+    }
+}
+
+impl<T: ?Sized + fmt::Display> fmt::Display for RwLockWriteGuard<'_, T> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        self.deref().fmt(f)
+    }
+}
+
+/// A [`RwLockWriteGuard`] is only [`Send`] if `T` is [`Send`] and [`Sync`],
+/// because it can be used to *move* a `T` across thread boundaries, as it
+/// allows mutable access to the `T` that can be used with
+/// [`core::mem::replace`] or [`core::mem::swap`].
+unsafe impl<T: ?Sized + Send + Sync> Send for RwLockWriteGuard<'_, T> {}
+
+/// A [`RwLockWriteGuard`] is only [`Sync`] if `T` is [`Send`] and [`Sync`],
+/// because it can be used to *move* a `T` across thread boundaries, as it
+/// allows mutable access to the `T` that can be used with
+/// [`core::mem::replace`] or [`core::mem::swap`].
+unsafe impl<T: ?Sized + Send + Sync> Sync for RwLockWriteGuard<'_, T> {}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::loom::{self, sync::Arc, thread};
+
+    #[test]
+    fn write() {
+        const WRITERS: usize = 2;
+
+        loom::model(|| {
+            let lock = Arc::new(RwLock::<usize>::new(0));
+            let threads = (0..WRITERS)
+                .map(|_| {
+                    let lock = lock.clone();
+                    thread::spawn(writer(lock))
+                })
+                .collect::<Vec<_>>();
+
+            for thread in threads {
+                thread.join().expect("writer thread mustn't panic");
+            }
+
+            let guard = lock.read();
+            assert_eq!(*guard, WRITERS, "final state must equal number of writers");
+        });
+    }
+
+    #[test]
+    fn read_write() {
+        // this hits loom's preemption bound with 2 writer threads.
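+        // (loom limits the number of thread preemptions it will explore per
+        // execution, which can be tuned with the `LOOM_MAX_PREEMPTIONS` env
+        // variable; with two writers plus the reader below, the test exceeds
+        // that bound, so only one writer thread is spawned under loom.)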
+        const WRITERS: usize = if cfg!(loom) { 1 } else { 2 };
+
+        loom::model(|| {
+            let lock = Arc::new(RwLock::<usize>::new(0));
+            let w_threads = (0..WRITERS)
+                .map(|_| {
+                    let lock = lock.clone();
+                    thread::spawn(writer(lock))
+                })
+                .collect::<Vec<_>>();
+
+            {
+                let guard = lock.read();
+                assert!(*guard == 0 || *guard == 1 || *guard == 2);
+            }
+
+            for thread in w_threads {
+                thread.join().expect("writer thread mustn't panic")
+            }
+
+            let guard = lock.read();
+            assert_eq!(*guard, WRITERS, "final state must equal number of writers");
+        });
+    }
+
+    fn writer(lock: Arc<RwLock<usize>>) -> impl FnOnce() {
+        move || {
+            test_debug!("trying to acquire write lock...");
+            let mut guard = lock.write();
+            test_debug!("got write lock!");
+            *guard += 1;
+        }
+    }
+}
diff --git a/maitake-sync/src/util.rs b/maitake-sync/src/util.rs
index 031217ab..8b442759 100644
--- a/maitake-sync/src/util.rs
+++ b/maitake-sync/src/util.rs
@@ -9,14 +9,6 @@
 //!
 //! - [`Backoff`]: exponential backoff for spin loops
 //! - [`CachePadded`]: pads and aligns a value to the size of a cache line
-mod backoff;
-mod cache_pad;
-pub(crate) mod fmt;
-mod maybe_uninit;
-mod wake_batch;
-
-pub use self::{backoff::Backoff, cache_pad::CachePadded};
-pub(crate) use self::{maybe_uninit::CheckedMaybeUninit, wake_batch::WakeBatch};
 
 #[cfg(any(test, feature = "tracing"))]
 macro_rules! trace {
@@ -196,8 +188,16 @@ macro_rules! unreachable_unchecked {
     });
 }
 
+mod backoff;
+mod cache_pad;
+pub(crate) mod fmt;
+mod maybe_uninit;
+mod wake_batch;
+
 #[cfg(all(test, not(loom)))]
 pub(crate) use self::test::trace_init;
+pub use self::{backoff::Backoff, cache_pad::CachePadded};
+pub(crate) use self::{maybe_uninit::CheckedMaybeUninit, wake_batch::WakeBatch};
 
 #[cfg(test)]
 pub(crate) mod test {
diff --git a/maitake-sync/src/util/backoff.rs b/maitake-sync/src/util/backoff.rs
index 2c49976f..4a512d92 100644
--- a/maitake-sync/src/util/backoff.rs
+++ b/maitake-sync/src/util/backoff.rs
@@ -1,5 +1,3 @@
-use crate::loom::hint;
-
 /// An [exponential backoff] for spin loops.
 ///
 /// This is a helper struct for spinning in a busy loop, with an exponentially
@@ -53,8 +51,16 @@ impl Backoff {
     #[inline(always)]
     pub fn spin(&mut self) {
         // Issue 2^exp pause instructions.
-        for _ in 0..1 << self.exp {
-            hint::spin_loop();
+        let spins = 1 << self.exp;
+        #[cfg(not(loom))]
+        for _ in 0..spins {
+            crate::loom::hint::spin_loop();
+        }
+
+        #[cfg(loom)]
+        {
+            test_debug!("would back off for {spins} spins");
+            loom::thread::yield_now();
         }
 
         if self.exp < self.max {
diff --git a/maitake-sync/src/wait_cell.rs b/maitake-sync/src/wait_cell.rs
index 5b7f5cf1..45ef4d17 100644
--- a/maitake-sync/src/wait_cell.rs
+++ b/maitake-sync/src/wait_cell.rs
@@ -690,7 +690,6 @@ mod tests {
     mod loom {
         use super::*;
         use crate::loom::{future, sync::Arc, thread};
-        use tokio_test::assert_pending;
 
         #[test]
         fn basic() {
diff --git a/maitake/src/loom.rs b/maitake/src/loom.rs
index 0aa79065..73ca374c 100644
--- a/maitake/src/loom.rs
+++ b/maitake/src/loom.rs
@@ -117,16 +117,34 @@ mod inner {
 
     #[cfg(test)]
     pub(crate) mod thread {
+        pub(crate) use std::thread::{yield_now, JoinHandle};
+
         pub(crate) fn spawn<F, T>(f: F) -> JoinHandle<T>
         where
             F: FnOnce() -> T + Send + 'static,
             T: Send + 'static,
         {
+            use super::atomic::{AtomicUsize, Ordering::Relaxed};
+            thread_local! {
+                static CHILDREN: AtomicUsize = const { AtomicUsize::new(1) };
+            }
+
             let track = super::alloc::track::Registry::current();
+            let subscriber = tracing_02::Dispatch::default();
+            let span = tracing_02::Span::current();
+            let num = CHILDREN.with(|children| children.fetch_add(1, Relaxed));
             std::thread::spawn(move || {
+                let _tracing = tracing_02::dispatch::set_default(&subscriber);
+                let _span =
+                    tracing_02::info_span!(parent: span.id(), "thread", message = num).entered();
+
+                tracing_02::info!(num, "spawned child thread");
                 let _tracking = track.map(|track| track.set_default());
-                f()
+                let res = f();
+                tracing_02::info!(num, "child thread completed");
+
+                res
             })
         }
     }
@@ -153,9 +171,19 @@ mod inner {
         }
 
         pub(crate) fn check(&self, f: impl FnOnce()) {
+            let _trace = crate::util::test::trace_init();
+            let _span = tracing_02::info_span!(
+                "test",
+                message = std::thread::current().name().unwrap_or("<unnamed>")
+            )
+            .entered();
             let registry = super::alloc::track::Registry::default();
             let _tracking = registry.set_default();
+
+            tracing_02::info!("started test...");
             f();
+            tracing_02::info!("test completed successfully!");
+
             registry.check();
         }
     }
@@ -163,7 +191,6 @@
 
 #[cfg(test)]
 pub(crate) fn model(f: impl FnOnce()) {
-    let _trace = crate::util::test::trace_init();
     model::Builder::new().check(f)
 }