-
-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #1507 from Pauan/futures-0.3
Adding in Futures 0.3 support to wasm-bindgen-futures
- Loading branch information
Showing
3 changed files
with
280 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,270 @@ | ||
use std::fmt; | ||
use std::pin::Pin; | ||
use std::cell::{Cell, RefCell}; | ||
use std::sync::Arc; | ||
use std::future::Future; | ||
use std::task::{Poll, Context}; | ||
use std::collections::VecDeque; | ||
|
||
use futures_util::task::ArcWake; | ||
use futures_util::future::FutureExt; | ||
use futures_channel::oneshot; | ||
|
||
use lazy_static::lazy_static; | ||
|
||
use js_sys::Promise; | ||
use wasm_bindgen::prelude::*; | ||
|
||
/// A Rust `Future` backed by a JavaScript `Promise`. | ||
/// | ||
/// This type is constructed with a JavaScript `Promise` object and translates | ||
/// it to a Rust `Future`. This type implements the `Future` trait from the | ||
/// `futures` crate and will either succeed or fail depending on what happens | ||
/// with the JavaScript `Promise`. | ||
/// | ||
/// Currently this type is constructed with `JsFuture::from`. | ||
pub struct JsFuture { | ||
resolved: oneshot::Receiver<JsValue>, | ||
rejected: oneshot::Receiver<JsValue>, | ||
_cb_resolve: Closure<FnMut(JsValue)>, | ||
_cb_reject: Closure<FnMut(JsValue)>, | ||
} | ||
|
||
impl fmt::Debug for JsFuture { | ||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { | ||
write!(f, "JsFuture {{ ... }}") | ||
} | ||
} | ||
|
||
impl From<Promise> for JsFuture { | ||
fn from(js: Promise) -> JsFuture { | ||
// Use the `then` method to schedule two callbacks, one for the | ||
// resolved value and one for the rejected value. These two callbacks | ||
// will be connected to oneshot channels which feed back into our | ||
// future. | ||
// | ||
// This may not be the speediest option today but it should work! | ||
let (tx1, rx1) = oneshot::channel(); | ||
|
||
let cb_resolve = Closure::once(move |val| { | ||
tx1.send(val).unwrap_throw(); | ||
}); | ||
|
||
let (tx2, rx2) = oneshot::channel(); | ||
|
||
let cb_reject = Closure::once(move |val| { | ||
tx2.send(val).unwrap_throw(); | ||
}); | ||
|
||
js.then2(&cb_resolve, &cb_reject); | ||
|
||
JsFuture { | ||
resolved: rx1, | ||
rejected: rx2, | ||
_cb_resolve: cb_resolve, | ||
_cb_reject: cb_reject, | ||
} | ||
} | ||
} | ||
|
||
impl Future for JsFuture { | ||
type Output = Result<JsValue, JsValue>; | ||
|
||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { | ||
// Test if either our resolved or rejected side is finished yet. | ||
if let Poll::Ready(val) = self.resolved.poll_unpin(cx) { | ||
return Poll::Ready(Ok(val.unwrap_throw())); | ||
} | ||
|
||
if let Poll::Ready(val) = self.rejected.poll_unpin(cx) { | ||
return Poll::Ready(Err(val.unwrap_throw())); | ||
} | ||
|
||
Poll::Pending | ||
} | ||
} | ||
|
||
/// Converts a Rust `Future` into a JavaScript `Promise`. | ||
/// | ||
/// This function will take any future in Rust and schedule it to be executed, | ||
/// returning a JavaScript `Promise` which can then be passed back to JavaScript | ||
/// to get plumbed into the rest of a system. | ||
/// | ||
/// The `future` provided must adhere to `'static` because it'll be scheduled | ||
/// to run in the background and cannot contain any stack references. The | ||
/// returned `Promise` will be resolved or rejected when the future completes, | ||
/// depending on whether it finishes with `Ok` or `Err`. | ||
/// | ||
/// # Panics | ||
/// | ||
/// Note that in wasm panics are currently translated to aborts, but "abort" in | ||
/// this case means that a JavaScript exception is thrown. The wasm module is | ||
/// still usable (likely erroneously) after Rust panics. | ||
/// | ||
/// If the `future` provided panics then the returned `Promise` **will not | ||
/// resolve**. Instead it will be a leaked promise. This is an unfortunate | ||
/// limitation of wasm currently that's hoped to be fixed one day! | ||
pub fn future_to_promise<F>(future: F) -> Promise | ||
where | ||
F: Future<Output = Result<JsValue, JsValue>> + 'static, | ||
{ | ||
let mut future = Some(future); | ||
|
||
Promise::new(&mut |resolve, reject| { | ||
// TODO change Promise::new to be FnOnce | ||
spawn_local(future.take().unwrap_throw().map(move |val| { | ||
match val { | ||
Ok(val) => { | ||
resolve.call1(&JsValue::undefined(), &val).unwrap_throw(); | ||
}, | ||
Err(val) => { | ||
reject.call1(&JsValue::undefined(), &val).unwrap_throw(); | ||
}, | ||
} | ||
})); | ||
}) | ||
} | ||
|
||
/// Runs a Rust `Future` on a local task queue. | ||
/// | ||
/// The `future` provided must adhere to `'static` because it'll be scheduled | ||
/// to run in the background and cannot contain any stack references. | ||
/// | ||
/// # Panics | ||
/// | ||
/// This function has the same panic behavior as `future_to_promise`. | ||
pub fn spawn_local<F>(future: F) | ||
where | ||
F: Future<Output = ()> + 'static, | ||
{ | ||
struct Task { | ||
// This is an Option so that the Future can be immediately dropped when it's finished | ||
future: RefCell<Option<Pin<Box<dyn Future<Output = ()> + 'static>>>>, | ||
|
||
// This is used to ensure that the Task will only be queued once | ||
is_queued: Cell<bool>, | ||
} | ||
|
||
impl Task { | ||
#[inline] | ||
fn new<F>(future: F) -> Arc<Self> where F: Future<Output = ()> + 'static { | ||
Arc::new(Self { | ||
future: RefCell::new(Some(Box::pin(future))), | ||
is_queued: Cell::new(false), | ||
}) | ||
} | ||
} | ||
|
||
impl ArcWake for Task { | ||
fn wake_by_ref(arc_self: &Arc<Self>) { | ||
// This ensures that it's only queued once | ||
if arc_self.is_queued.replace(true) { | ||
return; | ||
} | ||
|
||
let mut lock = EXECUTOR.tasks.borrow_mut(); | ||
|
||
lock.push_back(arc_self.clone()); | ||
|
||
// The Task will be polled on the next microtask event tick | ||
EXECUTOR.next_tick.schedule(); | ||
} | ||
} | ||
|
||
|
||
struct NextTick { | ||
is_spinning: Cell<bool>, | ||
promise: Promise, | ||
closure: Closure<dyn FnMut(JsValue)>, | ||
} | ||
|
||
impl NextTick { | ||
#[inline] | ||
fn new<F>(mut f: F) -> Self where F: FnMut() + 'static { | ||
Self { | ||
is_spinning: Cell::new(false), | ||
promise: Promise::resolve(&JsValue::null()), | ||
closure: Closure::wrap(Box::new(move |_| { | ||
f(); | ||
})), | ||
} | ||
} | ||
|
||
fn schedule(&self) { | ||
// This ensures that it's only scheduled once | ||
if self.is_spinning.replace(true) { | ||
return; | ||
} | ||
|
||
// TODO avoid creating a new Promise | ||
self.promise.then(&self.closure); | ||
} | ||
|
||
fn done(&self) { | ||
self.is_spinning.set(false); | ||
} | ||
} | ||
|
||
|
||
struct Executor { | ||
// This is a queue of Tasks which will be polled in order | ||
tasks: RefCell<VecDeque<Arc<Task>>>, | ||
|
||
// This is used to ensure that Tasks are polled on the next microtask event tick | ||
next_tick: NextTick, | ||
} | ||
|
||
// TODO This is only safe because JS is currently single-threaded | ||
unsafe impl Send for Executor {} | ||
unsafe impl Sync for Executor {} | ||
|
||
lazy_static! { | ||
static ref EXECUTOR: Executor = Executor { | ||
tasks: RefCell::new(VecDeque::new()), | ||
|
||
// This closure will only be called on the next microtask event tick | ||
next_tick: NextTick::new(|| { | ||
let tasks = &EXECUTOR.tasks; | ||
|
||
loop { | ||
let mut lock = tasks.borrow_mut(); | ||
|
||
match lock.pop_front() { | ||
Some(task) => { | ||
let mut future = task.future.borrow_mut(); | ||
|
||
let poll = { | ||
// This will only panic if the Future wakes up the Waker after returning Poll::Ready | ||
let mut future = future.as_mut().unwrap_throw(); | ||
|
||
// Clear `is_queued` flag so that it will re-queue if poll calls waker.wake() | ||
task.is_queued.set(false); | ||
|
||
// This is necessary because the polled task might queue more tasks | ||
drop(lock); | ||
|
||
// TODO is there some way of saving these so they don't need to be recreated all the time ? | ||
let waker = ArcWake::into_waker(task.clone()); | ||
let cx = &mut Context::from_waker(&waker); | ||
Pin::new(&mut future).poll(cx) | ||
}; | ||
|
||
if let Poll::Ready(_) = poll { | ||
// Cleanup the Future immediately | ||
*future = None; | ||
} | ||
}, | ||
None => { | ||
// All of the Tasks have been polled, so it's now possible to schedule the NextTick again | ||
EXECUTOR.next_tick.done(); | ||
break; | ||
}, | ||
} | ||
} | ||
}), | ||
}; | ||
} | ||
|
||
|
||
ArcWake::wake_by_ref(&Task::new(future)); | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters