Skip to content

Commit

Permalink
replace AtomicWaker with DiatomicWaker, removing futures_util
Browse files Browse the repository at this point in the history
  • Loading branch information
conradludgate committed Jul 25, 2024
1 parent 8a43672 commit 72572ef
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 11 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ bench = false

[dependencies]
futures-core = "0.3.21"
futures-util = "0.3.21"
pin-project-lite = "0.2"
diatomic-waker = "0.1.0"

[dev-dependencies]
futures = "0.3.21"
Expand Down
6 changes: 3 additions & 3 deletions benches/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ async fn sleep() {
#[divan::bench_group]
mod futures_unordered {
use futures_buffered::FuturesUnorderedBounded;
use futures_util::{stream::FuturesUnordered, StreamExt};
use futures::{stream::FuturesUnordered, StreamExt};

use crate::sleep;

Expand Down Expand Up @@ -72,7 +72,7 @@ mod futures_unordered {
#[divan::bench_group]
mod buffer_unordered {
use futures_buffered::BufferedStreamExt;
use futures_util::{stream, StreamExt};
use futures::{stream, StreamExt};

use crate::sleep;

Expand Down Expand Up @@ -108,7 +108,7 @@ mod buffer_unordered {
#[divan::bench_group]
mod buffer_ordered {
use futures_buffered::BufferedStreamExt;
use futures_util::{stream, StreamExt};
use futures::{stream, StreamExt};

use crate::sleep;

Expand Down
16 changes: 10 additions & 6 deletions src/arc_slice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use core::{
sync::atomic::{self, AtomicUsize},
task::Waker,
};
use futures_util::task::AtomicWaker;
use diatomic_waker::primitives::DiatomicWaker;

/// [`ArcSlice`] is a fun optimisation. For `FuturesUnorderedBounded`, we have `n` slots for futures,
/// and we create a separate context when polling each individual future to avoid having n^2 polling.
Expand Down Expand Up @@ -50,7 +50,7 @@ pub(crate) struct ArcSliceInner {

pub(crate) struct ArcSliceInnerMeta {
strong: AtomicUsize,
waker: AtomicWaker,
waker: DiatomicWaker,
list_head: AtomicUsize,
list_tail: UnsafeCell<usize>,
len: usize,
Expand Down Expand Up @@ -91,8 +91,12 @@ impl Deref for ArcSlice {

impl ArcSlice {
/// Register the waker
pub(crate) fn register(&self, waker: &Waker) {
self.meta.waker.register(waker)
pub(crate) fn register(&mut self, waker: &Waker) {
// Safety:
// Diatomic waker requires we do not concurrently run
// "register", "unregister", and "wait_until".
// we only call register with mut access, thus we are safe.
unsafe { self.meta.waker.register(waker) }
}

fn get(&self, index: usize) -> Waker {
Expand Down Expand Up @@ -289,7 +293,7 @@ mod slot {
let slot = waker.cast();
let inner = inner_ref(slot);
inner.push((*slot).index);
inner.meta.waker.wake();
inner.meta.waker.notify();
}

// Decrement the reference count of the Arc on drop
Expand Down Expand Up @@ -433,7 +437,7 @@ impl ArcSlice {
len: cap,
list_head: AtomicUsize::new(cap),
list_tail: UnsafeCell::new(cap),
waker: AtomicWaker::new(),
waker: DiatomicWaker::new(),
};
ptr::write(ptr::addr_of_mut!((*inner).meta), meta);
for i in 0..cap {
Expand Down
2 changes: 1 addition & 1 deletion tests/http.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
#![cfg(not(miri))]
use std::time::Instant;

use futures::StreamExt;
use futures_buffered::BufferedStreamExt;
use futures_util::StreamExt;
use reqwest::{Client, Error};

static URLS: &[&str] = &[
Expand Down

0 comments on commit 72572ef

Please sign in to comment.