Skip to content

Commit

Permalink
feat: Add queuing strategies for listener lists.
Browse files Browse the repository at this point in the history
Two strategies are available:
- FIFO: The original round-robin queuing; listeners are inserted at the back.
- LIFO: The new most-recent queuing; listeners are inserted at the front.

LIFO queuing is beneficial for cache-efficiency with workloads that are
tolerant of starvation. The same listener is repeatedly drawn from the list
until the load dictates additional listeners be drawn from the list. These
listeners expand outward as a "hot set" for optimal reuse of resources rather
than continuously drawing from the coldest resources in a FIFO schedule.

Signed-off-by: Jason Volk <[email protected]>
  • Loading branch information
jevolk committed Jan 2, 2025
1 parent afe606f commit fe4aebe
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 44 deletions.
95 changes: 58 additions & 37 deletions src/intrusive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
use crate::notify::{GenericNotify, Internal, Notification};
use crate::sync::atomic::Ordering;
use crate::sync::cell::{Cell, UnsafeCell};
use crate::{RegisterResult, State, TaskRef};
use crate::{QueueStrategy, RegisterResult, State, TaskRef};

#[cfg(feature = "critical-section")]
use core::cell::RefCell;
Expand Down Expand Up @@ -42,17 +42,21 @@ struct Inner<T> {

/// The number of notified listeners.
notified: usize,

/// Strategy by which the list is organized.
strategy: QueueStrategy,
}

