Skip to content

Commit

Permalink
improve spawn_blocked_future and block_on
Browse files Browse the repository at this point in the history
Signed-off-by: Vrtgs <[email protected]>
  • Loading branch information
Vrtgs committed Jan 5, 2025
1 parent cc7abd6 commit 6c67941
Showing 1 changed file with 20 additions and 29 deletions.
49 changes: 20 additions & 29 deletions thirtyfour/src/support.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::sync::LazyLock;
use std::time::Duration;
use std::{io, thread};

// used in drop code so its really bad to have a stack overflow then
// used in drop code so it's terrible to have a stack overflow then
const BOX_FUTURE_THRESHOLD: usize = 512;

// a global runtime that is being driven al the time
Expand Down Expand Up @@ -48,7 +48,7 @@ static GLOBAL_RT: LazyLock<tokio::runtime::Handle> = LazyLock::new(|| {
});

/// Helper to run the specified future and block the current thread waiting for the result.
/// works even while in a tokio runtime
/// works in a multi-threaded runtime, but will panic on a single threaded runtime
pub fn block_on<F>(future: F) -> F::Output
where
F: Future + Send,
Expand All @@ -63,20 +63,7 @@ where
}
}

fn block_on_inner<F>(future: F) -> F::Output
where
F: Future + Send,
F::Output: Send,
{
macro_rules! block_global {
($future:expr) => {
thread::scope(|scope| match scope.spawn(|| GLOBAL_RT.block_on($future)).join() {
Ok(res) => res,
Err(panic) => std::panic::resume_unwind(panic),
})
};
}

fn block_on_inner<F: Future>(future: F) -> F::Output {
cfg_if::cfg_if! {
if #[cfg(feature = "tokio-multi-threaded")] {
use tokio::runtime::RuntimeFlavor;
Expand All @@ -85,23 +72,23 @@ where
Ok(handle) if handle.runtime_flavor() == RuntimeFlavor::MultiThread => {
tokio::task::block_in_place(|| handle.block_on(future))
}
_ => block_global!(future),
_ => GLOBAL_RT.block_on(future),
}
} else {
block_global!(future)
GLOBAL_RT.block_on(future)
}
}
}

/// Helper to run the specified future and bind it to run before runtime shutdown
/// this is not guaranteed to not block on the future, just that it wont block on a
/// this is not guaranteed to not block on the future, just that it won't block on a
/// current threaded runtime true is passed in if it is placed in a newly created runtime
pub fn spawn_blocked_future<Fn, F>(future: Fn)
where
Fn: FnOnce(bool) -> F,
F: Future + Send + 'static,
{
if cfg!(debug_assertions) && size_of::<F>() > BOX_FUTURE_THRESHOLD {
if size_of::<F>() > BOX_FUTURE_THRESHOLD {
spawn_blocked_future_inner(Box::new(future))
} else {
spawn_blocked_future_inner(future)
Expand All @@ -114,24 +101,28 @@ where
F: Future + Send + 'static,
{
macro_rules! spawn_off {
($future: expr) => {{
($future: expr, $try_handle: expr) => {{
let future = $future;
let func = move || {
GLOBAL_RT.block_on(future);
};
match tokio::runtime::Handle::try_current() {
match $try_handle {
Ok(handle) => {
let (tx, rx) = std::sync::mpsc::sync_channel(0);
let handle_clone = handle.clone();
handle.spawn_blocking(move || {
if tx.send(()).is_ok() {
func();
handle_clone.block_on(future);
}
});
rx.recv().expect("could not spawn task");

rx.recv().expect("spawned task should be able to be scheduled properly")
}
Err(_) => {
GLOBAL_RT.block_on(future);
}
Err(_) => func(),
}
}};
($future: expr) => {{
spawn_off!($future, tokio::runtime::Handle::try_current())
}};
}

cfg_if::cfg_if! {
Expand All @@ -144,7 +135,7 @@ where
handle.block_on(future(false))
});
}
_ => spawn_off!(future(true)),
maybe_handle => spawn_off!(future(true), maybe_handle),
}
} else {
spawn_off!(future(true))
Expand Down

0 comments on commit 6c67941

Please sign in to comment.