Skip to content

Commit

Permalink
runtime: callback when a worker parks and unparks (#4070)
Browse files Browse the repository at this point in the history
  • Loading branch information
farnz authored Sep 16, 2021
1 parent ab34805 commit 957ed3e
Show file tree
Hide file tree
Showing 5 changed files with 210 additions and 22 deletions.
34 changes: 28 additions & 6 deletions tokio/src/runtime/basic_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::loom::sync::Mutex;
use crate::park::{Park, Unpark};
use crate::runtime::stats::{RuntimeStats, WorkerStatsBatcher};
use crate::runtime::task::{self, JoinHandle, OwnedTasks, Schedule, Task};
use crate::runtime::Callback;
use crate::sync::notify::Notify;
use crate::util::{waker_ref, Wake, WakerRef};

Expand Down Expand Up @@ -49,6 +50,11 @@ struct Inner<P: Park> {
/// Thread park handle
park: P,

/// Callback for a worker parking itself
before_park: Option<Callback>,
/// Callback for a worker unparking itself
after_unpark: Option<Callback>,

/// Stats batcher
stats: WorkerStatsBatcher,
}
Expand Down Expand Up @@ -121,7 +127,11 @@ const REMOTE_FIRST_INTERVAL: u8 = 31;
scoped_thread_local!(static CURRENT: Context);

impl<P: Park> BasicScheduler<P> {
pub(crate) fn new(park: P) -> BasicScheduler<P> {
pub(crate) fn new(
park: P,
before_park: Option<Callback>,
after_unpark: Option<Callback>,
) -> BasicScheduler<P> {
let unpark = Box::new(park.unpark());

let spawner = Spawner {
Expand All @@ -141,6 +151,8 @@ impl<P: Park> BasicScheduler<P> {
spawner: spawner.clone(),
tick: 0,
park,
before_park,
after_unpark,
stats: WorkerStatsBatcher::new(0),
}));

Expand Down Expand Up @@ -247,11 +259,21 @@ impl<P: Park> Inner<P> {
let entry = match entry {
Some(entry) => entry,
None => {
// Park until the thread is signaled
scheduler.stats.about_to_park();
scheduler.stats.submit(&scheduler.spawner.shared.stats);
scheduler.park.park().expect("failed to park");
scheduler.stats.returned_from_park();
if let Some(f) = &scheduler.before_park {
f();
}
// This check will fail if `before_park` spawns a task for us to run
// instead of parking the thread
if context.tasks.borrow_mut().queue.is_empty() {
// Park until the thread is signaled
scheduler.stats.about_to_park();
scheduler.stats.submit(&scheduler.spawner.shared.stats);
scheduler.park.park().expect("failed to park");
scheduler.stats.returned_from_park();
}
if let Some(f) = &scheduler.after_unpark {
f();
}

// Try polling the `block_on` future next
continue 'outer;
Expand Down
131 changes: 128 additions & 3 deletions tokio/src/runtime/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ pub struct Builder {
/// To run before each worker thread stops
pub(super) before_stop: Option<Callback>,

/// To run before each worker thread is parked.
pub(super) before_park: Option<Callback>,

/// To run after each thread is unparked.
pub(super) after_unpark: Option<Callback>,

/// Customizable keep alive timeout for BlockingPool
pub(super) keep_alive: Option<Duration>,
}
Expand Down Expand Up @@ -135,6 +141,8 @@ impl Builder {
// No worker thread callbacks
after_start: None,
before_stop: None,
before_park: None,
after_unpark: None,

keep_alive: None,
}
Expand Down Expand Up @@ -374,6 +382,120 @@ impl Builder {
self
}

/// Executes function `f` just before a thread is parked (goes idle).
/// `f` is called within the Tokio context, so functions like [`tokio::spawn`](crate::spawn)
/// can be called, and may result in this thread being unparked immediately.
///
/// This can be used to start work only when the executor is idle, or for bookkeeping
/// and monitoring purposes.
///
/// Note: There can only be one park callback for a runtime; calling this function
/// more than once replaces the last callback defined, rather than adding to it.
///
/// # Examples
///
/// ## Multithreaded executor
/// ```
/// # use std::sync::Arc;
/// # use std::sync::atomic::{AtomicBool, Ordering};
/// # use tokio::runtime;
/// # use tokio::sync::Barrier;
/// # pub fn main() {
/// let once = AtomicBool::new(true);
/// let barrier = Arc::new(Barrier::new(2));
///
/// let runtime = runtime::Builder::new_multi_thread()
/// .worker_threads(1)
/// .on_thread_park({
/// let barrier = barrier.clone();
/// move || {
/// let barrier = barrier.clone();
/// if once.swap(false, Ordering::Relaxed) {
/// tokio::spawn(async move { barrier.wait().await; });
/// }
/// }
/// })
/// .build()
/// .unwrap();
///
/// runtime.block_on(async {
/// barrier.wait().await;
/// })
/// # }
/// ```
/// ## Current thread executor
/// ```
/// # use std::sync::Arc;
/// # use std::sync::atomic::{AtomicBool, Ordering};
/// # use tokio::runtime;
/// # use tokio::sync::Barrier;
/// # pub fn main() {
/// let once = AtomicBool::new(true);
/// let barrier = Arc::new(Barrier::new(2));
///
/// let runtime = runtime::Builder::new_current_thread()
/// .on_thread_park({
/// let barrier = barrier.clone();
/// move || {
/// let barrier = barrier.clone();
/// if once.swap(false, Ordering::Relaxed) {
/// tokio::spawn(async move { barrier.wait().await; });
/// }
/// }
/// })
/// .build()
/// .unwrap();
///
/// runtime.block_on(async {
/// barrier.wait().await;
/// })
/// # }
/// ```
#[cfg(not(loom))]
pub fn on_thread_park<F>(&mut self, f: F) -> &mut Self
where
F: Fn() + Send + Sync + 'static,
{
self.before_park = Some(std::sync::Arc::new(f));
self
}

/// Executes function `f` just after a thread unparks (starts executing tasks).
///
/// This is intended for bookkeeping and monitoring use cases; note that work
/// in this callback will increase latencies when the application has allowed one or
/// more runtime threads to go idle.
///
/// Note: There can only be one unpark callback for a runtime; calling this function
/// more than once replaces the last callback defined, rather than adding to it.
///
/// # Examples
///
/// ```
/// # use tokio::runtime;
///
/// # pub fn main() {
/// let runtime = runtime::Builder::new_multi_thread()
/// .on_thread_unpark(|| {
/// println!("thread unparking");
/// })
/// .build();
///
/// runtime.unwrap().block_on(async {
/// tokio::task::yield_now().await;
/// println!("Hello from Tokio!");
/// })
/// # }
/// ```
#[cfg(not(loom))]
pub fn on_thread_unpark<F>(&mut self, f: F) -> &mut Self
where
F: Fn() + Send + Sync + 'static,
{
self.after_unpark = Some(std::sync::Arc::new(f));
self
}

/// Creates the configured `Runtime`.
///
/// The returned `Runtime` instance is ready to spawn tasks.
Expand Down Expand Up @@ -441,7 +563,8 @@ impl Builder {
// there are no futures ready to do something, it'll let the timer or
// the reactor to generate some new stimuli for the futures to continue
// in their life.
let scheduler = BasicScheduler::new(driver);
let scheduler =
BasicScheduler::new(driver, self.before_park.clone(), self.after_unpark.clone());
let spawner = Spawner::Basic(scheduler.spawner().clone());

// Blocking pool
Expand Down Expand Up @@ -546,7 +669,7 @@ cfg_rt_multi_thread! {

let (driver, resources) = driver::Driver::new(self.get_cfg())?;

let (scheduler, launch) = ThreadPool::new(core_threads, Parker::new(driver));
let (scheduler, launch) = ThreadPool::new(core_threads, Parker::new(driver), self.before_park.clone(), self.after_unpark.clone());
let spawner = Spawner::ThreadPool(scheduler.spawner().clone());

// Create the blocking pool
Expand Down Expand Up @@ -587,7 +710,9 @@ impl fmt::Debug for Builder {
)
.field("thread_stack_size", &self.thread_stack_size)
.field("after_start", &self.after_start.as_ref().map(|_| "..."))
.field("before_stop", &self.after_start.as_ref().map(|_| "..."))
.field("before_stop", &self.before_stop.as_ref().map(|_| "..."))
.field("before_park", &self.before_park.as_ref().map(|_| "..."))
.field("after_unpark", &self.after_unpark.as_ref().map(|_| "..."))
.finish()
}
}
8 changes: 8 additions & 0 deletions tokio/src/runtime/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,14 @@ impl<T> Local<T> {
!self.inner.is_empty()
}

/// Returns false if there are any entries in the queue
///
/// Separate to is_stealable so that refactors of is_stealable to "protect"
/// some tasks from stealing won't affect this
pub(super) fn has_tasks(&self) -> bool {
!self.inner.is_empty()
}

/// Pushes a task to the back of the local queue, skipping the LIFO slot.
pub(super) fn push_back(&mut self, mut task: task::Notified<T>, inject: &Inject<T>) {
let tail = loop {
Expand Down
11 changes: 8 additions & 3 deletions tokio/src/runtime/thread_pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ pub(crate) use worker::block_in_place;
use crate::loom::sync::Arc;
use crate::runtime::stats::RuntimeStats;
use crate::runtime::task::JoinHandle;
use crate::runtime::Parker;
use crate::runtime::{Callback, Parker};

use std::fmt;
use std::future::Future;
Expand Down Expand Up @@ -44,8 +44,13 @@ pub(crate) struct Spawner {
// ===== impl ThreadPool =====

impl ThreadPool {
pub(crate) fn new(size: usize, parker: Parker) -> (ThreadPool, Launch) {
let (shared, launch) = worker::create(size, parker);
pub(crate) fn new(
size: usize,
parker: Parker,
before_park: Option<Callback>,
after_unpark: Option<Callback>,
) -> (ThreadPool, Launch) {
let (shared, launch) = worker::create(size, parker, before_park, after_unpark);
let spawner = Spawner { shared };
let thread_pool = ThreadPool { spawner };

Expand Down
48 changes: 38 additions & 10 deletions tokio/src/runtime/thread_pool/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ use crate::runtime::park::{Parker, Unparker};
use crate::runtime::stats::{RuntimeStats, WorkerStatsBatcher};
use crate::runtime::task::{Inject, JoinHandle, OwnedTasks};
use crate::runtime::thread_pool::{AtomicCell, Idle};
use crate::runtime::{queue, task};
use crate::runtime::{queue, task, Callback};
use crate::util::FastRand;

use std::cell::RefCell;
Expand Down Expand Up @@ -142,6 +142,11 @@ pub(super) struct Shared {
#[allow(clippy::vec_box)] // we're moving an already-boxed value
shutdown_cores: Mutex<Vec<Box<Core>>>,

/// Callback for a worker parking itself
before_park: Option<Callback>,
/// Callback for a worker unparking itself
after_unpark: Option<Callback>,

/// Collect stats from the runtime.
stats: RuntimeStats,
}
Expand Down Expand Up @@ -181,7 +186,12 @@ type Notified = task::Notified<Arc<Shared>>;
// Tracks thread-local state
scoped_thread_local!(static CURRENT: Context);

pub(super) fn create(size: usize, park: Parker) -> (Arc<Shared>, Launch) {
pub(super) fn create(
size: usize,
park: Parker,
before_park: Option<Callback>,
after_unpark: Option<Callback>,
) -> (Arc<Shared>, Launch) {
let mut cores = vec![];
let mut remotes = vec![];

Expand Down Expand Up @@ -212,6 +222,8 @@ pub(super) fn create(size: usize, park: Parker) -> (Arc<Shared>, Launch) {
idle: Idle::new(size),
owned: OwnedTasks::new(),
shutdown_cores: Mutex::new(vec![]),
before_park,
after_unpark,
stats: RuntimeStats::new(size),
});

Expand Down Expand Up @@ -453,19 +465,26 @@ impl Context {
}

fn park(&self, mut core: Box<Core>) -> Box<Core> {
core.transition_to_parked(&self.worker);
if let Some(f) = &self.worker.shared.before_park {
f();
}

while !core.is_shutdown {
core = self.park_timeout(core, None);
if core.transition_to_parked(&self.worker) {
while !core.is_shutdown {
core = self.park_timeout(core, None);

// Run regularly scheduled maintenance
core.maintenance(&self.worker);
// Run regularly scheduled maintenance
core.maintenance(&self.worker);

if core.transition_from_parked(&self.worker) {
return core;
if core.transition_from_parked(&self.worker) {
break;
}
}
}

if let Some(f) = &self.worker.shared.after_unpark {
f();
}
core
}

Expand Down Expand Up @@ -569,7 +588,14 @@ impl Core {
}

/// Prepare the worker state for parking
fn transition_to_parked(&mut self, worker: &Worker) {
///
/// Returns true if the transition happend, false if there is work to do first
fn transition_to_parked(&mut self, worker: &Worker) -> bool {
// Workers should not park if they have work to do
if self.lifo_slot.is_some() || self.run_queue.has_tasks() {
return false;
}

// When the final worker transitions **out** of searching to parked, it
// must check all the queues one last time in case work materialized
// between the last work scan and transitioning out of searching.
Expand All @@ -585,6 +611,8 @@ impl Core {
if is_last_searcher {
worker.shared.notify_if_work_pending();
}

true
}

/// Returns `true` if the transition happened.
Expand Down

0 comments on commit 957ed3e

Please sign in to comment.