Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implements the changes for Wakers that are currently discussed in the tracking issue for stabilization #1340

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions futures-core/src/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ mod spawn;
pub mod __internal;
pub use self::spawn::{Spawn, LocalSpawn, SpawnError};

pub use core::task::{Poll, Waker, LocalWaker, UnsafeWake};
pub use core::task::{Poll, Waker, LocalWaker, RawWaker, RawWakerVTable};
#[cfg(feature = "std")]
pub use std::task::{Wake, local_waker, local_waker_from_nonlocal};
pub use std::task::{ArcWake};
8 changes: 4 additions & 4 deletions futures-executor/src/local_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ use crate::{enter, ThreadPool};
use futures_core::future::{Future, FutureObj, LocalFutureObj};
use futures_core::stream::{Stream};
use futures_core::task::{
self, Poll, LocalWaker, Wake,
Poll, LocalWaker, ArcWake,
Spawn, LocalSpawn, SpawnError,
};
use futures_util::task::{LocalWakerRef, local_waker_ref};
use futures_util::stream::FuturesUnordered;
use futures_util::stream::StreamExt;
use lazy_static::lazy_static;
Expand Down Expand Up @@ -53,7 +54,7 @@ thread_local! {
});
}

impl Wake for ThreadNotify {
impl ArcWake for ThreadNotify {
fn wake(arc_self: &Arc<Self>) {
arc_self.thread.unpark();
}
Expand All @@ -67,8 +68,7 @@ fn run_executor<T, F: FnMut(&LocalWaker) -> Poll<T>>(mut f: F) -> T {
another executor");

CURRENT_THREAD_NOTIFY.with(|thread_notify| {
let local_waker =
task::local_waker_from_nonlocal(thread_notify.clone());
let local_waker: LocalWakerRef = unsafe { local_waker_ref(thread_notify) };
loop {
if let Poll::Ready(t) = f(&local_waker) {
return t;
Expand Down
4 changes: 2 additions & 2 deletions futures-executor/src/thread_pool.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::enter;
use crate::unpark_mutex::UnparkMutex;
use futures_core::future::{Future, FutureObj};
use futures_core::task::{Poll, Wake, Spawn, SpawnError};
use futures_core::task::{Poll, ArcWake, Spawn, SpawnError};
use futures_util::future::FutureExt;
use futures_util::task::local_waker_ref_from_nonlocal;
use num_cpus;
Expand Down Expand Up @@ -338,7 +338,7 @@ impl fmt::Debug for Task {
}
}

impl Wake for WakeHandle {
impl ArcWake for WakeHandle {
fn wake(arc_self: &Arc<Self>) {
match arc_self.mutex.notify() {
Ok(task) => arc_self.exec.state.send(Message::Run(task)),
Expand Down
2 changes: 1 addition & 1 deletion futures-test/src/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ mod panic_spawner;
pub use self::panic_spawner::{panic_spawner_mut, PanicSpawner};

mod panic_waker;
pub use self::panic_waker::{panic_local_waker, panic_local_waker_ref, PanicWake};
pub use self::panic_waker::{panic_local_waker, panic_local_waker_ref};

mod record_spawner;
pub use self::record_spawner::RecordSpawner;
Expand Down
94 changes: 38 additions & 56 deletions futures-test/src/task/panic_waker.rs
Original file line number Diff line number Diff line change
@@ -1,74 +1,56 @@
use futures_core::task::{LocalWaker, UnsafeWake, Wake, Waker};
use std::cell::UnsafeCell;
use std::ptr::NonNull;
use std::sync::Arc;
use futures_core::task::{LocalWaker, RawWaker, RawWakerVTable};
use core::cell::UnsafeCell;
use core::ptr::null;

/// An implementation of [`Wake`](futures_core::task::Wake) that panics when
/// woken.
///
/// # Examples
///
/// ```should_panic
/// #![feature(futures_api)]
/// use futures_test::task::panic_local_waker_ref;
///
/// let lw = panic_local_waker_ref();
/// lw.wake(); // Will panic
/// ```
#[derive(Debug)]
pub struct PanicWake {
_reserved: (),
unsafe fn noop_clone(_data: *const()) -> RawWaker {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
unsafe fn noop_clone(_data: *const()) -> RawWaker {
fn noop_clone(_data: *const()) -> RawWaker {

ditto

raw_panic_waker()
}

impl PanicWake {
/// Create a new instance
pub fn new() -> Self {
Self { _reserved: () }
}
unsafe fn noop(_data: *const()) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
unsafe fn noop(_data: *const()) {
fn noop(_data: *const()) {

You should be able to remove unsafe here because of this coercion between fn and unsafe fn:

fn noop(_data: *const()) {}
const PTR: unsafe fn(*const ()) = noop;

Copy link
Member

@Nemo157 Nemo157 Dec 9, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting, so there’s a coercion from fn to unsafe fn and from environment-less-closure to fn, but not the combined environment-less-closure to unsafe fn: https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=eaec9c9f080c999a55a08dd2184d8669. Has there been any discussion of adding the latter?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that's just due to the lack of transitive coercions in general. It is not particular to this case AFAIK.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We generally accept PRs for these without FCP since the original RFC specified that we should include all transitive coercions, but there isn't a mechanism in the compiler for that today.

}

impl Default for PanicWake {
fn default() -> Self {
Self::new()
}
unsafe fn wake_panic(_data: *const()) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
unsafe fn wake_panic(_data: *const()) {
fn wake_panic(_data: *const()) {

ditto

panic!("should not be woken");
}

impl Wake for PanicWake {
fn wake(_arc_self: &Arc<Self>) {
panic!("should not be woken")
}
unsafe fn noop_into_waker(_data: *const()) -> Option<RawWaker> {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
unsafe fn noop_into_waker(_data: *const()) -> Option<RawWaker> {
fn noop_into_waker(_data: *const()) -> Option<RawWaker> {

ditto

Some(raw_panic_waker())
}

unsafe impl UnsafeWake for PanicWake {
unsafe fn clone_raw(&self) -> Waker {
panic_waker()
}
const PANIC_WAKER_VTABLE: RawWakerVTable = RawWakerVTable {
clone: noop_clone,
drop_fn: noop,
wake: wake_panic,
into_waker: noop_into_waker,
};

unsafe fn drop_raw(&self) {}

unsafe fn wake(&self) {
panic!("should not be woken")
fn raw_panic_waker() -> RawWaker {
RawWaker {
data: null(),
vtable: &PANIC_WAKER_VTABLE,
}
}

fn panic_unsafe_wake() -> NonNull<dyn UnsafeWake> {
static mut INSTANCE: PanicWake = PanicWake { _reserved: () };
unsafe { NonNull::new_unchecked(&mut INSTANCE as *mut dyn UnsafeWake) }
}

fn panic_waker() -> Waker {
unsafe { Waker::new(panic_unsafe_wake()) }
}

/// Create a new [`LocalWaker`](futures_core::task::LocalWaker) referencing
/// a singleton instance of [`PanicWake`].
/// Create a new [`LocalWaker`](futures_core::task::LocalWaker) which will
/// panic when `wake()` is called on it. The [`LocalWaker`] can be converted
/// into a [`Waker`] which will behave the same way.
///
/// # Examples
///
/// ```should_panic
/// #![feature(futures_api)]
/// use futures_test::task::panic_local_waker;
///
/// let lw = panic_local_waker();
/// lw.wake(); // Will panic
/// ```
pub fn panic_local_waker() -> LocalWaker {
unsafe { LocalWaker::new(panic_unsafe_wake()) }
unsafe { LocalWaker::new_unchecked(raw_panic_waker()) }
}

/// Get a thread local reference to a
/// Get a global reference to a
/// [`LocalWaker`](futures_core::task::LocalWaker) referencing a singleton
/// instance of [`PanicWake`].
/// instance of a [`LocalWaker`] which panics when woken.
///
/// # Examples
///
Expand All @@ -82,8 +64,8 @@ pub fn panic_local_waker() -> LocalWaker {
/// ```
pub fn panic_local_waker_ref() -> &'static LocalWaker {
thread_local! {
static LOCAL_WAKER_INSTANCE: UnsafeCell<LocalWaker> =
static PANIC_WAKER_INSTANCE: UnsafeCell<LocalWaker> =
UnsafeCell::new(panic_local_waker());
}
LOCAL_WAKER_INSTANCE.with(|l| unsafe { &*l.get() })
PANIC_WAKER_INSTANCE.with(|l| unsafe { &*l.get() })
}
6 changes: 3 additions & 3 deletions futures-test/src/task/wake_counter.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use futures_core::task::{self, LocalWaker, Wake};
use futures_core::task::{LocalWaker, ArcWake};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

Expand Down Expand Up @@ -39,7 +39,7 @@ impl WakeCounter {
count: AtomicUsize::new(0),
});
WakeCounter {
local_waker: task::local_waker_from_nonlocal(inner.clone()),
local_waker: ArcWake::into_local_waker(inner.clone()),
inner,
}
}
Expand All @@ -63,7 +63,7 @@ impl Default for WakeCounter {
}
}

impl Wake for Inner {
impl ArcWake for Inner {
fn wake(arc_self: &Arc<Self>) {
arc_self.count.fetch_add(1, Ordering::SeqCst);
}
Expand Down
10 changes: 6 additions & 4 deletions futures-util/src/future/shared.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::task::local_waker_ref_from_nonlocal;
use futures_core::future::{FusedFuture, Future};
use futures_core::task::{LocalWaker, Poll, Wake, Waker};
use futures_core::task::{LocalWaker, Poll, ArcWake, Waker};
use slab::Slab;
use std::cell::UnsafeCell;
use std::fmt;
Expand Down Expand Up @@ -130,10 +130,12 @@ where
self.waker_key = wakers.insert(Some(lw.clone().into_waker()));
} else {
let waker_slot = &mut wakers[self.waker_key];
let needs_replacement = if let Some(old_waker) = waker_slot {
let needs_replacement = if let Some(_old_waker) = waker_slot {
// If there's still an unwoken waker in the slot, only replace
// if the current one wouldn't wake the same task.
!lw.will_wake_nonlocal(old_waker)
// TODO: This API is currently not available, so replace always
// !lw.will_wake_nonlocal(old_waker)
true
} else {
true
};
Expand Down Expand Up @@ -315,7 +317,7 @@ where
}
}

impl Wake for Notifier {
impl ArcWake for Notifier {
fn wake(arc_self: &Arc<Self>) {
arc_self.state.compare_and_swap(POLLING, REPOLL, SeqCst);

Expand Down
Loading