Skip to content

Commit

Permalink
Fix buffer overflow on tests, and update spawn_blocked_future's imple…
Browse files Browse the repository at this point in the history
…mentation

Signed-off-by: Vrtgs <[email protected]>
  • Loading branch information
Vrtgs committed Oct 22, 2024
1 parent 7962b59 commit b0351a3
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 8 deletions.
2 changes: 1 addition & 1 deletion thirtyfour/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ reqwest = { version = "0.12.8", default-features = false, features = [
assert_matches = "1.5"
axum = "0.7"
color-eyre = "0.6"
rstest = { version = "0.22.0", default-features = false }
rstest = { version = "0.23.0", default-features = false }
tower-http = { version = "0.6", features = ["fs"] }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
tokio = { version = "1", features = ["rt-multi-thread"] }
Expand Down
50 changes: 43 additions & 7 deletions thirtyfour/src/support.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,24 @@ use std::sync::LazyLock;
use std::time::Duration;
use std::{io, thread};

// used in drop code so its really bad to have a stack overflow then
const BOX_FUTURE_THRESHOLD: usize = 512;

/// Helper to run the specified future and block the current thread waiting for the result.
/// works even while in a tokio runtime
pub fn block_on<F>(future: F) -> F::Output
where
F: Future + Send,
F::Output: Send,
{
if cfg!(debug_assertions) && size_of::<F>() > BOX_FUTURE_THRESHOLD {
block_on_inner(Box::pin(future))
} else {
block_on_inner(future)
}
}

fn block_on_inner<F>(future: F) -> F::Output
where
F: Future + Send,
F::Output: Send,
Expand Down Expand Up @@ -76,26 +91,47 @@ where
}

/// Helper to run the specified future and bind it to run before runtime shutdown
/// the bool is true if it is spawned
/// else it has been spawned in place
/// this is not guaranteed to not block on the future, just that it wont block on a
/// current threaded runtime true is passed in if it is placed in a newly created runtime
pub fn spawn_blocked_future<Fn, F>(future: Fn)
where
Fn: FnOnce(bool) -> F,
F: Future + Send + 'static,
{
if cfg!(debug_assertions) && size_of::<F>() > BOX_FUTURE_THRESHOLD {
spawn_blocked_future_inner(Box::new(future))
} else {
spawn_blocked_future_inner(future)
}
}

fn spawn_blocked_future_inner<Fn, F>(future: Fn)
where
Fn: FnOnce(bool) -> F,
F: Future + Send + 'static,
{
macro_rules! spawn_off {
($future: expr) => {{
let future = $future;
let (tx, rx) = std::sync::mpsc::sync_channel(0);
tokio::task::spawn_blocking(move || {
let _ = tx.send(());
let func = move || {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("failed to create tokio runtime")
.block_on(future);
});
rx.recv().expect("could not spawn task");
};
match tokio::runtime::Handle::try_current() {
Ok(handle) => {
let (tx, rx) = std::sync::mpsc::sync_channel(0);
handle.spawn_blocking(move || {
if tx.send(()).is_ok() {
func();
}
});
rx.recv().expect("could not spawn task");
}
Err(_) => func(),
}
}};
}

Expand Down

0 comments on commit b0351a3

Please sign in to comment.