diff --git a/src/lib.rs b/src/lib.rs index b2da4ea..e5dd1bf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,36 +2,36 @@ #![cfg_attr(not(feature = "std"), no_std)] #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] use core::{cmp, fmt, mem::MaybeUninit, ops, ptr}; - #[macro_use] mod macros; mod loom; -mod recycle; +pub mod mpsc; +pub mod recycling; mod util; mod wait; +pub use self::recycling::Recycle; + #[cfg_attr(docsrs, doc = include_str!("../mpsc_perf_comparison.md"))] pub mod mpsc_perf_comparison { // Empty module, used only for documentation. } +feature! { + #![not(all(loom, test))] + mod static_thingbuf; + pub use self::static_thingbuf::StaticThingBuf; +} + feature! { #![feature = "alloc"] extern crate alloc; mod thingbuf; pub use self::thingbuf::ThingBuf; - - mod stringbuf; - pub use stringbuf::{StaticStringBuf, StringBuf}; } -pub mod mpsc; - -mod static_thingbuf; -pub use self::static_thingbuf::StaticThingBuf; - use crate::{ loom::{ atomic::{AtomicUsize, Ordering::*}, @@ -97,7 +97,6 @@ impl Core { closed, idx_mask, capacity, - has_dropped_slots: false, } } @@ -116,7 +115,6 @@ impl Core { gen_mask, idx_mask, capacity, - #[cfg(debug_assertions)] has_dropped_slots: false, } @@ -155,12 +153,13 @@ impl Core { } #[inline(always)] - fn push_ref<'slots, T, S>( + fn push_ref<'slots, T, S, R>( &self, slots: &'slots S, + recycle: &R, ) -> Result, mpsc::TrySendError<()>> where - T: Default, + R: Recycle, S: ops::Index> + ?Sized, { test_println!("push_ref"); @@ -190,13 +189,20 @@ impl Core { // Claim exclusive ownership over the slot let ptr = slot.value.get_mut(); - if gen == 0 { - unsafe { - // Safety: we have just claimed exclusive ownership over - // this slot. - ptr.deref().write(T::default()); - }; - test_println!("-> initialized"); + // Initialize or recycle the element. + unsafe { + // Safety: we have just claimed exclusive ownership over + // this slot. + let ptr = ptr.deref(); + if gen == 0 { + ptr.write(recycle.new_element()); + test_println!("-> initialized"); + } else { + // Safety: if the generation is > 0, then the + // slot has already been initialized. + recycle.recycle(ptr.assume_init_mut()); + test_println!("-> recycled"); + } } return Ok(Ref { diff --git a/src/loom.rs b/src/loom.rs index 34b1006..7b67828 100644 --- a/src/loom.rs +++ b/src/loom.rs @@ -187,6 +187,16 @@ mod inner { Self::new(T::default()) } } + + impl Clone for Track { + fn clone(&self) -> Self { + Self::new(self.get_ref().clone()) + } + + fn clone_from(&mut self, source: &Self) { + self.get_mut().clone_from(source.get_ref()); + } + } } } diff --git a/src/mpsc.rs b/src/mpsc.rs index c2067a3..8090e3d 100644 --- a/src/mpsc.rs +++ b/src/mpsc.rs @@ -12,6 +12,7 @@ use crate::{ loom::{atomic::AtomicUsize, hint}, + recycling::Recycle, wait::{Notify, WaitCell, WaitQueue, WaitResult}, Core, Ref, Slot, }; @@ -109,24 +110,25 @@ impl ChannelCore where N: Notify + Unpin, { - fn try_send_ref<'a, T>( + fn try_send_ref<'a, T, R>( &'a self, slots: &'a [Slot], + recycle: &R, ) -> Result, TrySendError> where - T: Default, + R: Recycle, { - self.core.push_ref(slots).map(|slot| SendRefInner { + self.core.push_ref(slots, recycle).map(|slot| SendRefInner { _notify: NotifyRx(&self.rx_wait), slot, }) } - fn try_send(&self, slots: &[Slot], val: T) -> Result<(), TrySendError> + fn try_send(&self, slots: &[Slot], val: T, recycle: &R) -> Result<(), TrySendError> where - T: Default, + R: Recycle, { - match self.try_send_ref(slots) { + match self.try_send_ref(slots, recycle) { Ok(mut slot) => { slot.with_mut(|slot| *slot = val); Ok(()) @@ -147,7 +149,6 @@ where ) -> Poll>> where S: Index> + ?Sized, - T: Default, { macro_rules! try_poll_recv { () => { diff --git a/src/mpsc/async_impl.rs b/src/mpsc/async_impl.rs index f5ca764..dd337d8 100644 --- a/src/mpsc/async_impl.rs +++ b/src/mpsc/async_impl.rs @@ -1,6 +1,7 @@ use super::*; use crate::{ loom::atomic::{self, AtomicBool, Ordering}, + recycling::{self, Recycle}, wait::queue, Ref, }; @@ -16,12 +17,21 @@ feature! { use crate::loom::sync::Arc; - /// Returns a new synchronous multi-producer, single consumer channel. - pub fn channel(capacity: usize) -> (Sender, Receiver) { + /// Returns a new asynchronous multi-producer, single consumer channel. + pub fn channel(capacity: usize) -> (Sender, Receiver) { + with_recycle(capacity, recycling::DefaultRecycle::new()) + } + + /// Returns a new asynchronous multi-producer, single consumer channel with + /// the provided [recycling policy]. + /// + /// [recycling policy]: crate::recycling::Recycle + pub fn with_recycle>(capacity: usize, recycle: R) -> (Sender, Receiver) { assert!(capacity > 0); let inner = Arc::new(Inner { core: ChannelCore::new(capacity), slots: Slot::make_boxed_array(capacity), + recycle, }); let tx = Sender { inner: inner.clone(), @@ -32,18 +42,19 @@ feature! { #[derive(Debug)] - pub struct Receiver { - inner: Arc>, + pub struct Receiver { + inner: Arc>, } #[derive(Debug)] - pub struct Sender { - inner: Arc>, + pub struct Sender { + inner: Arc>, } - struct Inner { + struct Inner { core: super::ChannelCore, slots: Box<[Slot]>, + recycle: R, } } @@ -79,19 +90,22 @@ feature! { /// ``` /// [`split`]: StaticChannel::split #[cfg_attr(all(loom, test), allow(dead_code))] -pub struct StaticChannel { +pub struct StaticChannel { core: ChannelCore, + recycle: R, slots: [Slot; CAPACITY], is_split: AtomicBool, } -pub struct StaticSender { +pub struct StaticSender { core: &'static ChannelCore, + recycle: &'static R, slots: &'static [Slot], } -pub struct StaticReceiver { +pub struct StaticReceiver { core: &'static ChannelCore, + recycle: &'static R, slots: &'static [Slot], } @@ -122,15 +136,17 @@ pub struct RecvRefFuture<'a, T> { /// /// [`ThingBuf`]: crate::ThingBuf #[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct RecvFuture<'a, T> { +pub struct RecvFuture<'a, T, R = recycling::DefaultRecycle> { core: &'a ChannelCore, slots: &'a [Slot], + recycle: &'a R, } #[pin_project::pin_project(PinnedDrop)] -struct SendRefFuture<'sender, T> { +struct SendRefFuture<'sender, T, R> { core: &'sender ChannelCore, slots: &'sender [Slot], + recycle: &'sender R, state: State, #[pin] waiter: queue::Waiter, @@ -183,9 +199,12 @@ impl StaticChannel { core: ChannelCore::new(CAPACITY), slots: Slot::make_static_array::(), is_split: AtomicBool::new(false), + recycle: recycling::DefaultRecycle::new(), } } +} +impl StaticChannel { /// Split a [`StaticChannel`] into a [`StaticSender`]/[`StaticReceiver`] /// pair. /// @@ -197,7 +216,7 @@ impl StaticChannel { /// # Panics /// /// If the channel has already been split. - pub fn split(&'static self) -> (StaticSender, StaticReceiver) { + pub fn split(&'static self) -> (StaticSender, StaticReceiver) { self.try_split().expect("channel already split") } @@ -207,16 +226,18 @@ impl StaticChannel { /// A static channel can only be split a single time. If /// [`StaticChannel::split`] or [`StaticChannel::try_split`] have been /// called previously, this method returns `None`. - pub fn try_split(&'static self) -> Option<(StaticSender, StaticReceiver)> { + pub fn try_split(&'static self) -> Option<(StaticSender, StaticReceiver)> { self.is_split .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) .ok()?; let tx = StaticSender { core: &self.core, + recycle: &self.recycle, slots: &self.slots[..], }; let rx = StaticReceiver { core: &self.core, + recycle: &self.recycle, slots: &self.slots[..], }; Some((tx, rx)) @@ -226,22 +247,28 @@ impl StaticChannel { // === impl Sender === #[cfg(feature = "alloc")] -impl Sender { +impl Sender +where + R: Recycle, +{ pub fn try_send_ref(&self) -> Result, TrySendError> { self.inner .core - .try_send_ref(self.inner.slots.as_ref()) + .try_send_ref(self.inner.slots.as_ref(), &self.inner.recycle) .map(SendRef) } pub fn try_send(&self, val: T) -> Result<(), TrySendError> { - self.inner.core.try_send(self.inner.slots.as_ref(), val) + self.inner + .core + .try_send(self.inner.slots.as_ref(), val, &self.inner.recycle) } pub async fn send_ref(&self) -> Result, Closed> { SendRefFuture { core: &self.inner.core, slots: self.inner.slots.as_ref(), + recycle: &self.inner.recycle, state: State::Start, waiter: queue::Waiter::new(), } @@ -260,7 +287,7 @@ impl Sender { } #[cfg(feature = "alloc")] -impl Clone for Sender { +impl Clone for Sender { fn clone(&self) -> Self { test_dbg!(self.inner.core.tx_count.fetch_add(1, Ordering::Relaxed)); Self { @@ -270,7 +297,7 @@ impl Clone for Sender { } #[cfg(feature = "alloc")] -impl Drop for Sender { +impl Drop for Sender { fn drop(&mut self) { if test_dbg!(self.inner.core.tx_count.fetch_sub(1, Ordering::Release)) > 1 { return; @@ -286,7 +313,7 @@ impl Drop for Sender { // === impl Receiver === #[cfg(feature = "alloc")] -impl Receiver { +impl Receiver { pub fn recv_ref(&self) -> RecvRefFuture<'_, T> { RecvRefFuture { core: &self.inner.core, @@ -294,10 +321,14 @@ impl Receiver { } } - pub fn recv(&self) -> RecvFuture<'_, T> { + pub fn recv(&self) -> RecvFuture<'_, T, R> + where + R: Recycle, + { RecvFuture { core: &self.inner.core, slots: self.inner.slots.as_ref(), + recycle: &self.inner.recycle, } } @@ -331,9 +362,12 @@ impl Receiver { /// sender, or when the channel is closed. Note that on multiple calls to /// `poll_recv`, only the [`Waker`] from the [`Context`] passed to the most /// recent call is scheduled to receive a wakeup. - pub fn poll_recv(&self, cx: &mut Context<'_>) -> Poll> { + pub fn poll_recv(&self, cx: &mut Context<'_>) -> Poll> + where + R: Recycle, + { self.poll_recv_ref(cx) - .map(|opt| opt.map(|mut r| r.with_mut(core::mem::take))) + .map(|opt| opt.map(|mut r| recycling::take(&mut *r, &self.inner.recycle))) } pub fn is_closed(&self) -> bool { @@ -342,7 +376,7 @@ impl Receiver { } #[cfg(feature = "alloc")] -impl Drop for Receiver { +impl Drop for Receiver { fn drop(&mut self) { self.inner.core.close_rx(); } @@ -350,19 +384,25 @@ impl Drop for Receiver { // === impl StaticSender === -impl StaticSender { +impl StaticSender +where + R: Recycle, +{ pub fn try_send_ref(&self) -> Result, TrySendError> { - self.core.try_send_ref(self.slots).map(SendRef) + self.core + .try_send_ref(self.slots, self.recycle) + .map(SendRef) } pub fn try_send(&self, val: T) -> Result<(), TrySendError> { - self.core.try_send(self.slots, val) + self.core.try_send(self.slots, val, self.recycle) } pub async fn send_ref(&self) -> Result, Closed> { SendRefFuture { core: self.core, slots: self.slots, + recycle: self.recycle, state: State::Start, waiter: queue::Waiter::new(), } @@ -386,11 +426,12 @@ impl Clone for StaticSender { Self { core: self.core, slots: self.slots, + recycle: self.recycle, } } } -impl Drop for StaticSender { +impl Drop for StaticSender { fn drop(&mut self) { if test_dbg!(self.core.tx_count.fetch_sub(1, Ordering::Release)) > 1 { return; @@ -403,18 +444,19 @@ impl Drop for StaticSender { } } -impl fmt::Debug for StaticSender { +impl fmt::Debug for StaticSender { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("StaticSender") .field("core", &self.core) .field("slots", &format_args!("&[..]")) + .field("recycle", self.recycle) .finish() } } // === impl StaticReceiver === -impl StaticReceiver { +impl StaticReceiver { pub fn recv_ref(&self) -> RecvRefFuture<'_, T> { RecvRefFuture { core: self.core, @@ -422,10 +464,14 @@ impl StaticReceiver { } } - pub fn recv(&self) -> RecvFuture<'_, T> { + pub fn recv(&self) -> RecvFuture<'_, T, R> + where + R: Recycle, + { RecvFuture { core: self.core, slots: self.slots, + recycle: self.recycle, } } @@ -459,9 +505,12 @@ impl StaticReceiver { /// sender, or when the channel is closed. Note that on multiple calls to /// `poll_recv`, only the [`Waker`] from the [`Context`] passed to the most /// recent call is scheduled to receive a wakeup. - pub fn poll_recv(&self, cx: &mut Context<'_>) -> Poll> { + pub fn poll_recv(&self, cx: &mut Context<'_>) -> Poll> + where + R: Recycle, + { self.poll_recv_ref(cx) - .map(|opt| opt.map(|mut r| r.with_mut(core::mem::take))) + .map(|opt| opt.map(|mut r| recycling::take(&mut *r, self.recycle))) } pub fn is_closed(&self) -> bool { @@ -469,17 +518,18 @@ impl StaticReceiver { } } -impl Drop for StaticReceiver { +impl Drop for StaticReceiver { fn drop(&mut self) { self.core.close_rx(); } } -impl fmt::Debug for StaticReceiver { +impl fmt::Debug for StaticReceiver { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("StaticReceiver") .field("core", &self.core) .field("slots", &format_args!("&[..]")) + .field("recycle", &self.recycle) .finish() } } @@ -487,7 +537,7 @@ impl fmt::Debug for StaticReceiver { // === impl RecvRefFuture === #[inline] -fn poll_recv_ref<'a, T: Default>( +fn poll_recv_ref<'a, T>( core: &'a ChannelCore, slots: &'a [Slot], cx: &mut Context<'_>, @@ -501,7 +551,7 @@ fn poll_recv_ref<'a, T: Default>( }) } -impl<'a, T: Default> Future for RecvRefFuture<'a, T> { +impl<'a, T> Future for RecvRefFuture<'a, T> { type Output = Option>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { @@ -511,20 +561,24 @@ impl<'a, T: Default> Future for RecvRefFuture<'a, T> { // === impl Recv === -impl<'a, T: Default> Future for RecvFuture<'a, T> { +impl<'a, T, R> Future for RecvFuture<'a, T, R> +where + R: Recycle, +{ type Output = Option; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { poll_recv_ref(self.core, self.slots, cx) - .map(|opt| opt.map(|mut r| r.with_mut(core::mem::take))) + .map(|opt| opt.map(|mut r| recycling::take(&mut *r, self.recycle))) } } // === impl SendRefFuture === -impl<'sender, T: Default + 'sender> Future for SendRefFuture<'sender, T> +impl<'sender, T, R> Future for SendRefFuture<'sender, T, R> where - T: Default + 'sender, + R: Recycle + 'sender, + T: 'sender, { type Output = Result, Closed>; @@ -536,7 +590,7 @@ where let node = this.waiter; match test_dbg!(*this.state) { State::Start => { - match this.core.try_send_ref(this.slots) { + match this.core.try_send_ref(this.slots, *this.recycle) { Ok(slot) => return Poll::Ready(Ok(SendRef(slot))), Err(TrySendError::Closed(_)) => return Poll::Ready(Err(Closed(()))), Err(_) => {} @@ -573,7 +627,7 @@ where } } } - State::Done => match this.core.try_send_ref(this.slots) { + State::Done => match this.core.try_send_ref(this.slots, *this.recycle) { Ok(slot) => return Poll::Ready(Ok(SendRef(slot))), Err(TrySendError::Closed(_)) => return Poll::Ready(Err(Closed(()))), Err(_) => { @@ -586,7 +640,7 @@ where } #[pin_project::pinned_drop] -impl PinnedDrop for SendRefFuture<'_, T> { +impl PinnedDrop for SendRefFuture<'_, T, R> { fn drop(self: Pin<&mut Self>) { test_println!("SendRefFuture::drop({:p})", self); let this = self.project(); @@ -598,16 +652,17 @@ impl PinnedDrop for SendRefFuture<'_, T> { feature! { #![feature = "alloc"] - impl fmt::Debug for Inner { + impl fmt::Debug for Inner { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Inner") .field("core", &self.core) .field("slots", &format_args!("Box<[..]>")) + .field("recycle", &self.recycle) .finish() } } - impl Drop for Inner { + impl Drop for Inner { fn drop(&mut self) { self.core.core.drop_slots(&mut self.slots[..]) } diff --git a/src/mpsc/sync.rs b/src/mpsc/sync.rs index 85058ff..1ceafb7 100644 --- a/src/mpsc/sync.rs +++ b/src/mpsc/sync.rs @@ -10,17 +10,31 @@ use crate::{ sync::Arc, thread::{self, Thread}, }, + recycling::{self, Recycle}, util::Backoff, wait::queue, Ref, }; use core::{fmt, pin::Pin}; -/// Returns a new asynchronous multi-producer, single consumer channel. -pub fn channel(capacity: usize) -> (Sender, Receiver) { +/// Returns a new synchronous multi-producer, single consumer channel. +pub fn channel(capacity: usize) -> (Sender, Receiver) { + with_recycle(capacity, recycling::DefaultRecycle::new()) +} + +/// Returns a new synchronous multi-producer, single consumer channel with +/// the provided [recycling policy]. +/// +/// [recycling policy]: crate::recycling::Recycle +pub fn with_recycle>( + capacity: usize, + recycle: R, +) -> (Sender, Receiver) { + assert!(capacity > 0); let inner = Arc::new(Inner { core: ChannelCore::new(capacity), slots: Slot::make_boxed_array(capacity), + recycle, }); let tx = Sender { inner: inner.clone(), @@ -30,13 +44,13 @@ pub fn channel(capacity: usize) -> (Sender, Receiver) { } #[derive(Debug)] -pub struct Sender { - inner: Arc>, +pub struct Sender { + inner: Arc>, } #[derive(Debug)] -pub struct Receiver { - inner: Arc>, +pub struct Receiver { + inner: Arc>, } /// A statically-allocated, blocking bounded MPSC channel. @@ -78,25 +92,29 @@ pub struct Receiver { /// [async]: crate::mpsc::StaticChannel /// [`split`]: StaticChannel::split #[cfg_attr(all(loom, test), allow(dead_code))] -pub struct StaticChannel { +pub struct StaticChannel { core: ChannelCore, slots: [Slot; CAPACITY], is_split: AtomicBool, + recycle: R, } -pub struct StaticSender { +pub struct StaticSender { core: &'static ChannelCore, slots: &'static [Slot], + recycle: &'static R, } -pub struct StaticReceiver { +pub struct StaticReceiver { core: &'static ChannelCore, slots: &'static [Slot], + recycle: &'static R, } -struct Inner { +struct Inner { core: super::ChannelCore, slots: Box<[Slot]>, + recycle: R, } impl_send_ref! { @@ -154,9 +172,12 @@ impl StaticChannel { core: ChannelCore::new(CAPACITY), slots: Slot::make_static_array::(), is_split: AtomicBool::new(false), + recycle: recycling::DefaultRecycle::new(), } } +} +impl StaticChannel { /// Split a [`StaticChannel`] into a [`StaticSender`]/[`StaticReceiver`] /// pair. /// @@ -168,7 +189,7 @@ impl StaticChannel { /// # Panics /// /// If the channel has already been split. - pub fn split(&'static self) -> (StaticSender, StaticReceiver) { + pub fn split(&'static self) -> (StaticSender, StaticReceiver) { self.try_split().expect("channel already split") } @@ -178,17 +199,19 @@ impl StaticChannel { /// A static channel can only be split a single time. If /// [`StaticChannel::split`] or [`StaticChannel::try_split`] have been /// called previously, this method returns `None`. - pub fn try_split(&'static self) -> Option<(StaticSender, StaticReceiver)> { + pub fn try_split(&'static self) -> Option<(StaticSender, StaticReceiver)> { self.is_split .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) .ok()?; let tx = StaticSender { core: &self.core, slots: &self.slots[..], + recycle: &self.recycle, }; let rx = StaticReceiver { core: &self.core, slots: &self.slots[..], + recycle: &self.recycle, }; Some((tx, rx)) } @@ -196,20 +219,29 @@ impl StaticChannel { // === impl Sender === -impl Sender { +impl Sender +where + R: Recycle, +{ pub fn try_send_ref(&self) -> Result, TrySendError> { self.inner .core - .try_send_ref(self.inner.slots.as_ref()) + .try_send_ref(self.inner.slots.as_ref(), &self.inner.recycle) .map(SendRef) } pub fn try_send(&self, val: T) -> Result<(), TrySendError> { - self.inner.core.try_send(self.inner.slots.as_ref(), val) + self.inner + .core + .try_send(self.inner.slots.as_ref(), val, &self.inner.recycle) } pub fn send_ref(&self) -> Result, Closed> { - send_ref(&self.inner.core, self.inner.slots.as_ref()) + send_ref( + &self.inner.core, + self.inner.slots.as_ref(), + &self.inner.recycle, + ) } pub fn send(&self, val: T) -> Result<(), Closed> { @@ -223,7 +255,7 @@ impl Sender { } } -impl Clone for Sender { +impl Clone for Sender { fn clone(&self) -> Self { test_dbg!(self.inner.core.tx_count.fetch_add(1, Ordering::Relaxed)); Self { @@ -232,7 +264,7 @@ impl Clone for Sender { } } -impl Drop for Sender { +impl Drop for Sender { fn drop(&mut self) { if test_dbg!(self.inner.core.tx_count.fetch_sub(1, Ordering::Release)) > 1 { return; @@ -248,14 +280,17 @@ impl Drop for Sender { // === impl Receiver === -impl Receiver { +impl Receiver { pub fn recv_ref(&self) -> Option> { recv_ref(&self.inner.core, self.inner.slots.as_ref()) } - pub fn recv(&self) -> Option { - let val = self.recv_ref()?.with_mut(core::mem::take); - Some(val) + pub fn recv(&self) -> Option + where + R: Recycle, + { + let mut val = self.recv_ref()?; + Some(recycling::take(&mut *val, &self.inner.recycle)) } pub fn is_closed(&self) -> bool { @@ -263,7 +298,7 @@ impl Receiver { } } -impl<'a, T: Default> Iterator for &'a Receiver { +impl<'a, T, R> Iterator for &'a Receiver { type Item = RecvRef<'a, T>; fn next(&mut self) -> Option { @@ -271,7 +306,7 @@ impl<'a, T: Default> Iterator for &'a Receiver { } } -impl Drop for Receiver { +impl Drop for Receiver { fn drop(&mut self) { self.inner.core.close_rx(); } @@ -279,17 +314,22 @@ impl Drop for Receiver { // === impl StaticSender === -impl StaticSender { +impl StaticSender +where + R: Recycle, +{ pub fn try_send_ref(&self) -> Result, TrySendError> { - self.core.try_send_ref(self.slots).map(SendRef) + self.core + .try_send_ref(self.slots, self.recycle) + .map(SendRef) } pub fn try_send(&self, val: T) -> Result<(), TrySendError> { - self.core.try_send(self.slots, val) + self.core.try_send(self.slots, val, self.recycle) } pub fn send_ref(&self) -> Result, Closed> { - send_ref(self.core, self.slots) + send_ref(self.core, self.slots, self.recycle) } pub fn send(&self, val: T) -> Result<(), Closed> { @@ -303,17 +343,18 @@ impl StaticSender { } } -impl Clone for StaticSender { +impl Clone for StaticSender { fn clone(&self) -> Self { test_dbg!(self.core.tx_count.fetch_add(1, Ordering::Relaxed)); Self { core: self.core, slots: self.slots, + recycle: self.recycle, } } } -impl Drop for StaticSender { +impl Drop for StaticSender { fn drop(&mut self) { if test_dbg!(self.core.tx_count.fetch_sub(1, Ordering::Release)) > 1 { return; @@ -327,25 +368,29 @@ impl Drop for StaticSender { } } -impl fmt::Debug for StaticReceiver { +impl fmt::Debug for StaticSender { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("StaticReceiver") + f.debug_struct("StaticSender") .field("core", &self.core) .field("slots", &format_args!("&[..]")) + .field("recycle", &self.recycle) .finish() } } -// === impl Receiver === +// === impl StaticReceiver === -impl StaticReceiver { +impl StaticReceiver { pub fn recv_ref(&self) -> Option> { recv_ref(self.core, self.slots) } - pub fn recv(&self) -> Option { - let val = self.recv_ref()?.with_mut(core::mem::take); - Some(val) + pub fn recv(&self) -> Option + where + R: Recycle, + { + let mut val = self.recv_ref()?; + Some(recycling::take(&mut *val, self.recycle)) } pub fn is_closed(&self) -> bool { @@ -353,7 +398,7 @@ impl StaticReceiver { } } -impl<'a, T: Default> Iterator for &'a StaticReceiver { +impl<'a, T, R> Iterator for &'a StaticReceiver { type Item = RecvRef<'a, T>; fn next(&mut self) -> Option { @@ -361,43 +406,42 @@ impl<'a, T: Default> Iterator for &'a StaticReceiver { } } -impl Drop for StaticReceiver { +impl Drop for StaticReceiver { fn drop(&mut self) { self.core.close_rx(); } } -impl fmt::Debug for StaticSender { +impl fmt::Debug for StaticReceiver { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("StaticSender") + f.debug_struct("StaticReceiver") .field("core", &self.core) .field("slots", &format_args!("&[..]")) + .field("recycle", &self.recycle) .finish() } } // === impl Inner === -impl fmt::Debug for Inner { +impl fmt::Debug for Inner { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Inner") .field("core", &self.core) .field("slots", &format_args!("Box<[..]>")) + .field("recycle", &self.recycle) .finish() } } -impl Drop for Inner { +impl Drop for Inner { fn drop(&mut self) { self.core.core.drop_slots(&mut self.slots[..]) } } #[inline] -fn recv_ref<'a, T: Default>( - core: &'a ChannelCore, - slots: &'a [Slot], -) -> Option> { +fn recv_ref<'a, T>(core: &'a ChannelCore, slots: &'a [Slot]) -> Option> { loop { match core.poll_recv_ref(slots, thread::current) { Poll::Ready(r) => { @@ -415,13 +459,14 @@ fn recv_ref<'a, T: Default>( } #[inline] -fn send_ref<'a, T: Default>( +fn send_ref<'a, T, R: Recycle>( core: &'a ChannelCore, slots: &'a [Slot], + recycle: &'a R, ) -> Result, Closed<()>> { // fast path: avoid getting the thread and constructing the node if the // slot is immediately ready. - match core.try_send_ref(slots) { + match core.try_send_ref(slots, recycle) { Ok(slot) => return Ok(SendRef(slot)), Err(TrySendError::Closed(_)) => return Err(Closed(())), _ => {} @@ -449,7 +494,7 @@ fn send_ref<'a, T: Default>( WaitResult::Closed => return Err(Closed(())), WaitResult::Notified => { boff.spin_yield(); - match core.try_send_ref(slots.as_ref()) { + match core.try_send_ref(slots.as_ref(), recycle) { Ok(slot) => return Ok(SendRef(slot)), Err(TrySendError::Closed(_)) => return Err(Closed(())), _ => {} diff --git a/src/recycle.rs b/src/recycle.rs deleted file mode 100644 index abedc75..0000000 --- a/src/recycle.rs +++ /dev/null @@ -1,25 +0,0 @@ -pub trait Recycle { - /// Returns a new instance of type `T`. - fn new_element(&self) -> T; - - /// Resets `element` in place. - /// - /// Typically, this retains any previous allocations. - fn recycle(&self, element: &mut T); -} - -#[derive(Debug, Default)] -pub struct DefaultRecycle(()); - -impl Recycle for DefaultRecycle -where - T: Default + Clone, -{ - fn new_element(&self) -> T { - T::default() - } - - fn recycle(&self, element: &mut T) { - element.clone_from(&T::default()) - } -} diff --git a/src/recycling.rs b/src/recycling.rs new file mode 100644 index 0000000..a38ec80 --- /dev/null +++ b/src/recycling.rs @@ -0,0 +1,463 @@ +//! Configurable policies for element reuse. + +pub trait Recycle { + /// Returns a new instance of type `T`. + fn new_element(&self) -> T; + + /// Resets `element` in place. + /// + /// Typically, this retains any previous allocations. + fn recycle(&self, element: &mut T); +} + +/// A [`Recycle`] implementation for any type implementing [`Default`] and +/// [`Clone`]. +/// +/// This [creates new elements] by calling using [`Default::default()`]. +/// Existing elements are [recycled] by calling [`Clone::clone_from`] with the +/// default value. +/// +/// # Allocation Reuse +/// +/// [`Clone::clone_from`] is not *guaranteed* to reuse existing +/// allocations in place. For a number of common types in the standard library, +/// such as [`Box`], [`String`], [`Vec`], and collections based on [`Vec`] (such +/// as [`VecDeque`] and [`BinaryHeap`]), `clone_from` is overridden to reuse +/// existing allocations in place. However, other types may not override +/// `clone_from` in this way. +/// +/// `DefaultRecycle` will always *work* for types that implement [`Default`] and +/// [`Clone`], but it cannot be guaranteed to always reuse allocations. For a +/// more restrictive [`Recycle`] implementation that _will_ always reuse +/// existing allocations, consider [`WithCapacity`]. +/// +/// [creates new elements]: DefaultRecycle::new_element +/// [recycled]: DefaultRecycle::recycle +#[derive(Clone, Debug, Default)] +pub struct DefaultRecycle(()); + +/// A [`Recycle`] implementation for types that provide `with_capacity`, +/// `clear`, and `shrink_to` methods. +/// +/// This includes all array-based collections in the Rust standard library, such +/// as [`Vec`], [`String`], [`VecDeque`], and [`BinaryHeap`], as well as +/// [`HashMap`] and [`HashSet`]. +/// +/// # Usage +/// +/// By default, this type will always [recycle] elements by clearing all values +/// in place, returning all allocated capacity. [New elements] are allocated +/// with capacity for 0 values; they will allocate when first used. +/// +/// # Implementations for Other Types +/// +/// [`Recycle`] implementations may be added for similar data structures +/// implemented in other libraries. The [`min_capacity`] and +/// [`max_capacity`] methods expose the configured initial capacity and upper +/// bound. +/// +/// As an example, a library that implements an array-based data structure with +/// `with_capacity`, `clear`, and `shrink_to` methods can implement [`Recycle`] +/// for `WithCapacity` like so: +/// +/// ``` +/// use thingbuf::recycling::{self, Recycle}; +/// # use std::marker::PhantomData; +/// +/// /// Some kind of exciting new heap-allocated collection. +/// pub struct MyCollection { +/// // ... +/// # _p: PhantomData, +/// } +/// +/// impl MyCollection { +/// /// Returns a new `MyCollection` with enough capacity to hold +/// /// `capacity` elements without reallocationg. +/// pub fn with_capacity(capacity: usize) -> Self { +/// // ... +/// # unimplemented!() +/// } +/// +/// /// Returns the current allocated capacity of this `MyCollection`. +/// pub fn capacity(&self) -> usize { +/// // ... +/// # unimplemented!() +/// } +/// +/// /// Shrinks the capacity of the `MyCollection` with a lower bound. +/// /// +/// /// The capacity will remain at least as large as both the length +/// /// and the supplied value. +/// /// +/// /// If the current capacity is less than the lower limit, this is a no-op. +/// pub fn shrink_to(&mut self, min_capacity: usize) { +/// if self.capacity() > min_capacity { +/// // ... +/// # unimplemented!() +/// } +/// } +/// +/// /// Clears the `MyCollection`, removing all values. +/// /// +/// /// This does not change the current allocated capacity. The +/// /// `MyCollection` will still have enough allocated storage to hold +/// /// at least the current number of values. +/// pub fn clear(&mut self) { +/// // ... +/// # unimplemented!() +/// } +/// +/// // Other cool and exciting methods go here! +/// } +/// +/// // Because `MyCollection` has `with_capacity`, `shrink_to`, and `clear` methods, +/// // we can implement `Recycle>` for `WithCapacity` exactly the same +/// // way as it is implemented for standard library collections. +/// impl Recycle> for recycling::WithCapacity { +/// fn new_element(&self) -> MyCollection { +/// // Allocate a new element with the minimum initial capacity: +/// MyCollection::with_capacity(self.min_capacity()) +/// } +/// +/// fn recycle(&self, element: &mut MyCollection) { +/// // Recycle the element by clearing it in place, and then limiting the +/// // allocated capacity to the upper bound, if one is set: +/// element.clear(); +/// element.shrink_to(self.max_capacity()); +/// } +/// } +/// ``` +/// +/// # Allocation Reuse +/// +/// When an upper bound is not set, this recycling policy will _always_ reuse +/// any allocated capacity when recycling an element. Over time, the number of +/// reallocations required to grow items in a pool should decrease, amortizing +/// reallocations over the lifetime of the program. +/// +/// Of course, this means that it is technically possible for the allocated +/// capacity of the pool to grow infinitely, which can cause a memory leak if +/// used incorrectly. Therefore, it is also possible to set an upper bound on +/// idle capacity, using [`with_max_capacity`]. When such a bound is set, +/// recycled elements will be shrunk down to that capacity if they have grown +/// past the upper bound while in use. If this is the case, reallocations may +/// occur more often, but if the upper bound is higher than the typical required +/// capacity, they should remain infrequent. +/// +/// If elements will not require allocations of differing sizes, and the size is +/// known in advance (e.g. a pool of `HashMap`s that always have exactly 64 +/// elements), the [`with_max_capacity`] and [`with_min_capacity`] methods can +/// be called with the same value. This way, elements will always be initially +/// allocated with *exactly* that much capacity, and will only be shrunk if they +/// ever exceed that capacity. If the elements never grow beyond the specified +/// capacity, this should mean that no additional allocations will ever occur +/// once the initial pool of elements are allocated. +/// +/// [recycle]: Recycle::recycle +/// [`max_capacity`]: Self::max_capacity +/// [`min_capacity`]: Self::min_capacity +/// [`with_max_capacity`]: Self::with_max_capacity +/// [`with_min_capacity`]: Self::with_min_capacity +#[derive(Clone, Debug)] +pub struct WithCapacity { + min: usize, + max: usize, +} + +// TODO(eliza): consider making this public? +// TODO(eliza): consider making this a trait method with a default impl? +#[inline(always)] +pub(crate) fn take(element: &mut T, recycle: &R) -> T +where + R: Recycle, +{ + core::mem::replace(element, recycle.new_element()) +} + +impl DefaultRecycle { + pub const fn new() -> Self { + Self(()) + } +} + +impl Recycle for DefaultRecycle +where + T: Default + Clone, +{ + fn new_element(&self) -> T { + T::default() + } + + fn recycle(&self, element: &mut T) { + element.clone_from(&T::default()) + } +} + +// === impl WithCapacity === + +impl WithCapacity { + /// Returns a new [`WithCapacity`]. + /// + /// By default, the maximum capacity is unconstrained, and the minimum + /// capacity is 0. Existing allocations will always be reused, regardless + /// of size, and new elements will be created with 0 capacity. + /// + /// To add an upper bound on re-used capacity, use + /// [`WithCapacity::with_max_capacity`]. To allocate elements with an + /// initial capacity, use [`WithCapacity::with_min_capacity`]. + pub const fn new() -> Self { + Self { + max: core::usize::MAX, + min: 0, + } + } + + /// Sets an upper bound on the capacity that will be reused when [recycling] + /// elements. + /// + /// When an element is recycled, if its capacity exceeds the max value, it + /// will be shrunk down to that capacity. This will result in a + /// reallocation, but limits the total capacity allocated by the pool, + /// preventing unbounded memory use. + /// + /// Elements may still exceed the configured max capacity *while they are in + /// use*; this value only configures what happens when they are returned to + /// the pool. + /// + /// # Examples + /// + /// ``` + /// use thingbuf::recycling::{Recycle, WithCapacity}; + /// + /// // Create a recycler with max capacity of 8. + /// let recycle = WithCapacity::new().with_max_capacity(8); + /// + /// // Create a new string using that recycler. + /// let mut s: String = recycle.new_element(); + /// assert_eq!(s.capacity(), 0); + /// + /// // Now, write some data to the string. + /// s.push_str("hello, world"); + /// + /// // The string's capacity must be at least the length of the + /// // string 'hello, world'. + /// assert!(s.capacity() >= "hello, world".len()); + /// + /// // After recycling the string, its capacity will be shrunk down + /// // to the configured max capacity. + /// recycle.recycle(&mut s); + /// assert_eq!(s.capacity(), 8); + /// ``` + /// + /// [recycling]: Recycle::recycle + pub const fn with_max_capacity(self, max: usize) -> Self { + Self { max, ..self } + } + + /// Sets the minimum capacity when [allocating new elements][new]. + /// + /// When new elements are created, they will be allocated with at least + /// `min` capacity. + /// + /// Note that this is a *lower bound*. Elements may be allocated with + /// greater than the minimum capacity, depending on the behavior of the + /// element being allocated, but there will always be *at least* `min` + /// capacity. + /// + /// # Examples + /// + /// ``` + /// use thingbuf::recycling::{Recycle, WithCapacity}; + /// + /// // A recycler without a minimum capacity. + /// let no_min = WithCapacity::new(); + /// + /// // A new element created by this recycler will not + /// // allocate any capacity until it is used. + /// let s: String = no_min.new_element(); + /// assert_eq!(s.capacity(), 0); + /// + /// // Now, configure a minimum capacity. + /// let with_min = WithCapacity::new().with_min_capacity(8); + /// + /// // New elements created by this recycler will always be allocated + /// // with at least the specified capacity. + /// let s: String = with_min.new_element(); + /// assert!(s.capacity() >= 8); + /// ``` + /// + /// [new]: Recycle::new_element + pub const fn with_min_capacity(self, min: usize) -> Self { + Self { min, ..self } + } + + /// Returns the minimum initial capacity when [allocating new + /// elements][new]. + /// + /// This method can be used to implement `Recycle for WithCapacity` where + /// `T` is a type defined outside of this crate. See [the `WithCapacity` + /// documentation][impling] for details. + /// + /// # Examples + /// + /// ``` + /// use thingbuf::recycling::{Recycle, WithCapacity}; + /// + /// let recycle = WithCapacity::new(); + /// assert_eq!(recycle.min_capacity(), 0); + /// ``` + /// + /// ``` + /// use thingbuf::recycling::{Recycle, WithCapacity}; + /// + /// let recycle = WithCapacity::new().with_min_capacity(64); + /// assert_eq!(recycle.min_capacity(), 64); + /// ``` + /// + /// [new]: Recycle::new_element + /// [impling]: WithCapacity#implementations-for-other-types + pub fn min_capacity(&self) -> usize { + self.min + } + + /// Returns the maximum retained capacity when [recycling + /// elements][recycle]. + /// + /// If no upper bound is configured, this will return [`usize::MAX`]. + /// + /// This method can be used to implement `Recycle for WithCapacity` where + /// `T` is a type defined outside of this crate. See [the `WithCapacity` + /// documentation][impling] for details. + /// + /// # Examples + /// + /// ``` + /// use thingbuf::recycling::{Recycle, WithCapacity}; + /// + /// let recycle = WithCapacity::new(); + /// assert_eq!(recycle.max_capacity(), usize::MAX); + /// ``` + /// + /// ``` + /// use thingbuf::recycling::{Recycle, WithCapacity}; + /// + /// let recycle = WithCapacity::new().with_max_capacity(64); + /// assert_eq!(recycle.max_capacity(), 64); + /// ``` + /// + /// [recycle]: Recycle::recycle + /// [impling]: WithCapacity#implementations-for-other-types + pub fn max_capacity(&self) -> usize { + self.max + } +} + +impl Default for WithCapacity { + fn default() -> Self { + Self::new() + } +} + +feature! { + #![feature = "alloc"] + use alloc::{ + collections::{VecDeque, BinaryHeap}, + string::String, + sync::Arc, + vec::Vec, + }; + + impl Recycle for Arc + where + R: Recycle, + { + #[inline] + fn new_element(&self) -> T { + (**self).new_element() + } + + #[inline] + fn recycle(&self, element: &mut T) { + (**self).recycle(element) + } + } + + impl Recycle> for WithCapacity { + fn new_element(&self) -> Vec { + Vec::with_capacity(self.min) + } + + fn recycle(&self, element: &mut Vec) { + element.clear(); + element.shrink_to(self.max); + } + } + + impl Recycle for WithCapacity { + fn new_element(&self) -> String { + String::with_capacity(self.min) + } + + fn recycle(&self, element: &mut String) { + element.clear(); + element.shrink_to(self.max); + } + } + + impl Recycle> for WithCapacity { + fn new_element(&self) -> VecDeque { + VecDeque::with_capacity(self.min) + } + + fn recycle(&self, element: &mut VecDeque) { + element.clear(); + element.shrink_to(self.max); + } + } + + impl Recycle> for WithCapacity { + fn new_element(&self) -> BinaryHeap { + BinaryHeap::with_capacity(self.min) + } + + fn recycle(&self, element: &mut BinaryHeap) { + element.clear(); + element.shrink_to(self.max); + } + } +} + +feature! { + #![feature = "std"] + use std::{hash::{Hash, BuildHasher}, collections::{HashMap, HashSet}}; + + impl Recycle> for WithCapacity + where + K: Hash + Eq, + S: BuildHasher + Default + { + fn new_element(&self) -> HashMap { + HashMap::with_capacity_and_hasher(self.min, Default::default()) + } + + fn recycle(&self, element: &mut HashMap) { + element.clear(); + element.shrink_to(self.max); + } + } + + impl Recycle> for WithCapacity + where + K: Hash + Eq, + S: BuildHasher + Default + { + fn new_element(&self) -> HashSet { + HashSet::with_capacity_and_hasher(self.min, Default::default()) + } + + fn recycle(&self, element: &mut HashSet) { + element.clear(); + element.shrink_to(self.max); + } + } +} diff --git a/src/static_thingbuf.rs b/src/static_thingbuf.rs index ae0a6de..b424eb9 100644 --- a/src/static_thingbuf.rs +++ b/src/static_thingbuf.rs @@ -1,5 +1,8 @@ -use crate::{Core, Full, Ref, Slot}; -use core::{fmt, mem}; +use crate::{ + recycling::{self, Recycle}, + Core, Full, Ref, Slot, +}; +use core::fmt; /// A statically allocated, fixed-size lock-free multi-producer multi-consumer /// queue. @@ -188,8 +191,9 @@ use core::{fmt, mem}; /// [`ThingBuf`]: crate::ThingBuf /// [vyukov]: https://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue /// [object pool]: https://en.wikipedia.org/wiki/Object_pool_pattern -pub struct StaticThingBuf { +pub struct StaticThingBuf { core: Core, + recycle: R, slots: [Slot; CAP], } @@ -199,14 +203,21 @@ pub struct StaticThingBuf { impl StaticThingBuf { /// Returns a new `StaticThingBuf` with space for `capacity` elements. pub const fn new() -> Self { - Self { + Self::with_recycle(recycling::DefaultRecycle::new()) + } +} + +impl StaticThingBuf { + pub const fn with_recycle(recycle: R) -> Self { + StaticThingBuf { core: Core::new(CAP), + recycle, slots: Slot::make_static_array::(), } } } -impl StaticThingBuf { +impl StaticThingBuf { /// Returns the *total* capacity of this queue. This includes both /// occupied and unoccupied entries. /// @@ -302,7 +313,10 @@ impl StaticThingBuf { } } -impl StaticThingBuf { +impl StaticThingBuf +where + R: Recycle, +{ /// Reserves a slot to push an element into the queue, returning a [`Ref`] that /// can be used to write to that slot. /// @@ -343,7 +357,7 @@ impl StaticThingBuf { /// /// static MESSAGES: StaticThingBuf = StaticThingBuf::new(); /// - /// #[derive(Default)] + /// #[derive(Clone, Default)] /// struct Message { /// // ... /// } @@ -380,10 +394,12 @@ impl StaticThingBuf { /// [`pop_ref`]: Self::pop_ref /// [`push`]: Self::push_ref pub fn push_ref(&self) -> Result, Full> { - self.core.push_ref(&self.slots).map_err(|e| match e { - crate::mpsc::TrySendError::Full(()) => Full(()), - _ => unreachable!(), - }) + self.core + .push_ref(&self.slots, &self.recycle) + .map_err(|e| match e { + crate::mpsc::TrySendError::Full(()) => Full(()), + _ => unreachable!(), + }) } /// Attempt to enqueue an element by value. @@ -468,7 +484,7 @@ impl StaticThingBuf { #[inline] pub fn pop(&self) -> Option { let mut slot = self.pop_ref()?; - Some(mem::take(&mut *slot)) + Some(recycling::take(&mut *slot, &self.recycle)) } /// Dequeue the first element in the queue by reference, and invoke the @@ -485,17 +501,18 @@ impl StaticThingBuf { } } -impl fmt::Debug for StaticThingBuf { +impl fmt::Debug for StaticThingBuf { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("StaticThingBuf") .field("len", &self.len()) .field("slots", &format_args!("[...]")) .field("core", &self.core) + .field("recycle", &self.recycle) .finish() } } -impl Drop for StaticThingBuf { +impl Drop for StaticThingBuf { fn drop(&mut self) { self.core.drop_slots(&mut self.slots[..]); } diff --git a/src/stringbuf.rs b/src/stringbuf.rs deleted file mode 100644 index af7cc7f..0000000 --- a/src/stringbuf.rs +++ /dev/null @@ -1,92 +0,0 @@ -use super::*; - -use alloc::string::String; - -#[derive(Debug)] -pub struct StringBuf { - inner: ThingBuf, - max_idle_capacity: usize, -} - -impl StringBuf { - pub fn new(capacity: usize) -> Self { - Self { - inner: ThingBuf::new(capacity), - max_idle_capacity: usize::MAX, - } - } - - pub fn with_max_idle_capacity(self, max_idle_capacity: usize) -> Self { - Self { - max_idle_capacity, - inner: self.inner, - } - } - - #[inline] - pub fn capacity(&self) -> usize { - self.inner.capacity() - } - - #[inline] - pub fn write(&self) -> Result, Full> { - let mut string = self.inner.push_ref()?; - string.with_mut(String::clear); - Ok(string) - } - - pub fn pop_ref(&self) -> Option> { - let mut string = self.inner.pop_ref()?; - string.with_mut(|string| { - if string.capacity() > self.max_idle_capacity { - string.shrink_to_fit(); - } - }); - Some(string) - } -} - -#[derive(Debug)] -pub struct StaticStringBuf { - inner: StaticThingBuf, - max_idle_capacity: usize, -} - -impl StaticStringBuf { - #[cfg(not(test))] - pub const fn new() -> Self { - Self { - inner: StaticThingBuf::new(), - max_idle_capacity: usize::MAX, - } - } - - pub fn with_max_idle_capacity(self, max_idle_capacity: usize) -> Self { - Self { - max_idle_capacity, - inner: self.inner, - } - } - - #[inline] - pub fn capacity(&self) -> usize { - self.inner.capacity() - } - - #[inline] - pub fn write(&self) -> Result, Full> { - let mut string = self.inner.push_ref()?; - string.with_mut(String::clear); - Ok(string) - } - - pub fn pop_ref(&self) -> Option> { - let mut string = self.inner.pop_ref()?; - string.with_mut(|string| { - if string.capacity() > self.max_idle_capacity { - string.shrink_to_fit(); - } - }); - Some(string) - } -} diff --git a/src/thingbuf.rs b/src/thingbuf.rs index da02af2..e93b307 100644 --- a/src/thingbuf.rs +++ b/src/thingbuf.rs @@ -1,6 +1,9 @@ -use crate::{Core, Full, Ref, Slot}; +use crate::{ + recycling::{self, Recycle}, + Core, Full, Ref, Slot, +}; use alloc::boxed::Box; -use core::{fmt, mem}; +use core::fmt; #[cfg(all(loom, test))] mod tests; @@ -177,20 +180,128 @@ mod tests; /// [vyukov]: https://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue /// [object pool]: https://en.wikipedia.org/wiki/Object_pool_pattern #[cfg_attr(docsrs, doc(cfg(feature = "alloc")))] -pub struct ThingBuf { +pub struct ThingBuf { pub(crate) core: Core, pub(crate) slots: Box<[Slot]>, + recycle: R, } // === impl ThingBuf === -impl ThingBuf { +impl ThingBuf { /// Returns a new `ThingBuf` with space for `capacity` elements. pub fn new(capacity: usize) -> Self { + Self::with_recycle(capacity, recycling::DefaultRecycle::new()) + } +} + +impl ThingBuf { + /// Returns the *total* capacity of this queue. This includes both + /// occupied and unoccupied entries. + /// + /// To determine the queue's remaining *unoccupied* capacity, use + /// [`remaining`] instead. + /// + /// # Examples + /// + /// ``` + /// use thingbuf::ThingBuf; + /// + /// let q = ThingBuf::::new(100); + /// assert_eq!(q.capacity(), 100); + /// ``` + /// + /// Even after pushing several messages to the queue, the capacity remains + /// the same: + /// ``` + /// # use thingbuf::ThingBuf; + /// + /// let q = ThingBuf::::new(100); + /// + /// *q.push_ref().unwrap() = 1; + /// *q.push_ref().unwrap() = 2; + /// *q.push_ref().unwrap() = 3; + /// + /// assert_eq!(q.capacity(), 100); + /// ``` + /// + /// [`remaining`]: Self::remaining + #[inline] + pub fn capacity(&self) -> usize { + self.slots.len() + } + + /// Returns the unoccupied capacity of the queue (i.e., how many additional + /// elements can be enqueued before the queue will be full). + /// + /// This is equivalent to subtracting the queue's [`len`] from its [`capacity`]. + /// + /// [`len`]: Self::len + /// [`capacity`]: Self::capacity + pub fn remaining(&self) -> usize { + self.capacity() - self.len() + } + + /// Returns the number of elements in the queue + /// + /// To determine the queue's remaining *unoccupied* capacity, use + /// [`remaining`] instead. + /// + /// # Examples + /// + /// ``` + /// use thingbuf::ThingBuf; + /// + /// let q = ThingBuf::new(100); + /// assert_eq!(q.len(), 0); + /// + /// *q.push_ref().unwrap() = 1; + /// *q.push_ref().unwrap() = 2; + /// *q.push_ref().unwrap() = 3; + /// assert_eq!(q.len(), 3); + /// + /// let _ = q.pop_ref(); + /// assert_eq!(q.len(), 2); + /// ``` + /// + /// [`remaining`]: Self::remaining + #[inline] + pub fn len(&self) -> usize { + self.core.len() + } + + /// Returns `true` if there are currently no elements in this `ThingBuf`. + /// + /// # Examples + /// + /// ``` + /// use thingbuf::ThingBuf; + /// + /// let q = ThingBuf::new(100); + /// assert!(q.is_empty()); + /// + /// *q.push_ref().unwrap() = 1; + /// assert!(!q.is_empty()); + /// + /// let _ = q.pop_ref(); + /// assert!(q.is_empty()); + /// ``` + #[inline] + pub fn is_empty(&self) -> bool { + self.len() == 0 + } +} + +impl ThingBuf +where + R: Recycle, +{ + pub fn with_recycle(capacity: usize, recycle: R) -> Self { assert!(capacity > 0); Self { core: Core::new(capacity), slots: Slot::make_boxed_array(capacity), + recycle, } } @@ -232,7 +343,7 @@ impl ThingBuf { /// ```rust /// use thingbuf::ThingBuf; /// - /// #[derive(Default)] + /// #[derive(Clone, Default)] /// struct Message { /// // ... /// } @@ -269,10 +380,12 @@ impl ThingBuf { /// [`pop_ref`]: Self::pop_ref /// [`push`]: Self::push_ref pub fn push_ref(&self) -> Result, Full> { - self.core.push_ref(&*self.slots).map_err(|e| match e { - crate::mpsc::TrySendError::Full(()) => Full(()), - _ => unreachable!(), - }) + self.core + .push_ref(&*self.slots, &self.recycle) + .map_err(|e| match e { + crate::mpsc::TrySendError::Full(()) => Full(()), + _ => unreachable!(), + }) } /// Attempt to enqueue an element by value. @@ -357,7 +470,7 @@ impl ThingBuf { #[inline] pub fn pop(&self) -> Option { let mut slot = self.pop_ref()?; - Some(mem::take(&mut *slot)) + Some(recycling::take(&mut *slot, &self.recycle)) } /// Dequeue the first element in the queue by reference, and invoke the @@ -374,115 +487,19 @@ impl ThingBuf { } } -impl ThingBuf { - /// Returns the *total* capacity of this queue. This includes both - /// occupied and unoccupied entries. - /// - /// To determine the queue's remaining *unoccupied* capacity, use - /// [`remaining`] instead. - /// - /// # Examples - /// - /// ``` - /// use thingbuf::ThingBuf; - /// - /// let q = ThingBuf::::new(100); - /// assert_eq!(q.capacity(), 100); - /// ``` - /// - /// Even after pushing several messages to the queue, the capacity remains - /// the same: - /// ``` - /// # use thingbuf::ThingBuf; - /// - /// let q = ThingBuf::::new(100); - /// - /// *q.push_ref().unwrap() = 1; - /// *q.push_ref().unwrap() = 2; - /// *q.push_ref().unwrap() = 3; - /// - /// assert_eq!(q.capacity(), 100); - /// ``` - /// - /// [`remaining`]: Self::remaining - #[inline] - pub fn capacity(&self) -> usize { - self.slots.len() - } - - /// Returns the unoccupied capacity of the queue (i.e., how many additional - /// elements can be enqueued before the queue will be full). - /// - /// This is equivalent to subtracting the queue's [`len`] from its [`capacity`]. - /// - /// [`len`]: Self::len - /// [`capacity`]: Self::capacity - pub fn remaining(&self) -> usize { - self.capacity() - self.len() - } - - /// Returns the number of elements in the queue - /// - /// To determine the queue's remaining *unoccupied* capacity, use - /// [`remaining`] instead. - /// - /// # Examples - /// - /// ``` - /// use thingbuf::ThingBuf; - /// - /// let q = ThingBuf::new(100); - /// assert_eq!(q.len(), 0); - /// - /// *q.push_ref().unwrap() = 1; - /// *q.push_ref().unwrap() = 2; - /// *q.push_ref().unwrap() = 3; - /// assert_eq!(q.len(), 3); - /// - /// let _ = q.pop_ref(); - /// assert_eq!(q.len(), 2); - /// ``` - /// - /// [`remaining`]: Self::remaining - #[inline] - pub fn len(&self) -> usize { - self.core.len() - } - - /// Returns `true` if there are currently no elements in this `ThingBuf`. - /// - /// # Examples - /// - /// ``` - /// use thingbuf::ThingBuf; - /// - /// let q = ThingBuf::new(100); - /// assert!(q.is_empty()); - /// - /// *q.push_ref().unwrap() = 1; - /// assert!(!q.is_empty()); - /// - /// let _ = q.pop_ref(); - /// assert!(q.is_empty()); - /// ``` - #[inline] - pub fn is_empty(&self) -> bool { - self.len() == 0 - } -} - -impl Drop for ThingBuf { +impl Drop for ThingBuf { fn drop(&mut self) { self.core.drop_slots(&mut self.slots[..]); } } -impl fmt::Debug for ThingBuf { +impl fmt::Debug for ThingBuf { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("ThingBuf") .field("len", &self.len()) .field("slots", &format_args!("[...]")) .field("core", &self.core) + .field("recycle", &self.recycle) .finish() } } diff --git a/tests/static_storage.rs b/tests/static_storage.rs index d55c3af..209e294 100644 --- a/tests/static_storage.rs +++ b/tests/static_storage.rs @@ -3,7 +3,7 @@ use std::{ sync::atomic::{AtomicBool, Ordering}, thread, }; -use thingbuf::{StaticStringBuf, StaticThingBuf}; +use thingbuf::{recycling, StaticThingBuf}; #[test] fn static_storage_thingbuf() { @@ -50,13 +50,16 @@ fn static_storage_thingbuf() { #[test] fn static_storage_stringbuf() { - static BUF: StaticStringBuf<8> = StaticStringBuf::new(); + use recycling::WithCapacity; + + static BUF: StaticThingBuf = + StaticThingBuf::with_recycle(WithCapacity::new().with_max_capacity(8)); static PRODUCER_LIVE: AtomicBool = AtomicBool::new(true); let producer = thread::spawn(move || { for i in 0..16 { let mut string = 'write: loop { - match BUF.write() { + match BUF.push_ref() { Ok(string) => break 'write string, _ => thread::yield_now(), }