From 035a9ebd6b40107e8dc7d163ea00f5a55fda20a2 Mon Sep 17 00:00:00 2001 From: Kibeom Kim Date: Fri, 16 Dec 2016 12:48:24 -0800 Subject: [PATCH] Add CpuFuture::forget() function. CpuFuture::forget() consumes the future but does not cancel the computation. Closes #293 --- futures-cpupool/src/lib.rs | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/futures-cpupool/src/lib.rs b/futures-cpupool/src/lib.rs index f39e9ff010d..0c4213ca4fd 100644 --- a/futures-cpupool/src/lib.rs +++ b/futures-cpupool/src/lib.rs @@ -44,7 +44,7 @@ extern crate num_cpus; use std::panic::{self, AssertUnwindSafe}; use std::sync::Arc; -use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::thread; use crossbeam::sync::MsQueue; @@ -89,6 +89,7 @@ pub struct Builder { struct MySender { fut: F, tx: Option>, + keep_running_flag: Arc, } fn _assert() { @@ -114,6 +115,7 @@ struct Inner { #[must_use] pub struct CpuFuture { inner: Receiver>>, + keep_running_flag: Arc, } enum Message { @@ -173,15 +175,17 @@ impl CpuPool { F::Error: Send + 'static, { let (tx, rx) = channel(); + let keep_running_flag = Arc::new(AtomicBool::new(false)); // AssertUnwindSafe is used here becuase `Send + 'static` is basically // an alias for an implementation of the `UnwindSafe` trait but we can't // express that in the standard library right now. let sender = MySender { fut: AssertUnwindSafe(f).catch_unwind(), tx: Some(tx), + keep_running_flag: keep_running_flag.clone(), }; executor::spawn(sender).execute(self.inner.clone()); - CpuFuture { inner: rx } + CpuFuture { inner: rx , keep_running_flag: keep_running_flag.clone() } } /// Spawns a closure on this thread pool. @@ -238,6 +242,13 @@ impl Executor for Inner { } } +impl CpuFuture { + /// TODO + pub fn forget(self) { + self.keep_running_flag.store(true, Ordering::SeqCst); + } +} + impl Future for CpuFuture { type Item = T; type Error = E; @@ -258,8 +269,10 @@ impl Future for MySender> { fn poll(&mut self) -> Poll<(), ()> { if let Ok(Async::Ready(_)) = self.tx.as_mut().unwrap().poll_cancel() { - // Cancelled, bail out - return Ok(().into()) + if !self.keep_running_flag.load(Ordering::SeqCst) { + // Cancelled, bail out + return Ok(().into()) + } } let res = match self.fut.poll() {