From 54e53534303606d734cb7490cf782d7b71fb8103 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Mon, 28 Feb 2022 14:07:19 -0800 Subject: [PATCH] feat(recycling): add customizable recycling policies (#33) ## Motivation Currently, all queues and channels in `thingbuf` require that items in the queue/channel implement `Default`, because `Default` is used to fill slots when they are initially allocated. Furthermore, when slots are checked out for writing to, they are not cleared prior to being written to --- the user code is responsible for clearing them if needed. The `StringBuf` type currently implements special behavior _specifically_ for `String`s, where the `String` is cleared in place prior to writing to, but this only works for `String`s. `StringBuf` also provides an API for limiting the maximum capacity of "empty" strings, so that they can be shrunk down to that capacity when returning them to the pool. This allows introducing an upper bound on the capacity allocated by unused strings. However, again, this only works with `String`s and is only provided by the `StringBuf` type. This isn't ideal --- users shouldn't _have_ to be responsible for clearing non-`String` types when reusing allocations. ## Solution This branch introduces a new `Recycle` trait that defines a policy for how `T`-typed pooled objects should be reused. `Recycle` defines two methods: * `fn new_element(&self) -> T` creates a new element * `fn recycle(&self, element: &mut T)` clears a pooled element for reuse This allows a `Recycle` implementation to define the lifecycle of a pooled item. In addition, we define a couple of pre-made `Recycle` implementations: * `DefaultRecycle`, which implements `Recycle` for all types `T` where `T: Default + Clone`. This is used by all `thingbuf` types by default. It creates new elements using `Default::default`, and recycles them using `element.clone_from(T::default())`. `Clone::clone_from` is not _guaranteed_ to re-use existing capacity, but it's overridden by most array-based collections (such as the ones in the standard library) to do so --- it should be equivalent to `.clear()` when cloning from an empty collection. However, this policy will still *work* with types that don't have a clear-in-place function. * `WithCapacity` implements `Recycle` only for types that define `with_capacity`, `shrink_to`, and `clear` methods, like all array-based collections in the Rust standard library. Unlike `DefaultRecycle`, it is _guaranteed_ to clear elements in place and retain any previously allocated capacity. It can also be configured to add both upper and lower bounds on capacity. When there is a lower bound, new elements are allocated with that value as their initial capacity, rather than being allocated with 0 capacity. When an upper bound is set, it will call `shrink_to` prior to clearing elements, to limit the total allocated capacity retained by the pool. `WithCapacity` currently implements `Recycle` for all `alloc` and `std` types that define the requisite methods: `Vec`, `String`, `VecDeque`, and `BinaryHeap` when the `alloc` feature is enabled, and `HashMap` and `HashSet` as well, when the `std` feature is enabled. Finally, I've modified the existing queue and channel types to allow configuring them to use a `Recycle` implementation. The `StringBuf` type is removed, as it's now obviated by the new APIs. ## Future Work We may wish to factor out the `recycling` module into its own crate, so that it can be used in other libraries. Closes #30 Signed-off-by: Eliza Weisman --- src/lib.rs | 48 +++-- src/loom.rs | 10 + src/mpsc.rs | 15 +- src/mpsc/async_impl.rs | 147 +++++++++---- src/mpsc/sync.rs | 145 ++++++++----- src/recycle.rs | 25 --- src/recycling.rs | 463 ++++++++++++++++++++++++++++++++++++++++ src/static_thingbuf.rs | 45 ++-- src/stringbuf.rs | 92 -------- src/thingbuf.rs | 235 ++++++++++---------- tests/static_storage.rs | 9 +- 11 files changed, 867 insertions(+), 367 deletions(-) delete mode 100644 src/recycle.rs create mode 100644 src/recycling.rs delete mode 100644 src/stringbuf.rs diff --git a/src/lib.rs b/src/lib.rs index b2da4ea..e5dd1bf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,36 +2,36 @@ #![cfg_attr(not(feature = "std"), no_std)] #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] use core::{cmp, fmt, mem::MaybeUninit, ops, ptr}; - #[macro_use] mod macros; mod loom; -mod recycle; +pub mod mpsc; +pub mod recycling; mod util; mod wait; +pub use self::recycling::Recycle; + #[cfg_attr(docsrs, doc = include_str!("../mpsc_perf_comparison.md"))] pub mod mpsc_perf_comparison { // Empty module, used only for documentation. } +feature! { + #![not(all(loom, test))] + mod static_thingbuf; + pub use self::static_thingbuf::StaticThingBuf; +} + feature! { #![feature = "alloc"] extern crate alloc; mod thingbuf; pub use self::thingbuf::ThingBuf; - - mod stringbuf; - pub use stringbuf::{StaticStringBuf, StringBuf}; } -pub mod mpsc; - -mod static_thingbuf; -pub use self::static_thingbuf::StaticThingBuf; - use crate::{ loom::{ atomic::{AtomicUsize, Ordering::*}, @@ -97,7 +97,6 @@ impl Core { closed, idx_mask, capacity, - has_dropped_slots: false, } } @@ -116,7 +115,6 @@ impl Core { gen_mask, idx_mask, capacity, - #[cfg(debug_assertions)] has_dropped_slots: false, } @@ -155,12 +153,13 @@ impl Core { } #[inline(always)] - fn push_ref<'slots, T, S>( + fn push_ref<'slots, T, S, R>( &self, slots: &'slots S, + recycle: &R, ) -> Result, mpsc::TrySendError<()>> where - T: Default, + R: Recycle, S: ops::Index> + ?Sized, { test_println!("push_ref"); @@ -190,13 +189,20 @@ impl Core { // Claim exclusive ownership over the slot let ptr = slot.value.get_mut(); - if gen == 0 { - unsafe { - // Safety: we have just claimed exclusive ownership over - // this slot. - ptr.deref().write(T::default()); - }; - test_println!("-> initialized"); + // Initialize or recycle the element. + unsafe { + // Safety: we have just claimed exclusive ownership over + // this slot. + let ptr = ptr.deref(); + if gen == 0 { + ptr.write(recycle.new_element()); + test_println!("-> initialized"); + } else { + // Safety: if the generation is > 0, then the + // slot has already been initialized. + recycle.recycle(ptr.assume_init_mut()); + test_println!("-> recycled"); + } } return Ok(Ref { diff --git a/src/loom.rs b/src/loom.rs index 34b1006..7b67828 100644 --- a/src/loom.rs +++ b/src/loom.rs @@ -187,6 +187,16 @@ mod inner { Self::new(T::default()) } } + + impl Clone for Track { + fn clone(&self) -> Self { + Self::new(self.get_ref().clone()) + } + + fn clone_from(&mut self, source: &Self) { + self.get_mut().clone_from(source.get_ref()); + } + } } } diff --git a/src/mpsc.rs b/src/mpsc.rs index c2067a3..8090e3d 100644 --- a/src/mpsc.rs +++ b/src/mpsc.rs @@ -12,6 +12,7 @@ use crate::{ loom::{atomic::AtomicUsize, hint}, + recycling::Recycle, wait::{Notify, WaitCell, WaitQueue, WaitResult}, Core, Ref, Slot, }; @@ -109,24 +110,25 @@ impl ChannelCore where N: Notify + Unpin, { - fn try_send_ref<'a, T>( + fn try_send_ref<'a, T, R>( &'a self, slots: &'a [Slot], + recycle: &R, ) -> Result, TrySendError> where - T: Default, + R: Recycle, { - self.core.push_ref(slots).map(|slot| SendRefInner { + self.core.push_ref(slots, recycle).map(|slot| SendRefInner { _notify: NotifyRx(&self.rx_wait), slot, }) } - fn try_send(&self, slots: &[Slot], val: T) -> Result<(), TrySendError> + fn try_send(&self, slots: &[Slot], val: T, recycle: &R) -> Result<(), TrySendError> where - T: Default, + R: Recycle, { - match self.try_send_ref(slots) { + match self.try_send_ref(slots, recycle) { Ok(mut slot) => { slot.with_mut(|slot| *slot = val); Ok(()) @@ -147,7 +149,6 @@ where ) -> Poll>> where S: Index> + ?Sized, - T: Default, { macro_rules! try_poll_recv { () => { diff --git a/src/mpsc/async_impl.rs b/src/mpsc/async_impl.rs index f5ca764..dd337d8 100644 --- a/src/mpsc/async_impl.rs +++ b/src/mpsc/async_impl.rs @@ -1,6 +1,7 @@ use super::*; use crate::{ loom::atomic::{self, AtomicBool, Ordering}, + recycling::{self, Recycle}, wait::queue, Ref, }; @@ -16,12 +17,21 @@ feature! { use crate::loom::sync::Arc; - /// Returns a new synchronous multi-producer, single consumer channel. - pub fn channel(capacity: usize) -> (Sender, Receiver) { + /// Returns a new asynchronous multi-producer, single consumer channel. + pub fn channel(capacity: usize) -> (Sender, Receiver) { + with_recycle(capacity, recycling::DefaultRecycle::new()) + } + + /// Returns a new asynchronous multi-producer, single consumer channel with + /// the provided [recycling policy]. + /// + /// [recycling policy]: crate::recycling::Recycle + pub fn with_recycle>(capacity: usize, recycle: R) -> (Sender, Receiver) { assert!(capacity > 0); let inner = Arc::new(Inner { core: ChannelCore::new(capacity), slots: Slot::make_boxed_array(capacity), + recycle, }); let tx = Sender { inner: inner.clone(), @@ -32,18 +42,19 @@ feature! { #[derive(Debug)] - pub struct Receiver { - inner: Arc>, + pub struct Receiver { + inner: Arc>, } #[derive(Debug)] - pub struct Sender { - inner: Arc>, + pub struct Sender { + inner: Arc>, } - struct Inner { + struct Inner { core: super::ChannelCore, slots: Box<[Slot]>, + recycle: R, } } @@ -79,19 +90,22 @@ feature! { /// ``` /// [`split`]: StaticChannel::split #[cfg_attr(all(loom, test), allow(dead_code))] -pub struct StaticChannel { +pub struct StaticChannel { core: ChannelCore, + recycle: R, slots: [Slot; CAPACITY], is_split: AtomicBool, } -pub struct StaticSender { +pub struct StaticSender { core: &'static ChannelCore, + recycle: &'static R, slots: &'static [Slot], } -pub struct StaticReceiver { +pub struct StaticReceiver { core: &'static ChannelCore, + recycle: &'static R, slots: &'static [Slot], } @@ -122,15 +136,17 @@ pub struct RecvRefFuture<'a, T> { /// /// [`ThingBuf`]: crate::ThingBuf #[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct RecvFuture<'a, T> { +pub struct RecvFuture<'a, T, R = recycling::DefaultRecycle> { core: &'a ChannelCore, slots: &'a [Slot], + recycle: &'a R, } #[pin_project::pin_project(PinnedDrop)] -struct SendRefFuture<'sender, T> { +struct SendRefFuture<'sender, T, R> { core: &'sender ChannelCore, slots: &'sender [Slot], + recycle: &'sender R, state: State, #[pin] waiter: queue::Waiter, @@ -183,9 +199,12 @@ impl StaticChannel { core: ChannelCore::new(CAPACITY), slots: Slot::make_static_array::(), is_split: AtomicBool::new(false), + recycle: recycling::DefaultRecycle::new(), } } +} +impl StaticChannel { /// Split a [`StaticChannel`] into a [`StaticSender`]/[`StaticReceiver`] /// pair. /// @@ -197,7 +216,7 @@ impl StaticChannel { /// # Panics /// /// If the channel has already been split. - pub fn split(&'static self) -> (StaticSender, StaticReceiver) { + pub fn split(&'static self) -> (StaticSender, StaticReceiver) { self.try_split().expect("channel already split") } @@ -207,16 +226,18 @@ impl StaticChannel { /// A static channel can only be split a single time. If /// [`StaticChannel::split`] or [`StaticChannel::try_split`] have been /// called previously, this method returns `None`. - pub fn try_split(&'static self) -> Option<(StaticSender, StaticReceiver)> { + pub fn try_split(&'static self) -> Option<(StaticSender, StaticReceiver)> { self.is_split .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) .ok()?; let tx = StaticSender { core: &self.core, + recycle: &self.recycle, slots: &self.slots[..], }; let rx = StaticReceiver { core: &self.core, + recycle: &self.recycle, slots: &self.slots[..], }; Some((tx, rx)) @@ -226,22 +247,28 @@ impl StaticChannel { // === impl Sender === #[cfg(feature = "alloc")] -impl Sender { +impl Sender +where + R: Recycle, +{ pub fn try_send_ref(&self) -> Result, TrySendError> { self.inner .core - .try_send_ref(self.inner.slots.as_ref()) + .try_send_ref(self.inner.slots.as_ref(), &self.inner.recycle) .map(SendRef) } pub fn try_send(&self, val: T) -> Result<(), TrySendError> { - self.inner.core.try_send(self.inner.slots.as_ref(), val) + self.inner + .core + .try_send(self.inner.slots.as_ref(), val, &self.inner.recycle) } pub async fn send_ref(&self) -> Result, Closed> { SendRefFuture { core: &self.inner.core, slots: self.inner.slots.as_ref(), + recycle: &self.inner.recycle, state: State::Start, waiter: queue::Waiter::new(), } @@ -260,7 +287,7 @@ impl Sender { } #[cfg(feature = "alloc")] -impl Clone for Sender { +impl Clone for Sender { fn clone(&self) -> Self { test_dbg!(self.inner.core.tx_count.fetch_add(1, Ordering::Relaxed)); Self { @@ -270,7 +297,7 @@ impl Clone for Sender { } #[cfg(feature = "alloc")] -impl Drop for Sender { +impl Drop for Sender { fn drop(&mut self) { if test_dbg!(self.inner.core.tx_count.fetch_sub(1, Ordering::Release)) > 1 { return; @@ -286,7 +313,7 @@ impl Drop for Sender { // === impl Receiver === #[cfg(feature = "alloc")] -impl Receiver { +impl Receiver { pub fn recv_ref(&self) -> RecvRefFuture<'_, T> { RecvRefFuture { core: &self.inner.core, @@ -294,10 +321,14 @@ impl Receiver { } } - pub fn recv(&self) -> RecvFuture<'_, T> { + pub fn recv(&self) -> RecvFuture<'_, T, R> + where + R: Recycle, + { RecvFuture { core: &self.inner.core, slots: self.inner.slots.as_ref(), + recycle: &self.inner.recycle, } } @@ -331,9 +362,12 @@ impl Receiver { /// sender, or when the channel is closed. Note that on multiple calls to /// `poll_recv`, only the [`Waker`] from the [`Context`] passed to the most /// recent call is scheduled to receive a wakeup. - pub fn poll_recv(&self, cx: &mut Context<'_>) -> Poll> { + pub fn poll_recv(&self, cx: &mut Context<'_>) -> Poll> + where + R: Recycle, + { self.poll_recv_ref(cx) - .map(|opt| opt.map(|mut r| r.with_mut(core::mem::take))) + .map(|opt| opt.map(|mut r| recycling::take(&mut *r, &self.inner.recycle))) } pub fn is_closed(&self) -> bool { @@ -342,7 +376,7 @@ impl Receiver { } #[cfg(feature = "alloc")] -impl Drop for Receiver { +impl Drop for Receiver { fn drop(&mut self) { self.inner.core.close_rx(); } @@ -350,19 +384,25 @@ impl Drop for Receiver { // === impl StaticSender === -impl StaticSender { +impl StaticSender +where + R: Recycle, +{ pub fn try_send_ref(&self) -> Result, TrySendError> { - self.core.try_send_ref(self.slots).map(SendRef) + self.core + .try_send_ref(self.slots, self.recycle) + .map(SendRef) } pub fn try_send(&self, val: T) -> Result<(), TrySendError> { - self.core.try_send(self.slots, val) + self.core.try_send(self.slots, val, self.recycle) } pub async fn send_ref(&self) -> Result, Closed> { SendRefFuture { core: self.core, slots: self.slots, + recycle: self.recycle, state: State::Start, waiter: queue::Waiter::new(), } @@ -386,11 +426,12 @@ impl Clone for StaticSender { Self { core: self.core, slots: self.slots, + recycle: self.recycle, } } } -impl Drop for StaticSender { +impl Drop for StaticSender { fn drop(&mut self) { if test_dbg!(self.core.tx_count.fetch_sub(1, Ordering::Release)) > 1 { return; @@ -403,18 +444,19 @@ impl Drop for StaticSender { } } -impl fmt::Debug for StaticSender { +impl fmt::Debug for StaticSender { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("StaticSender") .field("core", &self.core) .field("slots", &format_args!("&[..]")) + .field("recycle", self.recycle) .finish() } } // === impl StaticReceiver === -impl StaticReceiver { +impl StaticReceiver { pub fn recv_ref(&self) -> RecvRefFuture<'_, T> { RecvRefFuture { core: self.core, @@ -422,10 +464,14 @@ impl StaticReceiver { } } - pub fn recv(&self) -> RecvFuture<'_, T> { + pub fn recv(&self) -> RecvFuture<'_, T, R> + where + R: Recycle, + { RecvFuture { core: self.core, slots: self.slots, + recycle: self.recycle, } } @@ -459,9 +505,12 @@ impl StaticReceiver { /// sender, or when the channel is closed. Note that on multiple calls to /// `poll_recv`, only the [`Waker`] from the [`Context`] passed to the most /// recent call is scheduled to receive a wakeup. - pub fn poll_recv(&self, cx: &mut Context<'_>) -> Poll> { + pub fn poll_recv(&self, cx: &mut Context<'_>) -> Poll> + where + R: Recycle, + { self.poll_recv_ref(cx) - .map(|opt| opt.map(|mut r| r.with_mut(core::mem::take))) + .map(|opt| opt.map(|mut r| recycling::take(&mut *r, self.recycle))) } pub fn is_closed(&self) -> bool { @@ -469,17 +518,18 @@ impl StaticReceiver { } } -impl Drop for StaticReceiver { +impl Drop for StaticReceiver { fn drop(&mut self) { self.core.close_rx(); } } -impl fmt::Debug for StaticReceiver { +impl fmt::Debug for StaticReceiver { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("StaticReceiver") .field("core", &self.core) .field("slots", &format_args!("&[..]")) + .field("recycle", &self.recycle) .finish() } } @@ -487,7 +537,7 @@ impl fmt::Debug for StaticReceiver { // === impl RecvRefFuture === #[inline] -fn poll_recv_ref<'a, T: Default>( +fn poll_recv_ref<'a, T>( core: &'a ChannelCore, slots: &'a [Slot], cx: &mut Context<'_>, @@ -501,7 +551,7 @@ fn poll_recv_ref<'a, T: Default>( }) } -impl<'a, T: Default> Future for RecvRefFuture<'a, T> { +impl<'a, T> Future for RecvRefFuture<'a, T> { type Output = Option>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { @@ -511,20 +561,24 @@ impl<'a, T: Default> Future for RecvRefFuture<'a, T> { // === impl Recv === -impl<'a, T: Default> Future for RecvFuture<'a, T> { +impl<'a, T, R> Future for RecvFuture<'a, T, R> +where + R: Recycle, +{ type Output = Option; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { poll_recv_ref(self.core, self.slots, cx) - .map(|opt| opt.map(|mut r| r.with_mut(core::mem::take))) + .map(|opt| opt.map(|mut r| recycling::take(&mut *r, self.recycle))) } } // === impl SendRefFuture === -impl<'sender, T: Default + 'sender> Future for SendRefFuture<'sender, T> +impl<'sender, T, R> Future for SendRefFuture<'sender, T, R> where - T: Default + 'sender, + R: Recycle + 'sender, + T: 'sender, { type Output = Result, Closed>; @@ -536,7 +590,7 @@ where let node = this.waiter; match test_dbg!(*this.state) { State::Start => { - match this.core.try_send_ref(this.slots) { + match this.core.try_send_ref(this.slots, *this.recycle) { Ok(slot) => return Poll::Ready(Ok(SendRef(slot))), Err(TrySendError::Closed(_)) => return Poll::Ready(Err(Closed(()))), Err(_) => {} @@ -573,7 +627,7 @@ where } } } - State::Done => match this.core.try_send_ref(this.slots) { + State::Done => match this.core.try_send_ref(this.slots, *this.recycle) { Ok(slot) => return Poll::Ready(Ok(SendRef(slot))), Err(TrySendError::Closed(_)) => return Poll::Ready(Err(Closed(()))), Err(_) => { @@ -586,7 +640,7 @@ where } #[pin_project::pinned_drop] -impl PinnedDrop for SendRefFuture<'_, T> { +impl PinnedDrop for SendRefFuture<'_, T, R> { fn drop(self: Pin<&mut Self>) { test_println!("SendRefFuture::drop({:p})", self); let this = self.project(); @@ -598,16 +652,17 @@ impl PinnedDrop for SendRefFuture<'_, T> { feature! { #![feature = "alloc"] - impl fmt::Debug for Inner { + impl fmt::Debug for Inner { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Inner") .field("core", &self.core) .field("slots", &format_args!("Box<[..]>")) + .field("recycle", &self.recycle) .finish() } } - impl Drop for Inner { + impl Drop for Inner { fn drop(&mut self) { self.core.core.drop_slots(&mut self.slots[..]) } diff --git a/src/mpsc/sync.rs b/src/mpsc/sync.rs index 85058ff..1ceafb7 100644 --- a/src/mpsc/sync.rs +++ b/src/mpsc/sync.rs @@ -10,17 +10,31 @@ use crate::{ sync::Arc, thread::{self, Thread}, }, + recycling::{self, Recycle}, util::Backoff, wait::queue, Ref, }; use core::{fmt, pin::Pin}; -/// Returns a new asynchronous multi-producer, single consumer channel. -pub fn channel(capacity: usize) -> (Sender, Receiver) { +/// Returns a new synchronous multi-producer, single consumer channel. +pub fn channel(capacity: usize) -> (Sender, Receiver) { + with_recycle(capacity, recycling::DefaultRecycle::new()) +} + +/// Returns a new synchronous multi-producer, single consumer channel with +/// the provided [recycling policy]. +/// +/// [recycling policy]: crate::recycling::Recycle +pub fn with_recycle>( + capacity: usize, + recycle: R, +) -> (Sender, Receiver) { + assert!(capacity > 0); let inner = Arc::new(Inner { core: ChannelCore::new(capacity), slots: Slot::make_boxed_array(capacity), + recycle, }); let tx = Sender { inner: inner.clone(), @@ -30,13 +44,13 @@ pub fn channel(capacity: usize) -> (Sender, Receiver) { } #[derive(Debug)] -pub struct Sender { - inner: Arc>, +pub struct Sender { + inner: Arc>, } #[derive(Debug)] -pub struct Receiver { - inner: Arc>, +pub struct Receiver { + inner: Arc>, } /// A statically-allocated, blocking bounded MPSC channel. @@ -78,25 +92,29 @@ pub struct Receiver { /// [async]: crate::mpsc::StaticChannel /// [`split`]: StaticChannel::split #[cfg_attr(all(loom, test), allow(dead_code))] -pub struct StaticChannel { +pub struct StaticChannel { core: ChannelCore, slots: [Slot; CAPACITY], is_split: AtomicBool, + recycle: R, } -pub struct StaticSender { +pub struct StaticSender { core: &'static ChannelCore, slots: &'static [Slot], + recycle: &'static R, } -pub struct StaticReceiver { +pub struct StaticReceiver { core: &'static ChannelCore, slots: &'static [Slot], + recycle: &'static R, } -struct Inner { +struct Inner { core: super::ChannelCore, slots: Box<[Slot]>, + recycle: R, } impl_send_ref! { @@ -154,9 +172,12 @@ impl StaticChannel { core: ChannelCore::new(CAPACITY), slots: Slot::make_static_array::(), is_split: AtomicBool::new(false), + recycle: recycling::DefaultRecycle::new(), } } +} +impl StaticChannel { /// Split a [`StaticChannel`] into a [`StaticSender`]/[`StaticReceiver`] /// pair. /// @@ -168,7 +189,7 @@ impl StaticChannel { /// # Panics /// /// If the channel has already been split. - pub fn split(&'static self) -> (StaticSender, StaticReceiver) { + pub fn split(&'static self) -> (StaticSender, StaticReceiver) { self.try_split().expect("channel already split") } @@ -178,17 +199,19 @@ impl StaticChannel { /// A static channel can only be split a single time. If /// [`StaticChannel::split`] or [`StaticChannel::try_split`] have been /// called previously, this method returns `None`. - pub fn try_split(&'static self) -> Option<(StaticSender, StaticReceiver)> { + pub fn try_split(&'static self) -> Option<(StaticSender, StaticReceiver)> { self.is_split .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) .ok()?; let tx = StaticSender { core: &self.core, slots: &self.slots[..], + recycle: &self.recycle, }; let rx = StaticReceiver { core: &self.core, slots: &self.slots[..], + recycle: &self.recycle, }; Some((tx, rx)) } @@ -196,20 +219,29 @@ impl StaticChannel { // === impl Sender === -impl Sender { +impl Sender +where + R: Recycle, +{ pub fn try_send_ref(&self) -> Result, TrySendError> { self.inner .core - .try_send_ref(self.inner.slots.as_ref()) + .try_send_ref(self.inner.slots.as_ref(), &self.inner.recycle) .map(SendRef) } pub fn try_send(&self, val: T) -> Result<(), TrySendError> { - self.inner.core.try_send(self.inner.slots.as_ref(), val) + self.inner + .core + .try_send(self.inner.slots.as_ref(), val, &self.inner.recycle) } pub fn send_ref(&self) -> Result, Closed> { - send_ref(&self.inner.core, self.inner.slots.as_ref()) + send_ref( + &self.inner.core, + self.inner.slots.as_ref(), + &self.inner.recycle, + ) } pub fn send(&self, val: T) -> Result<(), Closed> { @@ -223,7 +255,7 @@ impl Sender { } } -impl Clone for Sender { +impl Clone for Sender { fn clone(&self) -> Self { test_dbg!(self.inner.core.tx_count.fetch_add(1, Ordering::Relaxed)); Self { @@ -232,7 +264,7 @@ impl Clone for Sender { } } -impl Drop for Sender { +impl Drop for Sender { fn drop(&mut self) { if test_dbg!(self.inner.core.tx_count.fetch_sub(1, Ordering::Release)) > 1 { return; @@ -248,14 +280,17 @@ impl Drop for Sender { // === impl Receiver === -impl Receiver { +impl Receiver { pub fn recv_ref(&self) -> Option> { recv_ref(&self.inner.core, self.inner.slots.as_ref()) } - pub fn recv(&self) -> Option { - let val = self.recv_ref()?.with_mut(core::mem::take); - Some(val) + pub fn recv(&self) -> Option + where + R: Recycle, + { + let mut val = self.recv_ref()?; + Some(recycling::take(&mut *val, &self.inner.recycle)) } pub fn is_closed(&self) -> bool { @@ -263,7 +298,7 @@ impl Receiver { } } -impl<'a, T: Default> Iterator for &'a Receiver { +impl<'a, T, R> Iterator for &'a Receiver { type Item = RecvRef<'a, T>; fn next(&mut self) -> Option { @@ -271,7 +306,7 @@ impl<'a, T: Default> Iterator for &'a Receiver { } } -impl Drop for Receiver { +impl Drop for Receiver { fn drop(&mut self) { self.inner.core.close_rx(); } @@ -279,17 +314,22 @@ impl Drop for Receiver { // === impl StaticSender === -impl StaticSender { +impl StaticSender +where + R: Recycle, +{ pub fn try_send_ref(&self) -> Result, TrySendError> { - self.core.try_send_ref(self.slots).map(SendRef) + self.core + .try_send_ref(self.slots, self.recycle) + .map(SendRef) } pub fn try_send(&self, val: T) -> Result<(), TrySendError> { - self.core.try_send(self.slots, val) + self.core.try_send(self.slots, val, self.recycle) } pub fn send_ref(&self) -> Result, Closed> { - send_ref(self.core, self.slots) + send_ref(self.core, self.slots, self.recycle) } pub fn send(&self, val: T) -> Result<(), Closed> { @@ -303,17 +343,18 @@ impl StaticSender { } } -impl Clone for StaticSender { +impl Clone for StaticSender { fn clone(&self) -> Self { test_dbg!(self.core.tx_count.fetch_add(1, Ordering::Relaxed)); Self { core: self.core, slots: self.slots, + recycle: self.recycle, } } } -impl Drop for StaticSender { +impl Drop for StaticSender { fn drop(&mut self) { if test_dbg!(self.core.tx_count.fetch_sub(1, Ordering::Release)) > 1 { return; @@ -327,25 +368,29 @@ impl Drop for StaticSender { } } -impl fmt::Debug for StaticReceiver { +impl fmt::Debug for StaticSender { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("StaticReceiver") + f.debug_struct("StaticSender") .field("core", &self.core) .field("slots", &format_args!("&[..]")) + .field("recycle", &self.recycle) .finish() } } -// === impl Receiver === +// === impl StaticReceiver === -impl StaticReceiver { +impl StaticReceiver { pub fn recv_ref(&self) -> Option> { recv_ref(self.core, self.slots) } - pub fn recv(&self) -> Option { - let val = self.recv_ref()?.with_mut(core::mem::take); - Some(val) + pub fn recv(&self) -> Option + where + R: Recycle, + { + let mut val = self.recv_ref()?; + Some(recycling::take(&mut *val, self.recycle)) } pub fn is_closed(&self) -> bool { @@ -353,7 +398,7 @@ impl StaticReceiver { } } -impl<'a, T: Default> Iterator for &'a StaticReceiver { +impl<'a, T, R> Iterator for &'a StaticReceiver { type Item = RecvRef<'a, T>; fn next(&mut self) -> Option { @@ -361,43 +406,42 @@ impl<'a, T: Default> Iterator for &'a StaticReceiver { } } -impl Drop for StaticReceiver { +impl Drop for StaticReceiver { fn drop(&mut self) { self.core.close_rx(); } } -impl fmt::Debug for StaticSender { +impl fmt::Debug for StaticReceiver { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("StaticSender") + f.debug_struct("StaticReceiver") .field("core", &self.core) .field("slots", &format_args!("&[..]")) + .field("recycle", &self.recycle) .finish() } } // === impl Inner === -impl fmt::Debug for Inner { +impl fmt::Debug for Inner { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Inner") .field("core", &self.core) .field("slots", &format_args!("Box<[..]>")) + .field("recycle", &self.recycle) .finish() } } -impl Drop for Inner { +impl Drop for Inner { fn drop(&mut self) { self.core.core.drop_slots(&mut self.slots[..]) } } #[inline] -fn recv_ref<'a, T: Default>( - core: &'a ChannelCore, - slots: &'a [Slot], -) -> Option> { +fn recv_ref<'a, T>(core: &'a ChannelCore, slots: &'a [Slot]) -> Option> { loop { match core.poll_recv_ref(slots, thread::current) { Poll::Ready(r) => { @@ -415,13 +459,14 @@ fn recv_ref<'a, T: Default>( } #[inline] -fn send_ref<'a, T: Default>( +fn send_ref<'a, T, R: Recycle>( core: &'a ChannelCore, slots: &'a [Slot], + recycle: &'a R, ) -> Result, Closed<()>> { // fast path: avoid getting the thread and constructing the node if the // slot is immediately ready. - match core.try_send_ref(slots) { + match core.try_send_ref(slots, recycle) { Ok(slot) => return Ok(SendRef(slot)), Err(TrySendError::Closed(_)) => return Err(Closed(())), _ => {} @@ -449,7 +494,7 @@ fn send_ref<'a, T: Default>( WaitResult::Closed => return Err(Closed(())), WaitResult::Notified => { boff.spin_yield(); - match core.try_send_ref(slots.as_ref()) { + match core.try_send_ref(slots.as_ref(), recycle) { Ok(slot) => return Ok(SendRef(slot)), Err(TrySendError::Closed(_)) => return Err(Closed(())), _ => {} diff --git a/src/recycle.rs b/src/recycle.rs deleted file mode 100644 index abedc75..0000000 --- a/src/recycle.rs +++ /dev/null @@ -1,25 +0,0 @@ -pub trait Recycle { - /// Returns a new instance of type `T`. - fn new_element(&self) -> T; - - /// Resets `element` in place. - /// - /// Typically, this retains any previous allocations. - fn recycle(&self, element: &mut T); -} - -#[derive(Debug, Default)] -pub struct DefaultRecycle(()); - -impl Recycle for DefaultRecycle -where - T: Default + Clone, -{ - fn new_element(&self) -> T { - T::default() - } - - fn recycle(&self, element: &mut T) { - element.clone_from(&T::default()) - } -} diff --git a/src/recycling.rs b/src/recycling.rs new file mode 100644 index 0000000..a38ec80 --- /dev/null +++ b/src/recycling.rs @@ -0,0 +1,463 @@ +//! Configurable policies for element reuse. + +pub trait Recycle { + /// Returns a new instance of type `T`. + fn new_element(&self) -> T; + + /// Resets `element` in place. + /// + /// Typically, this retains any previous allocations. + fn recycle(&self, element: &mut T); +} + +/// A [`Recycle`] implementation for any type implementing [`Default`] and +/// [`Clone`]. +/// +/// This [creates new elements] by calling using [`Default::default()`]. +/// Existing elements are [recycled] by calling [`Clone::clone_from`] with the +/// default value. +/// +/// # Allocation Reuse +/// +/// [`Clone::clone_from`] is not *guaranteed* to reuse existing +/// allocations in place. For a number of common types in the standard library, +/// such as [`Box`], [`String`], [`Vec`], and collections based on [`Vec`] (such +/// as [`VecDeque`] and [`BinaryHeap`]), `clone_from` is overridden to reuse +/// existing allocations in place. However, other types may not override +/// `clone_from` in this way. +/// +/// `DefaultRecycle` will always *work* for types that implement [`Default`] and +/// [`Clone`], but it cannot be guaranteed to always reuse allocations. For a +/// more restrictive [`Recycle`] implementation that _will_ always reuse +/// existing allocations, consider [`WithCapacity`]. +/// +/// [creates new elements]: DefaultRecycle::new_element +/// [recycled]: DefaultRecycle::recycle +#[derive(Clone, Debug, Default)] +pub struct DefaultRecycle(()); + +/// A [`Recycle`] implementation for types that provide `with_capacity`, +/// `clear`, and `shrink_to` methods. +/// +/// This includes all array-based collections in the Rust standard library, such +/// as [`Vec`], [`String`], [`VecDeque`], and [`BinaryHeap`], as well as +/// [`HashMap`] and [`HashSet`]. +/// +/// # Usage +/// +/// By default, this type will always [recycle] elements by clearing all values +/// in place, returning all allocated capacity. [New elements] are allocated +/// with capacity for 0 values; they will allocate when first used. +/// +/// # Implementations for Other Types +/// +/// [`Recycle`] implementations may be added for similar data structures +/// implemented in other libraries. The [`min_capacity`] and +/// [`max_capacity`] methods expose the configured initial capacity and upper +/// bound. +/// +/// As an example, a library that implements an array-based data structure with +/// `with_capacity`, `clear`, and `shrink_to` methods can implement [`Recycle`] +/// for `WithCapacity` like so: +/// +/// ``` +/// use thingbuf::recycling::{self, Recycle}; +/// # use std::marker::PhantomData; +/// +/// /// Some kind of exciting new heap-allocated collection. +/// pub struct MyCollection { +/// // ... +/// # _p: PhantomData, +/// } +/// +/// impl MyCollection { +/// /// Returns a new `MyCollection` with enough capacity to hold +/// /// `capacity` elements without reallocationg. +/// pub fn with_capacity(capacity: usize) -> Self { +/// // ... +/// # unimplemented!() +/// } +/// +/// /// Returns the current allocated capacity of this `MyCollection`. +/// pub fn capacity(&self) -> usize { +/// // ... +/// # unimplemented!() +/// } +/// +/// /// Shrinks the capacity of the `MyCollection` with a lower bound. +/// /// +/// /// The capacity will remain at least as large as both the length +/// /// and the supplied value. +/// /// +/// /// If the current capacity is less than the lower limit, this is a no-op. +/// pub fn shrink_to(&mut self, min_capacity: usize) { +/// if self.capacity() > min_capacity { +/// // ... +/// # unimplemented!() +/// } +/// } +/// +/// /// Clears the `MyCollection`, removing all values. +/// /// +/// /// This does not change the current allocated capacity. The +/// /// `MyCollection` will still have enough allocated storage to hold +/// /// at least the current number of values. +/// pub fn clear(&mut self) { +/// // ... +/// # unimplemented!() +/// } +/// +/// // Other cool and exciting methods go here! +/// } +/// +/// // Because `MyCollection` has `with_capacity`, `shrink_to`, and `clear` methods, +/// // we can implement `Recycle>` for `WithCapacity` exactly the same +/// // way as it is implemented for standard library collections. +/// impl Recycle> for recycling::WithCapacity { +/// fn new_element(&self) -> MyCollection { +/// // Allocate a new element with the minimum initial capacity: +/// MyCollection::with_capacity(self.min_capacity()) +/// } +/// +/// fn recycle(&self, element: &mut MyCollection) { +/// // Recycle the element by clearing it in place, and then limiting the +/// // allocated capacity to the upper bound, if one is set: +/// element.clear(); +/// element.shrink_to(self.max_capacity()); +/// } +/// } +/// ``` +/// +/// # Allocation Reuse +/// +/// When an upper bound is not set, this recycling policy will _always_ reuse +/// any allocated capacity when recycling an element. Over time, the number of +/// reallocations required to grow items in a pool should decrease, amortizing +/// reallocations over the lifetime of the program. +/// +/// Of course, this means that it is technically possible for the allocated +/// capacity of the pool to grow infinitely, which can cause a memory leak if +/// used incorrectly. Therefore, it is also possible to set an upper bound on +/// idle capacity, using [`with_max_capacity`]. When such a bound is set, +/// recycled elements will be shrunk down to that capacity if they have grown +/// past the upper bound while in use. If this is the case, reallocations may +/// occur more often, but if the upper bound is higher than the typical required +/// capacity, they should remain infrequent. +/// +/// If elements will not require allocations of differing sizes, and the size is +/// known in advance (e.g. a pool of `HashMap`s that always have exactly 64 +/// elements), the [`with_max_capacity`] and [`with_min_capacity`] methods can +/// be called with the same value. This way, elements will always be initially +/// allocated with *exactly* that much capacity, and will only be shrunk if they +/// ever exceed that capacity. If the elements never grow beyond the specified +/// capacity, this should mean that no additional allocations will ever occur +/// once the initial pool of elements are allocated. +/// +/// [recycle]: Recycle::recycle +/// [`max_capacity`]: Self::max_capacity +/// [`min_capacity`]: Self::min_capacity +/// [`with_max_capacity`]: Self::with_max_capacity +/// [`with_min_capacity`]: Self::with_min_capacity +#[derive(Clone, Debug)] +pub struct WithCapacity { + min: usize, + max: usize, +} + +// TODO(eliza): consider making this public? +// TODO(eliza): consider making this a trait method with a default impl? +#[inline(always)] +pub(crate) fn take(element: &mut T, recycle: &R) -> T +where + R: Recycle, +{ + core::mem::replace(element, recycle.new_element()) +} + +impl DefaultRecycle { + pub const fn new() -> Self { + Self(()) + } +} + +impl Recycle for DefaultRecycle +where + T: Default + Clone, +{ + fn new_element(&self) -> T { + T::default() + } + + fn recycle(&self, element: &mut T) { + element.clone_from(&T::default()) + } +} + +// === impl WithCapacity === + +impl WithCapacity { + /// Returns a new [`WithCapacity`]. + /// + /// By default, the maximum capacity is unconstrained, and the minimum + /// capacity is 0. Existing allocations will always be reused, regardless + /// of size, and new elements will be created with 0 capacity. + /// + /// To add an upper bound on re-used capacity, use + /// [`WithCapacity::with_max_capacity`]. To allocate elements with an + /// initial capacity, use [`WithCapacity::with_min_capacity`]. + pub const fn new() -> Self { + Self { + max: core::usize::MAX, + min: 0, + } + } + + /// Sets an upper bound on the capacity that will be reused when [recycling] + /// elements. + /// + /// When an element is recycled, if its capacity exceeds the max value, it + /// will be shrunk down to that capacity. This will result in a + /// reallocation, but limits the total capacity allocated by the pool, + /// preventing unbounded memory use. + /// + /// Elements may still exceed the configured max capacity *while they are in + /// use*; this value only configures what happens when they are returned to + /// the pool. + /// + /// # Examples + /// + /// ``` + /// use thingbuf::recycling::{Recycle, WithCapacity}; + /// + /// // Create a recycler with max capacity of 8. + /// let recycle = WithCapacity::new().with_max_capacity(8); + /// + /// // Create a new string using that recycler. + /// let mut s: String = recycle.new_element(); + /// assert_eq!(s.capacity(), 0); + /// + /// // Now, write some data to the string. + /// s.push_str("hello, world"); + /// + /// // The string's capacity must be at least the length of the + /// // string 'hello, world'. + /// assert!(s.capacity() >= "hello, world".len()); + /// + /// // After recycling the string, its capacity will be shrunk down + /// // to the configured max capacity. + /// recycle.recycle(&mut s); + /// assert_eq!(s.capacity(), 8); + /// ``` + /// + /// [recycling]: Recycle::recycle + pub const fn with_max_capacity(self, max: usize) -> Self { + Self { max, ..self } + } + + /// Sets the minimum capacity when [allocating new elements][new]. + /// + /// When new elements are created, they will be allocated with at least + /// `min` capacity. + /// + /// Note that this is a *lower bound*. Elements may be allocated with + /// greater than the minimum capacity, depending on the behavior of the + /// element being allocated, but there will always be *at least* `min` + /// capacity. + /// + /// # Examples + /// + /// ``` + /// use thingbuf::recycling::{Recycle, WithCapacity}; + /// + /// // A recycler without a minimum capacity. + /// let no_min = WithCapacity::new(); + /// + /// // A new element created by this recycler will not + /// // allocate any capacity until it is used. + /// let s: String = no_min.new_element(); + /// assert_eq!(s.capacity(), 0); + /// + /// // Now, configure a minimum capacity. + /// let with_min = WithCapacity::new().with_min_capacity(8); + /// + /// // New elements created by this recycler will always be allocated + /// // with at least the specified capacity. + /// let s: String = with_min.new_element(); + /// assert!(s.capacity() >= 8); + /// ``` + /// + /// [new]: Recycle::new_element + pub const fn with_min_capacity(self, min: usize) -> Self { + Self { min, ..self } + } + + /// Returns the minimum initial capacity when [allocating new + /// elements][new]. + /// + /// This method can be used to implement `Recycle for WithCapacity` where + /// `T` is a type defined outside of this crate. See [the `WithCapacity` + /// documentation][impling] for details. + /// + /// # Examples + /// + /// ``` + /// use thingbuf::recycling::{Recycle, WithCapacity}; + /// + /// let recycle = WithCapacity::new(); + /// assert_eq!(recycle.min_capacity(), 0); + /// ``` + /// + /// ``` + /// use thingbuf::recycling::{Recycle, WithCapacity}; + /// + /// let recycle = WithCapacity::new().with_min_capacity(64); + /// assert_eq!(recycle.min_capacity(), 64); + /// ``` + /// + /// [new]: Recycle::new_element + /// [impling]: WithCapacity#implementations-for-other-types + pub fn min_capacity(&self) -> usize { + self.min + } + + /// Returns the maximum retained capacity when [recycling + /// elements][recycle]. + /// + /// If no upper bound is configured, this will return [`usize::MAX`]. + /// + /// This method can be used to implement `Recycle for WithCapacity` where + /// `T` is a type defined outside of this crate. See [the `WithCapacity` + /// documentation][impling] for details. + /// + /// # Examples + /// + /// ``` + /// use thingbuf::recycling::{Recycle, WithCapacity}; + /// + /// let recycle = WithCapacity::new(); + /// assert_eq!(recycle.max_capacity(), usize::MAX); + /// ``` + /// + /// ``` + /// use thingbuf::recycling::{Recycle, WithCapacity}; + /// + /// let recycle = WithCapacity::new().with_max_capacity(64); + /// assert_eq!(recycle.max_capacity(), 64); + /// ``` + /// + /// [recycle]: Recycle::recycle + /// [impling]: WithCapacity#implementations-for-other-types + pub fn max_capacity(&self) -> usize { + self.max + } +} + +impl Default for WithCapacity { + fn default() -> Self { + Self::new() + } +} + +feature! { + #![feature = "alloc"] + use alloc::{ + collections::{VecDeque, BinaryHeap}, + string::String, + sync::Arc, + vec::Vec, + }; + + impl Recycle for Arc + where + R: Recycle, + { + #[inline] + fn new_element(&self) -> T { + (**self).new_element() + } + + #[inline] + fn recycle(&self, element: &mut T) { + (**self).recycle(element) + } + } + + impl Recycle> for WithCapacity { + fn new_element(&self) -> Vec { + Vec::with_capacity(self.min) + } + + fn recycle(&self, element: &mut Vec) { + element.clear(); + element.shrink_to(self.max); + } + } + + impl Recycle for WithCapacity { + fn new_element(&self) -> String { + String::with_capacity(self.min) + } + + fn recycle(&self, element: &mut String) { + element.clear(); + element.shrink_to(self.max); + } + } + + impl Recycle> for WithCapacity { + fn new_element(&self) -> VecDeque { + VecDeque::with_capacity(self.min) + } + + fn recycle(&self, element: &mut VecDeque) { + element.clear(); + element.shrink_to(self.max); + } + } + + impl Recycle> for WithCapacity { + fn new_element(&self) -> BinaryHeap { + BinaryHeap::with_capacity(self.min) + } + + fn recycle(&self, element: &mut BinaryHeap) { + element.clear(); + element.shrink_to(self.max); + } + } +} + +feature! { + #![feature = "std"] + use std::{hash::{Hash, BuildHasher}, collections::{HashMap, HashSet}}; + + impl Recycle> for WithCapacity + where + K: Hash + Eq, + S: BuildHasher + Default + { + fn new_element(&self) -> HashMap { + HashMap::with_capacity_and_hasher(self.min, Default::default()) + } + + fn recycle(&self, element: &mut HashMap) { + element.clear(); + element.shrink_to(self.max); + } + } + + impl Recycle> for WithCapacity + where + K: Hash + Eq, + S: BuildHasher + Default + { + fn new_element(&self) -> HashSet { + HashSet::with_capacity_and_hasher(self.min, Default::default()) + } + + fn recycle(&self, element: &mut HashSet) { + element.clear(); + element.shrink_to(self.max); + } + } +} diff --git a/src/static_thingbuf.rs b/src/static_thingbuf.rs index ae0a6de..b424eb9 100644 --- a/src/static_thingbuf.rs +++ b/src/static_thingbuf.rs @@ -1,5 +1,8 @@ -use crate::{Core, Full, Ref, Slot}; -use core::{fmt, mem}; +use crate::{ + recycling::{self, Recycle}, + Core, Full, Ref, Slot, +}; +use core::fmt; /// A statically allocated, fixed-size lock-free multi-producer multi-consumer /// queue. @@ -188,8 +191,9 @@ use core::{fmt, mem}; /// [`ThingBuf`]: crate::ThingBuf /// [vyukov]: https://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue /// [object pool]: https://en.wikipedia.org/wiki/Object_pool_pattern -pub struct StaticThingBuf { +pub struct StaticThingBuf { core: Core, + recycle: R, slots: [Slot; CAP], } @@ -199,14 +203,21 @@ pub struct StaticThingBuf { impl StaticThingBuf { /// Returns a new `StaticThingBuf` with space for `capacity` elements. pub const fn new() -> Self { - Self { + Self::with_recycle(recycling::DefaultRecycle::new()) + } +} + +impl StaticThingBuf { + pub const fn with_recycle(recycle: R) -> Self { + StaticThingBuf { core: Core::new(CAP), + recycle, slots: Slot::make_static_array::(), } } } -impl StaticThingBuf { +impl StaticThingBuf { /// Returns the *total* capacity of this queue. This includes both /// occupied and unoccupied entries. /// @@ -302,7 +313,10 @@ impl StaticThingBuf { } } -impl StaticThingBuf { +impl StaticThingBuf +where + R: Recycle, +{ /// Reserves a slot to push an element into the queue, returning a [`Ref`] that /// can be used to write to that slot. /// @@ -343,7 +357,7 @@ impl StaticThingBuf { /// /// static MESSAGES: StaticThingBuf = StaticThingBuf::new(); /// - /// #[derive(Default)] + /// #[derive(Clone, Default)] /// struct Message { /// // ... /// } @@ -380,10 +394,12 @@ impl StaticThingBuf { /// [`pop_ref`]: Self::pop_ref /// [`push`]: Self::push_ref pub fn push_ref(&self) -> Result, Full> { - self.core.push_ref(&self.slots).map_err(|e| match e { - crate::mpsc::TrySendError::Full(()) => Full(()), - _ => unreachable!(), - }) + self.core + .push_ref(&self.slots, &self.recycle) + .map_err(|e| match e { + crate::mpsc::TrySendError::Full(()) => Full(()), + _ => unreachable!(), + }) } /// Attempt to enqueue an element by value. @@ -468,7 +484,7 @@ impl StaticThingBuf { #[inline] pub fn pop(&self) -> Option { let mut slot = self.pop_ref()?; - Some(mem::take(&mut *slot)) + Some(recycling::take(&mut *slot, &self.recycle)) } /// Dequeue the first element in the queue by reference, and invoke the @@ -485,17 +501,18 @@ impl StaticThingBuf { } } -impl fmt::Debug for StaticThingBuf { +impl fmt::Debug for StaticThingBuf { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("StaticThingBuf") .field("len", &self.len()) .field("slots", &format_args!("[...]")) .field("core", &self.core) + .field("recycle", &self.recycle) .finish() } } -impl Drop for StaticThingBuf { +impl Drop for StaticThingBuf { fn drop(&mut self) { self.core.drop_slots(&mut self.slots[..]); } diff --git a/src/stringbuf.rs b/src/stringbuf.rs deleted file mode 100644 index af7cc7f..0000000 --- a/src/stringbuf.rs +++ /dev/null @@ -1,92 +0,0 @@ -use super::*; - -use alloc::string::String; - -#[derive(Debug)] -pub struct StringBuf { - inner: ThingBuf, - max_idle_capacity: usize, -} - -impl StringBuf { - pub fn new(capacity: usize) -> Self { - Self { - inner: ThingBuf::new(capacity), - max_idle_capacity: usize::MAX, - } - } - - pub fn with_max_idle_capacity(self, max_idle_capacity: usize) -> Self { - Self { - max_idle_capacity, - inner: self.inner, - } - } - - #[inline] - pub fn capacity(&self) -> usize { - self.inner.capacity() - } - - #[inline] - pub fn write(&self) -> Result, Full> { - let mut string = self.inner.push_ref()?; - string.with_mut(String::clear); - Ok(string) - } - - pub fn pop_ref(&self) -> Option> { - let mut string = self.inner.pop_ref()?; - string.with_mut(|string| { - if string.capacity() > self.max_idle_capacity { - string.shrink_to_fit(); - } - }); - Some(string) - } -} - -#[derive(Debug)] -pub struct StaticStringBuf { - inner: StaticThingBuf, - max_idle_capacity: usize, -} - -impl StaticStringBuf { - #[cfg(not(test))] - pub const fn new() -> Self { - Self { - inner: StaticThingBuf::new(), - max_idle_capacity: usize::MAX, - } - } - - pub fn with_max_idle_capacity(self, max_idle_capacity: usize) -> Self { - Self { - max_idle_capacity, - inner: self.inner, - } - } - - #[inline] - pub fn capacity(&self) -> usize { - self.inner.capacity() - } - - #[inline] - pub fn write(&self) -> Result, Full> { - let mut string = self.inner.push_ref()?; - string.with_mut(String::clear); - Ok(string) - } - - pub fn pop_ref(&self) -> Option> { - let mut string = self.inner.pop_ref()?; - string.with_mut(|string| { - if string.capacity() > self.max_idle_capacity { - string.shrink_to_fit(); - } - }); - Some(string) - } -} diff --git a/src/thingbuf.rs b/src/thingbuf.rs index da02af2..e93b307 100644 --- a/src/thingbuf.rs +++ b/src/thingbuf.rs @@ -1,6 +1,9 @@ -use crate::{Core, Full, Ref, Slot}; +use crate::{ + recycling::{self, Recycle}, + Core, Full, Ref, Slot, +}; use alloc::boxed::Box; -use core::{fmt, mem}; +use core::fmt; #[cfg(all(loom, test))] mod tests; @@ -177,20 +180,128 @@ mod tests; /// [vyukov]: https://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue /// [object pool]: https://en.wikipedia.org/wiki/Object_pool_pattern #[cfg_attr(docsrs, doc(cfg(feature = "alloc")))] -pub struct ThingBuf { +pub struct ThingBuf { pub(crate) core: Core, pub(crate) slots: Box<[Slot]>, + recycle: R, } // === impl ThingBuf === -impl ThingBuf { +impl ThingBuf { /// Returns a new `ThingBuf` with space for `capacity` elements. pub fn new(capacity: usize) -> Self { + Self::with_recycle(capacity, recycling::DefaultRecycle::new()) + } +} + +impl ThingBuf { + /// Returns the *total* capacity of this queue. This includes both + /// occupied and unoccupied entries. + /// + /// To determine the queue's remaining *unoccupied* capacity, use + /// [`remaining`] instead. + /// + /// # Examples + /// + /// ``` + /// use thingbuf::ThingBuf; + /// + /// let q = ThingBuf::::new(100); + /// assert_eq!(q.capacity(), 100); + /// ``` + /// + /// Even after pushing several messages to the queue, the capacity remains + /// the same: + /// ``` + /// # use thingbuf::ThingBuf; + /// + /// let q = ThingBuf::::new(100); + /// + /// *q.push_ref().unwrap() = 1; + /// *q.push_ref().unwrap() = 2; + /// *q.push_ref().unwrap() = 3; + /// + /// assert_eq!(q.capacity(), 100); + /// ``` + /// + /// [`remaining`]: Self::remaining + #[inline] + pub fn capacity(&self) -> usize { + self.slots.len() + } + + /// Returns the unoccupied capacity of the queue (i.e., how many additional + /// elements can be enqueued before the queue will be full). + /// + /// This is equivalent to subtracting the queue's [`len`] from its [`capacity`]. + /// + /// [`len`]: Self::len + /// [`capacity`]: Self::capacity + pub fn remaining(&self) -> usize { + self.capacity() - self.len() + } + + /// Returns the number of elements in the queue + /// + /// To determine the queue's remaining *unoccupied* capacity, use + /// [`remaining`] instead. + /// + /// # Examples + /// + /// ``` + /// use thingbuf::ThingBuf; + /// + /// let q = ThingBuf::new(100); + /// assert_eq!(q.len(), 0); + /// + /// *q.push_ref().unwrap() = 1; + /// *q.push_ref().unwrap() = 2; + /// *q.push_ref().unwrap() = 3; + /// assert_eq!(q.len(), 3); + /// + /// let _ = q.pop_ref(); + /// assert_eq!(q.len(), 2); + /// ``` + /// + /// [`remaining`]: Self::remaining + #[inline] + pub fn len(&self) -> usize { + self.core.len() + } + + /// Returns `true` if there are currently no elements in this `ThingBuf`. + /// + /// # Examples + /// + /// ``` + /// use thingbuf::ThingBuf; + /// + /// let q = ThingBuf::new(100); + /// assert!(q.is_empty()); + /// + /// *q.push_ref().unwrap() = 1; + /// assert!(!q.is_empty()); + /// + /// let _ = q.pop_ref(); + /// assert!(q.is_empty()); + /// ``` + #[inline] + pub fn is_empty(&self) -> bool { + self.len() == 0 + } +} + +impl ThingBuf +where + R: Recycle, +{ + pub fn with_recycle(capacity: usize, recycle: R) -> Self { assert!(capacity > 0); Self { core: Core::new(capacity), slots: Slot::make_boxed_array(capacity), + recycle, } } @@ -232,7 +343,7 @@ impl ThingBuf { /// ```rust /// use thingbuf::ThingBuf; /// - /// #[derive(Default)] + /// #[derive(Clone, Default)] /// struct Message { /// // ... /// } @@ -269,10 +380,12 @@ impl ThingBuf { /// [`pop_ref`]: Self::pop_ref /// [`push`]: Self::push_ref pub fn push_ref(&self) -> Result, Full> { - self.core.push_ref(&*self.slots).map_err(|e| match e { - crate::mpsc::TrySendError::Full(()) => Full(()), - _ => unreachable!(), - }) + self.core + .push_ref(&*self.slots, &self.recycle) + .map_err(|e| match e { + crate::mpsc::TrySendError::Full(()) => Full(()), + _ => unreachable!(), + }) } /// Attempt to enqueue an element by value. @@ -357,7 +470,7 @@ impl ThingBuf { #[inline] pub fn pop(&self) -> Option { let mut slot = self.pop_ref()?; - Some(mem::take(&mut *slot)) + Some(recycling::take(&mut *slot, &self.recycle)) } /// Dequeue the first element in the queue by reference, and invoke the @@ -374,115 +487,19 @@ impl ThingBuf { } } -impl ThingBuf { - /// Returns the *total* capacity of this queue. This includes both - /// occupied and unoccupied entries. - /// - /// To determine the queue's remaining *unoccupied* capacity, use - /// [`remaining`] instead. - /// - /// # Examples - /// - /// ``` - /// use thingbuf::ThingBuf; - /// - /// let q = ThingBuf::::new(100); - /// assert_eq!(q.capacity(), 100); - /// ``` - /// - /// Even after pushing several messages to the queue, the capacity remains - /// the same: - /// ``` - /// # use thingbuf::ThingBuf; - /// - /// let q = ThingBuf::::new(100); - /// - /// *q.push_ref().unwrap() = 1; - /// *q.push_ref().unwrap() = 2; - /// *q.push_ref().unwrap() = 3; - /// - /// assert_eq!(q.capacity(), 100); - /// ``` - /// - /// [`remaining`]: Self::remaining - #[inline] - pub fn capacity(&self) -> usize { - self.slots.len() - } - - /// Returns the unoccupied capacity of the queue (i.e., how many additional - /// elements can be enqueued before the queue will be full). - /// - /// This is equivalent to subtracting the queue's [`len`] from its [`capacity`]. - /// - /// [`len`]: Self::len - /// [`capacity`]: Self::capacity - pub fn remaining(&self) -> usize { - self.capacity() - self.len() - } - - /// Returns the number of elements in the queue - /// - /// To determine the queue's remaining *unoccupied* capacity, use - /// [`remaining`] instead. - /// - /// # Examples - /// - /// ``` - /// use thingbuf::ThingBuf; - /// - /// let q = ThingBuf::new(100); - /// assert_eq!(q.len(), 0); - /// - /// *q.push_ref().unwrap() = 1; - /// *q.push_ref().unwrap() = 2; - /// *q.push_ref().unwrap() = 3; - /// assert_eq!(q.len(), 3); - /// - /// let _ = q.pop_ref(); - /// assert_eq!(q.len(), 2); - /// ``` - /// - /// [`remaining`]: Self::remaining - #[inline] - pub fn len(&self) -> usize { - self.core.len() - } - - /// Returns `true` if there are currently no elements in this `ThingBuf`. - /// - /// # Examples - /// - /// ``` - /// use thingbuf::ThingBuf; - /// - /// let q = ThingBuf::new(100); - /// assert!(q.is_empty()); - /// - /// *q.push_ref().unwrap() = 1; - /// assert!(!q.is_empty()); - /// - /// let _ = q.pop_ref(); - /// assert!(q.is_empty()); - /// ``` - #[inline] - pub fn is_empty(&self) -> bool { - self.len() == 0 - } -} - -impl Drop for ThingBuf { +impl Drop for ThingBuf { fn drop(&mut self) { self.core.drop_slots(&mut self.slots[..]); } } -impl fmt::Debug for ThingBuf { +impl fmt::Debug for ThingBuf { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("ThingBuf") .field("len", &self.len()) .field("slots", &format_args!("[...]")) .field("core", &self.core) + .field("recycle", &self.recycle) .finish() } } diff --git a/tests/static_storage.rs b/tests/static_storage.rs index d55c3af..209e294 100644 --- a/tests/static_storage.rs +++ b/tests/static_storage.rs @@ -3,7 +3,7 @@ use std::{ sync::atomic::{AtomicBool, Ordering}, thread, }; -use thingbuf::{StaticStringBuf, StaticThingBuf}; +use thingbuf::{recycling, StaticThingBuf}; #[test] fn static_storage_thingbuf() { @@ -50,13 +50,16 @@ fn static_storage_thingbuf() { #[test] fn static_storage_stringbuf() { - static BUF: StaticStringBuf<8> = StaticStringBuf::new(); + use recycling::WithCapacity; + + static BUF: StaticThingBuf = + StaticThingBuf::with_recycle(WithCapacity::new().with_max_capacity(8)); static PRODUCER_LIVE: AtomicBool = AtomicBool::new(true); let producer = thread::spawn(move || { for i in 0..16 { let mut string = 'write: loop { - match BUF.write() { + match BUF.push_ref() { Ok(string) => break 'write string, _ => thread::yield_now(), }