Skip to content

Commit

Permalink
Add CpuFuture::forget() function.
Browse files Browse the repository at this point in the history
CpuFuture::forget() consumes the future but does not cancel
the computation.

Closes rust-lang#293
  • Loading branch information
kkimdev committed Dec 16, 2016
1 parent 6f52a5d commit 035a9eb
Showing 1 changed file with 17 additions and 4 deletions.
21 changes: 17 additions & 4 deletions futures-cpupool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ extern crate num_cpus;

use std::panic::{self, AssertUnwindSafe};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::thread;

use crossbeam::sync::MsQueue;
Expand Down Expand Up @@ -89,6 +89,7 @@ pub struct Builder {
struct MySender<F, T> {
fut: F,
tx: Option<Sender<T>>,
keep_running_flag: Arc<AtomicBool>,
}

fn _assert() {
Expand All @@ -114,6 +115,7 @@ struct Inner {
#[must_use]
pub struct CpuFuture<T, E> {
inner: Receiver<thread::Result<Result<T, E>>>,
keep_running_flag: Arc<AtomicBool>,
}

enum Message {
Expand Down Expand Up @@ -173,15 +175,17 @@ impl CpuPool {
F::Error: Send + 'static,
{
let (tx, rx) = channel();
let keep_running_flag = Arc::new(AtomicBool::new(false));
// AssertUnwindSafe is used here becuase `Send + 'static` is basically
// an alias for an implementation of the `UnwindSafe` trait but we can't
// express that in the standard library right now.
let sender = MySender {
fut: AssertUnwindSafe(f).catch_unwind(),
tx: Some(tx),
keep_running_flag: keep_running_flag.clone(),
};
executor::spawn(sender).execute(self.inner.clone());
CpuFuture { inner: rx }
CpuFuture { inner: rx , keep_running_flag: keep_running_flag.clone() }
}

/// Spawns a closure on this thread pool.
Expand Down Expand Up @@ -238,6 +242,13 @@ impl Executor for Inner {
}
}

impl<T, E> CpuFuture<T, E> {
/// TODO
pub fn forget(self) {
self.keep_running_flag.store(true, Ordering::SeqCst);
}
}

impl<T: Send + 'static, E: Send + 'static> Future for CpuFuture<T, E> {
type Item = T;
type Error = E;
Expand All @@ -258,8 +269,10 @@ impl<F: Future> Future for MySender<F, Result<F::Item, F::Error>> {

fn poll(&mut self) -> Poll<(), ()> {
if let Ok(Async::Ready(_)) = self.tx.as_mut().unwrap().poll_cancel() {
// Cancelled, bail out
return Ok(().into())
if !self.keep_running_flag.load(Ordering::SeqCst) {
// Cancelled, bail out
return Ok(().into())
}
}

let res = match self.fut.poll() {
Expand Down

0 comments on commit 035a9eb

Please sign in to comment.