Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run basic scheduler destructor inside executor context #3966

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion tokio/src/runtime/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ impl Handle {
.expect("failed to park thread")
}

pub(crate) fn shutdown(mut self) {
pub(crate) fn shutdown(&mut self) {
self.spawner.shutdown();
}
Comment on lines -304 to 306
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding a destructor to Runtime means we can no longer move out of the handle field, which would have made it impossible to call shutdown() on it. This was the simplest fix.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(See the implementation of shutdown_timeout)

}
Expand Down
12 changes: 12 additions & 0 deletions tokio/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,9 @@ cfg_rt! {
/// The runtime executor is either a thread-pool or a current-thread executor.
#[derive(Debug)]
enum Kind {
/// Runtime is shutting down
Gone,

/// Execute all tasks on the current-thread.
CurrentThread(BasicScheduler<driver::Driver>),

Expand Down Expand Up @@ -447,6 +450,7 @@ cfg_rt! {
let _enter = self.enter();

match &self.kind {
Kind::Gone => panic!("Runtime is shutting down"),
Kind::CurrentThread(exec) => exec.block_on(future),
#[cfg(feature = "rt-multi-thread")]
Kind::ThreadPool(exec) => exec.block_on(future),
Expand Down Expand Up @@ -560,4 +564,12 @@ cfg_rt! {
self.shutdown_timeout(Duration::from_nanos(0))
}
}

impl Drop for Runtime {
fn drop(&mut self) {
let _guard = self.enter();
// Shutdown the runtime whilst it is still active
self.kind = Kind::Gone;
}
}
}
43 changes: 42 additions & 1 deletion tokio/tests/task_abort.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]

use std::sync::Arc;
use std::sync::{mpsc, Arc};
use std::thread::sleep;
use std::time::Duration;

Expand Down Expand Up @@ -233,3 +233,44 @@ fn test_abort_task_that_panics_on_drop_returned() {
assert!(handle.await.unwrap_err().is_panic());
});
}

struct SpawnOnDrop(mpsc::Sender<bool>);

impl Drop for SpawnOnDrop {
fn drop(&mut self) {
let res = std::panic::catch_unwind(|| {
tokio::spawn(async move {
println!("did something");
});
});
self.0.send(res.is_ok()).unwrap();
}
}

/// Checks that aborting a task whose destructor panics has the expected result.
#[test]
fn test_task_that_spawns_task_on_drop() {
let (tx, rx) = mpsc::channel();

{
let rt = tokio::runtime::Builder::new_current_thread()
.enable_time()
.build()
.unwrap();

rt.block_on(async move {
let _handle = tokio::spawn(async move {
// Make sure the Arc is moved into the task
let _spawn_dropped = SpawnOnDrop(tx);
println!("task started");
tokio::time::sleep(std::time::Duration::new(1, 0)).await
});

// wait for task to sleep.
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
});
}

// Check that spawning the task did not panic
assert!(rx.recv().unwrap());
}