From fe4aebeeaae435af60087ddd56b573a2e0be671d Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Wed, 25 Dec 2024 05:28:10 +0000 Subject: [PATCH] feat: Add queuing strategies for listener lists. Two strategies are available: - FIFO: The original round-robin queuing; listeners are inserted at the back. - LIFO: The new most-recent queuing; listeners are inserted at the front. LIFO queuing is beneficial for cache-efficiency with workloads that are tolerant of starvation. The same listener is repeatedly drawn from the list until the load dictates additional listeners be drawn from the list. These listeners expand outward as a "hot set" for optimal reuse of resources rather than continuously drawing from the coldest resources in a FIFO schedule. Signed-off-by: Jason Volk --- src/intrusive.rs | 95 +++++++++++++++++++++++++++++------------------- src/lib.rs | 48 ++++++++++++++++++++++-- src/slab.rs | 11 ++++-- 3 files changed, 110 insertions(+), 44 deletions(-) diff --git a/src/intrusive.rs b/src/intrusive.rs index 69f460b..9d6fb45 100644 --- a/src/intrusive.rs +++ b/src/intrusive.rs @@ -6,7 +6,7 @@ use crate::notify::{GenericNotify, Internal, Notification}; use crate::sync::atomic::Ordering; use crate::sync::cell::{Cell, UnsafeCell}; -use crate::{RegisterResult, State, TaskRef}; +use crate::{QueueStrategy, RegisterResult, State, TaskRef}; #[cfg(feature = "critical-section")] use core::cell::RefCell; @@ -42,17 +42,21 @@ struct Inner { /// The number of notified listeners. notified: usize, + + /// Strategy by which the list is organized. + strategy: QueueStrategy, } impl List { /// Create a new, empty event listener list. - pub(super) fn new() -> Self { + pub(super) fn new(strategy: QueueStrategy) -> Self { let inner = Inner { head: None, tail: None, next: None, len: 0, notified: 0, + strategy, }; #[cfg(feature = "critical-section")] @@ -149,39 +153,9 @@ impl crate::Inner { }) } - /// Add a new listener to the list. - pub(crate) fn insert(&self, mut listener: Pin<&mut Option>>) { - self.with_inner(|inner| { - listener.as_mut().set(Some(Listener { - link: UnsafeCell::new(Link { - state: Cell::new(State::Created), - prev: Cell::new(inner.tail), - next: Cell::new(None), - }), - _pin: PhantomPinned, - })); - let listener = listener.as_pin_mut().unwrap(); - - { - let entry_guard = listener.link.get(); - // SAFETY: We are locked, so we can access the inner `link`. - let entry = unsafe { entry_guard.deref() }; - - // Replace the tail with the new entry. - match mem::replace(&mut inner.tail, Some(entry.into())) { - None => inner.head = Some(entry.into()), - Some(t) => unsafe { t.as_ref().next.set(Some(entry.into())) }, - }; - } - - // If there are no unnotified entries, this is the first one. - if inner.next.is_none() { - inner.next = inner.tail; - } - - // Bump the entry count. - inner.len += 1; - }); + /// Adds a listener to the list. + pub(crate) fn insert(&self, listener: Pin<&mut Option>>) { + self.with_inner(|inner| inner.insert(listener)) } /// Remove a listener from the list. @@ -248,6 +222,53 @@ impl crate::Inner { } impl Inner { + fn insert(&mut self, mut listener: Pin<&mut Option>>) { + use QueueStrategy::{Fifo, Lifo}; + + listener.as_mut().set(Some(Listener { + link: UnsafeCell::new(Link { + state: Cell::new(State::Created), + prev: Cell::new(self.tail.filter(|_| self.strategy == Fifo)), + next: Cell::new(self.head.filter(|_| self.strategy == Lifo)), + }), + _pin: PhantomPinned, + })); + let listener = listener.as_pin_mut().unwrap(); + + { + let entry_guard = listener.link.get(); + // SAFETY: We are locked, so we can access the inner `link`. + let entry = unsafe { entry_guard.deref() }; + + // Replace the head or tail with the new entry. + let replacing = match self.strategy { + Lifo => &mut self.head, + Fifo => &mut self.tail, + }; + + match mem::replace(replacing, Some(entry.into())) { + None => *replacing = Some(entry.into()), + Some(t) if self.strategy == Lifo => unsafe { + t.as_ref().prev.set(Some(entry.into())) + }, + Some(t) if self.strategy == Fifo => unsafe { + t.as_ref().next.set(Some(entry.into())) + }, + Some(_) => unimplemented!("unimplemented queue strategy"), + }; + } + + // If there are no unnotified entries, or if using LIFO strategy, this is the first one. + if self.strategy == Lifo { + self.next = self.head; + } else if self.next.is_none() { + self.next = self.tail; + } + + // Bump the entry count. + self.len += 1; + } + fn remove( &mut self, mut listener: Pin<&mut Option>>, @@ -413,7 +434,7 @@ mod tests { #[test] fn insert() { - let inner = crate::Inner::new(); + let inner = crate::Inner::new(QueueStrategy::Fifo); make_listeners!(listen1, listen2, listen3); // Register the listeners. @@ -434,7 +455,7 @@ mod tests { #[test] fn drop_non_notified() { - let inner = crate::Inner::new(); + let inner = crate::Inner::new(QueueStrategy::Fifo); make_listeners!(listen1, listen2, listen3); // Register the listeners. diff --git a/src/lib.rs b/src/lib.rs index d6a8e44..9bf5d7e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -127,6 +127,16 @@ use sync::WithMut; use notify::NotificationPrivate; pub use notify::{IntoNotification, Notification}; +/// Queuing strategy for listeners. +#[derive(Clone, Copy, Debug, PartialEq)] +pub enum QueueStrategy { + /// First-in-first-out listeners are added to the back of the list. + Fifo, + + /// Last-in-first-out listeners are added to the front of the list. + Lifo, +} + /// Inner state of [`Event`]. struct Inner { /// The number of notified entries, or `usize::MAX` if all of them have been notified. @@ -143,10 +153,10 @@ struct Inner { } impl Inner { - fn new() -> Self { + fn new(queue_strategy: QueueStrategy) -> Self { Self { notified: AtomicUsize::new(usize::MAX), - list: sys::List::new(), + list: sys::List::new(queue_strategy), } } } @@ -177,6 +187,11 @@ pub struct Event { /// is an `Arc` so it's important to keep in mind that it contributes to the [`Arc`]'s /// reference count. inner: AtomicPtr>, + + /// Queuing strategy. + /// + /// Listeners waiting for notification will be arranged according to the strategy. + queue_strategy: QueueStrategy, } unsafe impl Send for Event {} @@ -238,6 +253,7 @@ impl Event { pub const fn with_tag() -> Self { Self { inner: AtomicPtr::new(ptr::null_mut()), + queue_strategy: QueueStrategy::Fifo, } } #[cfg(all(feature = "std", loom))] @@ -245,6 +261,7 @@ impl Event { pub fn with_tag() -> Self { Self { inner: AtomicPtr::new(ptr::null_mut()), + queue_strategy: QueueStrategy::Fifo, } } @@ -471,7 +488,7 @@ impl Event { // If this is the first use, initialize the state. if inner.is_null() { // Allocate the state on the heap. - let new = Arc::new(Inner::::new()); + let new = Arc::new(Inner::::new(self.queue_strategy)); // Convert the state to a raw pointer. let new = Arc::into_raw(new) as *mut Inner; @@ -556,16 +573,39 @@ impl Event<()> { #[inline] #[cfg(not(loom))] pub const fn new() -> Self { + Self::new_with_queue_strategy(QueueStrategy::Fifo) + } + + #[inline] + #[cfg(loom)] + pub fn new() -> Self { + Self::new_with_queue_strategy(QueueStrategy::Fifo) + } + + /// Creates a new [`Event`] with specific queue strategy. + /// + /// # Examples + /// + /// ``` + /// use event_listener::{Event, QueueStrategy}; + /// + /// let event = Event::new_with_queue_strategy(QueueStrategy::Fifo); + /// ``` + #[inline] + #[cfg(not(loom))] + pub const fn new_with_queue_strategy(queue_strategy: QueueStrategy) -> Self { Self { inner: AtomicPtr::new(ptr::null_mut()), + queue_strategy, } } #[inline] #[cfg(loom)] - pub fn new() -> Self { + pub fn new_with_queue_strategy(queue_strategy: QueueStrategy) -> Self { Self { inner: AtomicPtr::new(ptr::null_mut()), + queue_strategy, } } diff --git a/src/slab.rs b/src/slab.rs index 59e1c21..11e9a12 100644 --- a/src/slab.rs +++ b/src/slab.rs @@ -18,7 +18,7 @@ use crate::notify::{GenericNotify, Internal, Notification}; use crate::sync::atomic::{AtomicBool, Ordering}; use crate::sync::cell::{Cell, ConstPtr, UnsafeCell}; use crate::sync::Arc; -use crate::{RegisterResult, State, Task, TaskRef}; +use crate::{QueueStrategy, RegisterResult, State, Task, TaskRef}; use core::fmt; use core::marker::PhantomData; @@ -229,7 +229,12 @@ pub(crate) struct List { } impl List { - pub(super) fn new() -> List { + pub(super) fn new(strategy: QueueStrategy) -> List { + debug_assert!( + strategy == QueueStrategy::Fifo, + "Slab list only supports FIFO strategy" + ); + List { inner: Mutex::new(ListenerSlab::new()), queue: concurrent_queue::ConcurrentQueue::unbounded(), @@ -1362,7 +1367,7 @@ mod tests { #[test] fn uncontended_inner() { - let inner = crate::Inner::new(); + let inner = crate::Inner::new(QueueStrategy::Fifo); // Register two listeners. let (mut listener1, mut listener2, mut listener3) = (None, None, None);