From 72572ef1d5cb928d70340fcaed0d25dc4921222f Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Thu, 25 Jul 2024 11:40:23 +0100 Subject: [PATCH] replace AtomicWaker with DiatomicWaker, removing futures_util --- Cargo.toml | 2 +- benches/batch.rs | 6 +++--- src/arc_slice.rs | 16 ++++++++++------ tests/http.rs | 2 +- 4 files changed, 15 insertions(+), 11 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 4c39f18..4004f33 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,8 +15,8 @@ bench = false [dependencies] futures-core = "0.3.21" -futures-util = "0.3.21" pin-project-lite = "0.2" +diatomic-waker = "0.1.0" [dev-dependencies] futures = "0.3.21" diff --git a/benches/batch.rs b/benches/batch.rs index 474f85b..0066125 100644 --- a/benches/batch.rs +++ b/benches/batch.rs @@ -16,7 +16,7 @@ async fn sleep() { #[divan::bench_group] mod futures_unordered { use futures_buffered::FuturesUnorderedBounded; - use futures_util::{stream::FuturesUnordered, StreamExt}; + use futures::{stream::FuturesUnordered, StreamExt}; use crate::sleep; @@ -72,7 +72,7 @@ mod futures_unordered { #[divan::bench_group] mod buffer_unordered { use futures_buffered::BufferedStreamExt; - use futures_util::{stream, StreamExt}; + use futures::{stream, StreamExt}; use crate::sleep; @@ -108,7 +108,7 @@ mod buffer_unordered { #[divan::bench_group] mod buffer_ordered { use futures_buffered::BufferedStreamExt; - use futures_util::{stream, StreamExt}; + use futures::{stream, StreamExt}; use crate::sleep; diff --git a/src/arc_slice.rs b/src/arc_slice.rs index 9f69a9b..38aa3e0 100644 --- a/src/arc_slice.rs +++ b/src/arc_slice.rs @@ -7,7 +7,7 @@ use core::{ sync::atomic::{self, AtomicUsize}, task::Waker, }; -use futures_util::task::AtomicWaker; +use diatomic_waker::primitives::DiatomicWaker; /// [`ArcSlice`] is a fun optimisation. For `FuturesUnorderedBounded`, we have `n` slots for futures, /// and we create a separate context when polling each individual future to avoid having n^2 polling. @@ -50,7 +50,7 @@ pub(crate) struct ArcSliceInner { pub(crate) struct ArcSliceInnerMeta { strong: AtomicUsize, - waker: AtomicWaker, + waker: DiatomicWaker, list_head: AtomicUsize, list_tail: UnsafeCell, len: usize, @@ -91,8 +91,12 @@ impl Deref for ArcSlice { impl ArcSlice { /// Register the waker - pub(crate) fn register(&self, waker: &Waker) { - self.meta.waker.register(waker) + pub(crate) fn register(&mut self, waker: &Waker) { + // Safety: + // Diatomic waker requires we do not concurrently run + // "register", "unregister", and "wait_until". + // we only call register with mut access, thus we are safe. + unsafe { self.meta.waker.register(waker) } } fn get(&self, index: usize) -> Waker { @@ -289,7 +293,7 @@ mod slot { let slot = waker.cast(); let inner = inner_ref(slot); inner.push((*slot).index); - inner.meta.waker.wake(); + inner.meta.waker.notify(); } // Decrement the reference count of the Arc on drop @@ -433,7 +437,7 @@ impl ArcSlice { len: cap, list_head: AtomicUsize::new(cap), list_tail: UnsafeCell::new(cap), - waker: AtomicWaker::new(), + waker: DiatomicWaker::new(), }; ptr::write(ptr::addr_of_mut!((*inner).meta), meta); for i in 0..cap { diff --git a/tests/http.rs b/tests/http.rs index 222c355..2c63633 100644 --- a/tests/http.rs +++ b/tests/http.rs @@ -1,8 +1,8 @@ #![cfg(not(miri))] use std::time::Instant; +use futures::StreamExt; use futures_buffered::BufferedStreamExt; -use futures_util::StreamExt; use reqwest::{Client, Error}; static URLS: &[&str] = &[