diff --git a/src/lib.rs b/src/lib.rs index 5159140..21e8713 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,6 @@ -use core::{fmt, marker::PhantomData, mem::MaybeUninit}; +#![cfg_attr(not(feature = "std"), no_std)] + +use core::{fmt, mem::MaybeUninit, ops::Index}; #[cfg(feature = "alloc")] extern crate alloc; @@ -6,12 +8,11 @@ extern crate alloc; macro_rules! test_println { ($($arg:tt)*) => { if cfg!(test) { - if std::thread::panicking() { + if crate::util::panicking() { // getting the thread ID while panicking doesn't seem to play super nicely with loom's // mock lazy_static... println!("[PANIC {:>17}:{:<3}] {}", file!(), line!(), format_args!($($arg)*)) } else { - #[cfg(test)] println!("[{:?} {:>17}:{:<3}] {}", crate::loom::thread::current().id(), file!(), line!(), format_args!($($arg)*)) } } @@ -31,37 +32,37 @@ macro_rules! test_dbg { } mod loom; -#[cfg(test)] -mod tests; mod util; -use crate::loom::{ - atomic::{AtomicUsize, Ordering}, - UnsafeCell, -}; - -use crate::util::{Backoff, CachePadded}; - +#[cfg(feature = "alloc")] +mod thingbuf; +#[cfg(feature = "alloc")] +pub use self::thingbuf::ThingBuf; #[cfg(feature = "alloc")] mod stringbuf; #[cfg(feature = "alloc")] -pub use stringbuf::StringBuf; - -/// A ringbuf of...things. -/// -/// # Examples -/// -/// Using a -pub struct ThingBuf]>> { +pub use stringbuf::{StaticStringBuf, StringBuf}; + +mod static_thingbuf; +pub use self::static_thingbuf::StaticThingBuf; + +use crate::{ + loom::{ + atomic::{AtomicUsize, Ordering}, + UnsafeCell, + }, + util::{Backoff, CachePadded}, +}; + +#[derive(Debug)] +struct Core { head: CachePadded, tail: CachePadded, gen: usize, gen_mask: usize, idx_mask: usize, capacity: usize, - slots: S, - _t: PhantomData, } pub struct Ref<'slot, T> { @@ -73,27 +74,28 @@ pub struct Ref<'slot, T> { pub struct AtCapacity(usize); pub struct Slot { - value: UnsafeCell, + value: UnsafeCell>, state: AtomicUsize, } -// === impl ThingBuf === - -impl ThingBuf { - pub fn new(capacity: usize) -> Self { - Self::new_with(capacity, T::default) +impl Core { + #[cfg(not(test))] + const fn new(capacity: usize) -> Self { + let gen = (capacity + 1).next_power_of_two(); + let idx_mask = gen - 1; + let gen_mask = !(gen - 1); + Self { + head: CachePadded(AtomicUsize::new(0)), + tail: CachePadded(AtomicUsize::new(0)), + gen, + gen_mask, + idx_mask, + capacity, + } } -} -impl ThingBuf { - pub fn new_with(capacity: usize, mut initializer: impl FnMut() -> T) -> Self { - assert!(capacity > 0); - let slots = (0..capacity) - .map(|idx| Slot { - state: AtomicUsize::new(idx), - value: UnsafeCell::new(initializer()), - }) - .collect(); + #[cfg(test)] + fn new(capacity: usize) -> Self { let gen = (capacity + 1).next_power_of_two(); let idx_mask = gen - 1; let gen_mask = !(gen - 1); @@ -104,13 +106,9 @@ impl ThingBuf { gen_mask, idx_mask, capacity, - slots, - _t: PhantomData, } } -} -impl ThingBuf { #[inline] fn idx_gen(&self, val: usize) -> (usize, usize) { (val & self.idx_mask, val & self.gen_mask) @@ -130,46 +128,19 @@ impl ThingBuf { } } - pub fn capacity(&self) -> usize { - self.capacity - } -} - -impl ThingBuf -where - S: AsRef<[Slot]>, -{ - pub fn from_array(slots: S) -> Self { - let capacity = slots.as_ref().len(); - assert!(capacity > 0); - for (idx, slot) in slots.as_ref().iter().enumerate() { - // Relaxed is fine here, because the slot is not shared yet. - slot.state.store(idx, Ordering::Relaxed); - } - let gen = (capacity + 1).next_power_of_two(); - let idx_mask = gen - 1; - let gen_mask = !(gen - 1); - Self { - head: CachePadded(AtomicUsize::new(0)), - tail: CachePadded(AtomicUsize::new(0)), - gen, - gen_mask, - idx_mask, - capacity, - slots, - _t: PhantomData, - } - } - #[inline] - pub fn push_with(&self, f: impl FnOnce(&mut T) -> U) -> Result { - self.push_ref().map(|mut r| r.with_mut(f)) + fn capacity(&self) -> usize { + self.capacity } - pub fn push_ref(&self) -> Result, AtCapacity> { + fn push_ref<'slots, T, S>(&self, slots: &'slots S) -> Result, AtCapacity> + where + T: Default, + S: Index> + ?Sized, + { + test_println!("push_ref"); let mut backoff = Backoff::new(); let mut tail = self.tail.load(Ordering::Relaxed); - let slots = self.slots.as_ref(); loop { let (idx, gen) = self.idx_gen(tail); @@ -178,7 +149,7 @@ where let slot = &slots[idx]; let state = slot.state.load(Ordering::Acquire); - if state == tail { + if state == tail || (state == 0 && gen == 0) { // Move the tail index forward by 1. let next_tail = self.next(idx, gen); match self.tail.compare_exchange_weak( @@ -188,12 +159,25 @@ where Ordering::Relaxed, ) { Ok(_) => { + // We got the slot! It's now okay to write to it + test_println!("claimed tail slot"); + if gen == 0 { + slot.value.with_mut(|value| unsafe { + // Safety: we have just claimed exclusive ownership over + // this slot. + (*value).write(T::default()); + }); + test_println!("-> initialized"); + } + return Ok(Ref { new_state: tail + 1, slot, - }) + }); } Err(actual) => { + // Someone else took this slot and advanced the tail + // index. Try to claim the new tail. tail = actual; backoff.spin(); continue; @@ -215,15 +199,13 @@ where } } - #[inline] - pub fn pop_with(&self, f: impl FnOnce(&mut T) -> U) -> Option { - self.pop_ref().map(|mut r| r.with_mut(f)) - } - - pub fn pop_ref(&self) -> Option> { + fn pop_ref<'slots, T, S>(&self, slots: &'slots S) -> Option> + where + S: Index> + ?Sized, + { + test_println!("pop_ref"); let mut backoff = Backoff::new(); let mut head = self.head.load(Ordering::Relaxed); - let slots = self.slots.as_ref(); loop { test_dbg!(head); @@ -245,10 +227,11 @@ where Ordering::Relaxed, ) { Ok(_) => { + test_println!("claimed head slot"); return Some(Ref { new_state: head.wrapping_add(self.gen), slot, - }) + }); } Err(actual) => { head = actual; @@ -273,13 +256,24 @@ where head = self.head.load(Ordering::Relaxed); } } -} -impl fmt::Debug for ThingBuf { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("ThingBuf") - .field("capacity", &self.capacity()) - .finish() + fn len(&self) -> usize { + use std::cmp; + loop { + let tail = self.tail.load(Ordering::SeqCst); + let head = self.head.load(Ordering::SeqCst); + + if self.tail.load(Ordering::SeqCst) == tail { + let (head_idx, _) = self.idx_gen(head); + let (tail_idx, _) = self.idx_gen(tail); + return match head_idx.cmp(&tail_idx) { + cmp::Ordering::Less => head_idx - tail_idx, + cmp::Ordering::Greater => self.capacity - head_idx + tail_idx, + _ if tail == head => 0, + _ => self.capacity, + }; + } + } } } @@ -289,16 +283,23 @@ impl Ref<'_, T> { #[inline] pub fn with(&self, f: impl FnOnce(&T) -> U) -> U { self.slot.value.with(|value| unsafe { - // Safety: if a `Ref` exists, we have exclusive ownership of the slot. - f(&*value) + // Safety: if a `Ref` exists, we have exclusive ownership of the + // slot. A `Ref` is only created if the slot has already been + // initialized. + // TODO(eliza): use `MaybeUninit::assume_init_ref` here once it's + // supported by `tracing-appender`'s MSRV. + f(&*(&*value).as_ptr()) }) } #[inline] pub fn with_mut(&mut self, f: impl FnOnce(&mut T) -> U) -> U { self.slot.value.with_mut(|value| unsafe { - // Safety: if a `Ref` exists, we have exclusive ownership of the slot. - f(&mut *value) + // Safety: if a `Ref` exists, we have exclusive ownership of the + // slot. + // TODO(eliza): use `MaybeUninit::assume_init_mut` here once it's + // supported by `tracing-appender`'s MSRV. + f(&mut *(&mut *value).as_mut_ptr()) }) } } @@ -344,42 +345,22 @@ impl fmt::Write for Ref<'_, T> { // === impl Slot === -impl Default for Slot { - fn default() -> Self { - Self::new(T::default()) - } -} - impl Slot { - const UNINIT: usize = usize::MAX; - #[cfg(not(test))] - pub const fn new(t: T) -> Self { + const fn empty() -> Self { Self { - value: UnsafeCell::new(t), - state: AtomicUsize::new(Self::UNINIT), + value: UnsafeCell::new(MaybeUninit::uninit()), + state: AtomicUsize::new(0), } } #[cfg(test)] - pub fn new(t: T) -> Self { + fn empty() -> Self { Self { - value: UnsafeCell::new(t), - state: AtomicUsize::new(Self::UNINIT), + value: UnsafeCell::new(MaybeUninit::uninit()), + state: AtomicUsize::new(0), } } } -impl Slot> { - #[cfg(not(test))] - pub const fn uninit() -> Self { - Self::new(MaybeUninit::uninit()) - } - - #[cfg(test)] - pub fn uninit() -> Self { - Self::new(MaybeUninit::uninit()) - } -} - unsafe impl Sync for Slot {} diff --git a/src/loom.rs b/src/loom.rs index c1533ac..6c7aa88 100644 --- a/src/loom.rs +++ b/src/loom.rs @@ -6,7 +6,7 @@ mod inner { pub use loom::sync::atomic::*; pub use std::sync::atomic::Ordering; } - pub(crate) use loom::{cell::UnsafeCell, hint, sync, thread}; + pub(crate) use loom::{cell::UnsafeCell, hint, thread}; pub(crate) fn model(f: impl Fn() + Sync + Send + 'static) { let iteration = core::sync::atomic::AtomicUsize::new(0); diff --git a/src/static_thingbuf.rs b/src/static_thingbuf.rs new file mode 100644 index 0000000..46afc48 --- /dev/null +++ b/src/static_thingbuf.rs @@ -0,0 +1,82 @@ +use crate::loom::atomic::Ordering; +use crate::{AtCapacity, Core, Ref, Slot}; +use core::{fmt, ptr}; + +pub struct StaticThingBuf { + core: Core, + slots: [Slot; CAP], +} + +// === impl ThingBuf === + +#[cfg(not(test))] +impl StaticThingBuf { + const SLOT: Slot = Slot::empty(); + + pub const fn new() -> Self { + Self { + core: Core::new(CAP), + slots: [Self::SLOT; CAP], + } + } +} + +impl StaticThingBuf { + #[inline] + pub fn capacity(&self) -> usize { + CAP + } + + #[inline] + pub fn len(&self) -> usize { + self.core.len() + } + + #[inline] + pub fn is_empty(&self) -> bool { + self.len() == 0 + } +} + +impl StaticThingBuf { + pub fn push_ref(&self) -> Result, AtCapacity> { + self.core.push_ref(&self.slots) + } + + #[inline] + pub fn push_with(&self, f: impl FnOnce(&mut T) -> U) -> Result { + self.push_ref().map(|mut r| r.with_mut(f)) + } + + pub fn pop_ref(&self) -> Option> { + self.core.pop_ref(&self.slots) + } + + #[inline] + pub fn pop_with(&self, f: impl FnOnce(&mut T) -> U) -> Option { + self.pop_ref().map(|mut r| r.with_mut(f)) + } +} + +impl Drop for StaticThingBuf { + fn drop(&mut self) { + let tail = self.core.tail.load(Ordering::SeqCst); + let (idx, gen) = self.core.idx_gen(tail); + let num_initialized = if gen > 0 { self.capacity() } else { idx }; + for slot in &self.slots[..num_initialized] { + unsafe { + slot.value + .with_mut(|value| ptr::drop_in_place((*value).as_mut_ptr())); + } + } + } +} + +impl fmt::Debug for StaticThingBuf { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ThingBuf") + .field("capacity", &self.capacity()) + .field("len", &self.len()) + .finish() + } +} diff --git a/src/stringbuf.rs b/src/stringbuf.rs index fa0b4ce..2b1c160 100644 --- a/src/stringbuf.rs +++ b/src/stringbuf.rs @@ -3,8 +3,8 @@ use super::*; use alloc::string::String; #[derive(Debug)] -pub struct StringBuf]>> { - inner: ThingBuf, +pub struct StringBuf { + inner: ThingBuf, max_idle_capacity: usize, } @@ -15,22 +15,56 @@ impl StringBuf { max_idle_capacity: usize::MAX, } } + + pub fn with_max_idle_capacity(self, max_idle_capacity: usize) -> Self { + Self { + max_idle_capacity, + inner: self.inner, + } + } + + #[inline] + pub fn capacity(&self) -> usize { + self.inner.capacity() + } + + #[inline] + pub fn write(&self) -> Result, AtCapacity> { + let mut string = self.inner.push_ref()?; + string.with_mut(String::clear); + Ok(string) + } + + pub fn pop_ref(&self) -> Option> { + let mut string = self.inner.pop_ref()?; + string.with_mut(|string| { + if string.capacity() > self.max_idle_capacity { + string.shrink_to_fit(); + } + }); + Some(string) + } +} + +#[derive(Debug)] +pub struct StaticStringBuf { + inner: StaticThingBuf, + max_idle_capacity: usize, } -impl StringBuf -where - S: AsRef<[Slot]>, -{ - pub fn from_array(array: S) -> Self { +impl StaticStringBuf { + #[cfg(not(test))] + pub const fn new() -> Self { Self { - inner: ThingBuf::from_array(array), + inner: StaticThingBuf::new(), max_idle_capacity: usize::MAX, } } + pub fn with_max_idle_capacity(self, max_idle_capacity: usize) -> Self { Self { max_idle_capacity, - ..self + inner: self.inner, } } diff --git a/src/thingbuf.rs b/src/thingbuf.rs new file mode 100644 index 0000000..5288a25 --- /dev/null +++ b/src/thingbuf.rs @@ -0,0 +1,81 @@ +use crate::loom::atomic::Ordering; +use crate::{AtCapacity, Core, Ref, Slot}; +use alloc::boxed::Box; +use core::{fmt, ptr}; + +#[cfg(test)] +mod tests; + +pub struct ThingBuf { + core: Core, + slots: Box<[Slot]>, +} + +// === impl ThingBuf === + +impl ThingBuf { + pub fn new(capacity: usize) -> Self { + assert!(capacity > 0); + let slots = (0..capacity).map(|_| Slot::empty()).collect(); + Self { + core: Core::new(capacity), + slots, + } + } + + pub fn push_ref(&self) -> Result, AtCapacity> { + self.core.push_ref(&*self.slots) + } + + #[inline] + pub fn push_with(&self, f: impl FnOnce(&mut T) -> U) -> Result { + self.push_ref().map(|mut r| r.with_mut(f)) + } + + pub fn pop_ref(&self) -> Option> { + self.core.pop_ref(&*self.slots) + } + + #[inline] + pub fn pop_with(&self, f: impl FnOnce(&mut T) -> U) -> Option { + self.pop_ref().map(|mut r| r.with_mut(f)) + } + + #[inline] + pub fn capacity(&self) -> usize { + self.slots.len() + } + + #[inline] + pub fn len(&self) -> usize { + self.core.len() + } + + #[inline] + pub fn is_empty(&self) -> bool { + self.len() == 0 + } +} + +impl Drop for ThingBuf { + fn drop(&mut self) { + let tail = self.core.tail.load(Ordering::SeqCst); + let (idx, gen) = self.core.idx_gen(tail); + let num_initialized = if gen > 0 { self.capacity() } else { idx }; + for slot in &self.slots[..num_initialized] { + unsafe { + slot.value + .with_mut(|value| ptr::drop_in_place((*value).as_mut_ptr())); + } + } + } +} + +impl fmt::Debug for ThingBuf { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ThingBuf") + .field("capacity", &self.capacity()) + .field("len", &self.len()) + .finish() + } +} diff --git a/src/tests.rs b/src/thingbuf/tests.rs similarity index 84% rename from src/tests.rs rename to src/thingbuf/tests.rs index 63fa1f8..b75b669 100644 --- a/src/tests.rs +++ b/src/thingbuf/tests.rs @@ -1,5 +1,5 @@ use super::ThingBuf; -use crate::loom::{self, thread}; +use crate::loom::{self, alloc, thread}; use std::sync::Arc; #[test] @@ -7,12 +7,15 @@ fn push_many_mpsc() { const T1_VALS: &[&str] = &["alice", "bob", "charlie"]; const T2_VALS: &[&str] = &["dave", "ellen", "francis"]; - fn producer(vals: &'static [&'static str], q: &Arc>) -> impl FnOnce() { + fn producer( + vals: &'static [&'static str], + q: &Arc>>, + ) -> impl FnOnce() { let q = q.clone(); move || { for &val in vals { if let Ok(mut r) = test_dbg!(q.push_ref()) { - r.with_mut(|r| r.push_str(val)); + r.with_mut(|r| r.get_mut().push_str(val)); } else { return; } @@ -30,7 +33,7 @@ fn push_many_mpsc() { while Arc::strong_count(&q) > 1 { if let Some(r) = q.pop_ref() { - r.with(|val| all_vals.push(val.to_string())); + r.with(|val| all_vals.push(val.get_ref().to_string())); } thread::yield_now(); } @@ -39,7 +42,7 @@ fn push_many_mpsc() { t2.join().unwrap(); while let Some(r) = test_dbg!(q.pop_ref()) { - r.with(|val| all_vals.push(val.to_string())); + r.with(|val| all_vals.push(val.get_ref().to_string())); } test_dbg!(&all_vals); diff --git a/src/util.rs b/src/util.rs index a7f6557..1fdca44 100644 --- a/src/util.rs +++ b/src/util.rs @@ -12,6 +12,16 @@ pub(crate) struct Backoff(u8); #[derive(Clone, Copy, Default, Hash, PartialEq, Eq, Debug)] pub(crate) struct CachePadded(pub(crate) T); +#[cfg(feature = "std")] +pub(crate) fn panicking() -> bool { + std::thread::panicking() +} + +#[cfg(not(feature = "std"))] +pub(crate) fn panicking() -> bool { + false +} + // === impl Backoff === impl Backoff { diff --git a/tests/static_storage.rs b/tests/static_storage.rs index 11817ad..ae461c5 100644 --- a/tests/static_storage.rs +++ b/tests/static_storage.rs @@ -1,40 +1,34 @@ -use std::{fmt::Write, sync::Arc, thread}; -use thingbuf::{Slot, StringBuf, ThingBuf}; +use std::{ + fmt::Write, + sync::atomic::{AtomicBool, Ordering}, + thread, +}; +use thingbuf::{StaticStringBuf, StaticThingBuf}; #[test] fn static_storage_thingbuf() { - let thingbuf = Arc::new(ThingBuf::from_array([ - Slot::new(0), - Slot::new(0), - Slot::new(0), - Slot::new(0), - Slot::new(0), - Slot::new(0), - Slot::new(0), - Slot::new(0), - ])); - - let producer = { - let thingbuf = thingbuf.clone(); - thread::spawn(move || { - for i in 0..32 { - let mut thing = 'write: loop { - match thingbuf.push_ref() { - Ok(thing) => break 'write thing, - _ => thread::yield_now(), - } - }; - thing.with_mut(|thing| *thing = i); - } - }) - }; + static BUF: StaticThingBuf = StaticThingBuf::new(); + static PRODUCER_LIVE: AtomicBool = AtomicBool::new(true); + + let producer = thread::spawn(move || { + for i in 0..32 { + let mut thing = 'write: loop { + match BUF.push_ref() { + Ok(thing) => break 'write thing, + _ => thread::yield_now(), + } + }; + thing.with_mut(|thing| *thing = i); + } + PRODUCER_LIVE.store(false, Ordering::Release); + }); let mut i = 0; // While the producer is writing to the queue, push each entry to the // results string. - while Arc::strong_count(&thingbuf) > 1 { - match thingbuf.pop_ref() { + while PRODUCER_LIVE.load(Ordering::Acquire) { + match BUF.pop_ref() { Some(thing) => thing.with(|thing| { assert_eq!(*thing, i); i += 1; @@ -46,7 +40,7 @@ fn static_storage_thingbuf() { producer.join().unwrap(); // drain the queue. - while let Some(thing) = thingbuf.pop_ref() { + while let Some(thing) = BUF.pop_ref() { thing.with(|thing| { assert_eq!(*thing, i); i += 1; @@ -56,52 +50,45 @@ fn static_storage_thingbuf() { #[test] fn static_storage_stringbuf() { - let stringbuf = Arc::new(StringBuf::from_array([ - Slot::new(String::new()), - Slot::new(String::new()), - Slot::new(String::new()), - Slot::new(String::new()), - Slot::new(String::new()), - Slot::new(String::new()), - Slot::new(String::new()), - Slot::new(String::new()), - ])); - - let producer = { - let stringbuf = stringbuf.clone(); - thread::spawn(move || { - for i in 0..16 { - let mut string = 'write: loop { - match stringbuf.write() { - Ok(string) => break 'write string, - _ => thread::yield_now(), - } - }; - - write!(&mut string, "{:?}", i).unwrap(); - } - }) - }; + static BUF: StaticStringBuf<8> = StaticStringBuf::new(); + static PRODUCER_LIVE: AtomicBool = AtomicBool::new(true); + + let producer = thread::spawn(move || { + for i in 0..16 { + let mut string = 'write: loop { + match BUF.write() { + Ok(string) => break 'write string, + _ => thread::yield_now(), + } + }; + + write!(&mut string, "{:?}", i).unwrap(); + } + + PRODUCER_LIVE.store(false, Ordering::Release); + println!("producer done"); + }); let mut results = String::new(); // While the producer is writing to the queue, push each entry to the // results string. - while Arc::strong_count(&stringbuf) > 1 { - if let Some(string) = stringbuf.pop_ref() { + while PRODUCER_LIVE.load(Ordering::Acquire) { + if let Some(string) = BUF.pop_ref() { writeln!(results, "{}", string).unwrap(); } thread::yield_now(); } producer.join().unwrap(); + println!("producer done..."); // drain the queue. - while let Some(string) = stringbuf.pop_ref() { + while let Some(string) = BUF.pop_ref() { writeln!(results, "{}", string).unwrap(); } - let results = dbg!(results); + println!("results:\n{}", results); for (n, ln) in results.lines().enumerate() { assert_eq!(ln.parse::(), Ok(n));