impl<T> List<T> {
/// Create a new, empty event listener list.
pub(super) fn new() -> Self {
pub(super) fn new(strategy: QueueStrategy) -> Self {
let inner = Inner {
head: None,
tail: None,
next: None,
len: 0,
notified: 0,
strategy,
};

#[cfg(feature = "critical-section")]
Expand Down Expand Up @@ -149,39 +153,9 @@ impl<T> crate::Inner<T> {
})
}

/// Add a new listener to the list.
pub(crate) fn insert(&self, mut listener: Pin<&mut Option<Listener<T>>>) {
self.with_inner(|inner| {
listener.as_mut().set(Some(Listener {
link: UnsafeCell::new(Link {
state: Cell::new(State::Created),
prev: Cell::new(inner.tail),
next: Cell::new(None),
}),
_pin: PhantomPinned,
}));
let listener = listener.as_pin_mut().unwrap();

{
let entry_guard = listener.link.get();
// SAFETY: We are locked, so we can access the inner `link`.
let entry = unsafe { entry_guard.deref() };

// Replace the tail with the new entry.
match mem::replace(&mut inner.tail, Some(entry.into())) {
None => inner.head = Some(entry.into()),
Some(t) => unsafe { t.as_ref().next.set(Some(entry.into())) },
};
}

// If there are no unnotified entries, this is the first one.
if inner.next.is_none() {
inner.next = inner.tail;
}

// Bump the entry count.
inner.len += 1;
});
/// Adds a listener to the list.
pub(crate) fn insert(&self, listener: Pin<&mut Option<Listener<T>>>) {
self.with_inner(|inner| inner.insert(listener))
}

/// Remove a listener from the list.
Expand Down Expand Up @@ -248,6 +222,53 @@ impl<T> crate::Inner<T> {
}

impl<T> Inner<T> {
fn insert(&mut self, mut listener: Pin<&mut Option<Listener<T>>>) {
use QueueStrategy::{Fifo, Lifo};

listener.as_mut().set(Some(Listener {
link: UnsafeCell::new(Link {
state: Cell::new(State::Created),
prev: Cell::new(self.tail.filter(|_| self.strategy == Fifo)),
next: Cell::new(self.head.filter(|_| self.strategy == Lifo)),
}),
_pin: PhantomPinned,
}));
let listener = listener.as_pin_mut().unwrap();

{
let entry_guard = listener.link.get();
// SAFETY: We are locked, so we can access the inner `link`.
let entry = unsafe { entry_guard.deref() };

// Replace the head or tail with the new entry.
let replacing = match self.strategy {
Lifo => &mut self.head,
Fifo => &mut self.tail,
};

match mem::replace(replacing, Some(entry.into())) {
None => *replacing = Some(entry.into()),
Some(t) if self.strategy == Lifo => unsafe {
t.as_ref().prev.set(Some(entry.into()))
},
Some(t) if self.strategy == Fifo => unsafe {
t.as_ref().next.set(Some(entry.into()))
},
Some(_) => unimplemented!("unimplemented queue strategy"),
};
}

// If there are no unnotified entries, or if using LIFO strategy, this is the first one.
if self.strategy == Lifo {
self.next = self.head;
} else if self.next.is_none() {
self.next = self.tail;
}

// Bump the entry count.
self.len += 1;
}

fn remove(
&mut self,
mut listener: Pin<&mut Option<Listener<T>>>,
Expand Down Expand Up @@ -413,7 +434,7 @@ mod tests {

#[test]
fn insert() {
let inner = crate::Inner::new();
let inner = crate::Inner::new(QueueStrategy::Fifo);
make_listeners!(listen1, listen2, listen3);

// Register the listeners.
Expand All @@ -434,7 +455,7 @@ mod tests {

#[test]
fn drop_non_notified() {
let inner = crate::Inner::new();
let inner = crate::Inner::new(QueueStrategy::Fifo);
make_listeners!(listen1, listen2, listen3);

// Register the listeners.
Expand Down
48 changes: 44 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,16 @@ use sync::WithMut;
use notify::NotificationPrivate;
pub use notify::{IntoNotification, Notification};

/// Queuing strategy for listeners.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum QueueStrategy {
/// First-in-first-out listeners are added to the back of the list.
Fifo,

/// Last-in-first-out listeners are added to the front of the list.
Lifo,
}

/// Inner state of [`Event`].
struct Inner<T> {
/// The number of notified entries, or `usize::MAX` if all of them have been notified.
Expand All @@ -143,10 +153,10 @@ struct Inner<T> {
}

impl<T> Inner<T> {
fn new() -> Self {
fn new(queue_strategy: QueueStrategy) -> Self {
Self {
notified: AtomicUsize::new(usize::MAX),
list: sys::List::new(),
list: sys::List::new(queue_strategy),
}
}
}
Expand Down Expand Up @@ -177,6 +187,11 @@ pub struct Event<T = ()> {
/// is an `Arc<Inner>` so it's important to keep in mind that it contributes to the [`Arc`]'s
/// reference count.
inner: AtomicPtr<Inner<T>>,

/// Queuing strategy.
///
/// Listeners waiting for notification will be arranged according to the strategy.
queue_strategy: QueueStrategy,
}

unsafe impl<T: Send> Send for Event<T> {}
Expand Down Expand Up @@ -238,13 +253,15 @@ impl<T> Event<T> {
pub const fn with_tag() -> Self {
Self {
inner: AtomicPtr::new(ptr::null_mut()),
queue_strategy: QueueStrategy::Fifo,
}
}
#[cfg(all(feature = "std", loom))]
#[inline]
pub fn with_tag() -> Self {
Self {
inner: AtomicPtr::new(ptr::null_mut()),
queue_strategy: QueueStrategy::Fifo,
}
}

Expand Down Expand Up @@ -471,7 +488,7 @@ impl<T> Event<T> {
// If this is the first use, initialize the state.
if inner.is_null() {
// Allocate the state on the heap.
let new = Arc::new(Inner::<T>::new());
let new = Arc::new(Inner::<T>::new(self.queue_strategy));

// Convert the state to a raw pointer.
let new = Arc::into_raw(new) as *mut Inner<T>;
Expand Down Expand Up @@ -556,16 +573,39 @@ impl Event<()> {
#[inline]
#[cfg(not(loom))]
pub const fn new() -> Self {
Self::new_with_queue_strategy(QueueStrategy::Fifo)
}

#[inline]
#[cfg(loom)]
pub fn new() -> Self {
Self::new_with_queue_strategy(QueueStrategy::Fifo)
}

/// Creates a new [`Event`] with specific queue strategy.
///
/// # Examples
///
/// ```
/// use event_listener::{Event, QueueStrategy};
///
/// let event = Event::new_with_queue_strategy(QueueStrategy::Fifo);
/// ```
#[inline]
#[cfg(not(loom))]
pub const fn new_with_queue_strategy(queue_strategy: QueueStrategy) -> Self {
Self {
inner: AtomicPtr::new(ptr::null_mut()),
queue_strategy,
}
}

#[inline]
#[cfg(loom)]
pub fn new() -> Self {
pub fn new_with_queue_strategy(queue_strategy: QueueStrategy) -> Self {
Self {
inner: AtomicPtr::new(ptr::null_mut()),
queue_strategy,
}
}

Expand Down
11 changes: 8 additions & 3 deletions src/slab.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::notify::{GenericNotify, Internal, Notification};
use crate::sync::atomic::{AtomicBool, Ordering};
use crate::sync::cell::{Cell, ConstPtr, UnsafeCell};
use crate::sync::Arc;
use crate::{RegisterResult, State, Task, TaskRef};
use crate::{QueueStrategy, RegisterResult, State, Task, TaskRef};

use core::fmt;
use core::marker::PhantomData;
Expand Down Expand Up @@ -229,7 +229,12 @@ pub(crate) struct List<T> {
}

impl<T> List<T> {
pub(super) fn new() -> List<T> {
pub(super) fn new(strategy: QueueStrategy) -> List<T> {
debug_assert!(
strategy == QueueStrategy::Fifo,
"Slab list only supports FIFO strategy"
);

List {
inner: Mutex::new(ListenerSlab::new()),
queue: concurrent_queue::ConcurrentQueue::unbounded(),
Expand Down Expand Up @@ -1362,7 +1367,7 @@ mod tests {

#[test]
fn uncontended_inner() {
let inner = crate::Inner::new();
let inner = crate::Inner::new(QueueStrategy::Fifo);

// Register two listeners.
let (mut listener1, mut listener2, mut listener3) = (None, None, None);
Expand Down

0 comments on commit fe4aebe

Please sign in to comment.