Skip to content

Commit

Permalink
RetryableFileReader: account for user interrupts (#7801)
Browse files Browse the repository at this point in the history
This makes sure `RetryableFileReader` polls for user interrupts where
necessary, so that an RRD file that is never properly terminated cannot
hang the user's process forever.

I tried various simpler approaches, but they all failed because... well,
you know, UNIX signals.
In the end I'm actually not too unhappy with this, really.

* Fixes #7791
  • Loading branch information
teh-cmc authored Oct 17, 2024
1 parent 1c22e80 commit 8721eb8
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 14 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -5040,6 +5040,7 @@ version = "0.19.0-alpha.14+dev"
dependencies = [
"ahash",
"anyhow",
"crossbeam",
"image",
"notify",
"once_cell",
Expand All @@ -5049,6 +5050,7 @@ dependencies = [
"re_build_info",
"re_build_tools",
"re_chunk",
"re_crash_handler",
"re_log",
"re_log_encoding",
"re_log_types",
Expand Down
4 changes: 4 additions & 0 deletions crates/store/re_data_loader/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ re_types = { workspace = true, features = ["image", "video"] }
ahash.workspace = true
anyhow.workspace = true
arrow2.workspace = true
crossbeam.workspace = true
image.workspace = true
notify.workspace = true
once_cell.workspace = true
Expand All @@ -44,6 +45,9 @@ rayon.workspace = true
thiserror.workspace = true
walkdir.workspace = true

[target.'cfg(not(any(target_arch = "wasm32")))'.dependencies]
re_crash_handler.workspace = true

[dev-dependencies]
re_log_encoding = { workspace = true, features = ["decoder", "encoder"] }
tempfile.workspace = true
Expand Down
55 changes: 41 additions & 14 deletions crates/store/re_data_loader/src/loader_rrd.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use re_log_encoding::decoder::Decoder;

#[cfg(not(target_arch = "wasm32"))]
use crossbeam::channel::Receiver;

// ---

/// Loads data from any `rrd` file or in-memory contents.
Expand Down Expand Up @@ -145,7 +148,9 @@ fn decode_and_stream<R: std::io::Read>(
#[cfg(not(target_arch = "wasm32"))]
struct RetryableFileReader {
reader: std::io::BufReader<std::fs::File>,
rx: std::sync::mpsc::Receiver<notify::Result<notify::Event>>,
rx_file_notifs: Receiver<notify::Result<notify::Event>>,
rx_ticker: Receiver<std::time::Instant>,

#[allow(dead_code)]
watcher: notify::RecommendedWatcher,
}
Expand All @@ -160,8 +165,15 @@ impl RetryableFileReader {
.with_context(|| format!("Failed to open file {filepath:?}"))?;
let reader = std::io::BufReader::new(file);

let (tx, rx) = std::sync::mpsc::channel();
let mut watcher = notify::recommended_watcher(tx)
#[cfg(not(any(target_os = "windows", target_arch = "wasm32")))]
re_crash_handler::sigint::track_sigint();

// 50ms is just a nice tradeoff: we just need the delay to not be perceptible by a human
// while not needlessly hammering the CPU.
let rx_ticker = crossbeam::channel::tick(std::time::Duration::from_millis(50));

let (tx_file_notifs, rx_file_notifs) = crossbeam::channel::unbounded();
let mut watcher = notify::recommended_watcher(tx_file_notifs)
.with_context(|| format!("failed to create file watcher for {filepath:?}"))?;

watcher
Expand All @@ -170,7 +182,8 @@ impl RetryableFileReader {

Ok(Self {
reader,
rx,
rx_file_notifs,
rx_ticker,
watcher,
})
}
Expand Down Expand Up @@ -198,16 +211,30 @@ impl std::io::Read for RetryableFileReader {
impl RetryableFileReader {
fn block_until_file_changes(&self) -> std::io::Result<usize> {
#[allow(clippy::disallowed_methods)]
match self.rx.recv() {
Ok(Ok(event)) => match event.kind {
notify::EventKind::Remove(_) => Err(std::io::Error::new(
std::io::ErrorKind::NotFound,
"file removed",
)),
_ => Ok(0),
},
Ok(Err(err)) => Err(std::io::Error::new(std::io::ErrorKind::Other, err)),
Err(err) => Err(std::io::Error::new(std::io::ErrorKind::Other, err)),
loop {
crossbeam::select! {
// Periodically check for SIGINT.
recv(self.rx_ticker) -> _ => {
if re_crash_handler::sigint::was_sigint_ever_caught() {
return Err(std::io::Error::new(std::io::ErrorKind::Interrupted, "SIGINT"));
}
}

// Otherwise check for file notifications.
recv(self.rx_file_notifs) -> res => {
return match res {
Ok(Ok(event)) => match event.kind {
notify::EventKind::Remove(_) => Err(std::io::Error::new(
std::io::ErrorKind::NotFound,
"file removed",
)),
_ => Ok(0),
},
Ok(Err(err)) => Err(std::io::Error::new(std::io::ErrorKind::Other, err)),
Err(err) => Err(std::io::Error::new(std::io::ErrorKind::Other, err)),
}
}
}
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions crates/utils/re_crash_handler/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! Detect and handle signals, panics, and other crashes, making sure to log them and optionally send them off to analytics.
pub mod sigint;

use re_build_info::BuildInfo;

#[cfg(not(target_os = "windows"))]
Expand Down
51 changes: 51 additions & 0 deletions crates/utils/re_crash_handler/src/sigint.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
use std::sync::atomic::AtomicBool;

// ---

static SIGINT_RECEIVED: AtomicBool = AtomicBool::new(false);

/// Call this to start tracking `SIGINT`s.
///
/// The first call installs a process-wide `SIGINT` handler, replacing whichever handler was
/// registered before; every subsequent call is a no-op.
///
/// When a `SIGINT` is delivered, the handler records it, restores the default disposition for
/// the signal and re-raises it.
///
/// You can then call [`was_sigint_ever_caught`] at any point in time.
#[cfg(not(any(target_os = "windows", target_arch = "wasm32")))]
#[allow(unsafe_code)]
#[allow(clippy::fn_to_numeric_cast_any)]
pub fn track_sigint() {
    static ONCE: std::sync::Once = std::sync::Once::new();

    /// Records the signal, then hands it back to the default disposition.
    unsafe extern "C" fn signal_handler(signum: libc::c_int) {
        // An atomic store is the only thing we do besides libc calls: no locking and no
        // allocation happen inside the handler.
        SIGINT_RECEIVED.store(true, std::sync::atomic::Ordering::Relaxed);

        // SAFETY: `signal` and `raise` are both listed as async-signal-safe by POSIX, and
        // `signum` is the signal number this handler was just invoked with.
        unsafe {
            libc::signal(signum, libc::SIG_DFL);
            libc::raise(signum);
        }
    }

    ONCE.call_once(|| {
        // SAFETY: `signal_handler` has the `extern "C" fn(c_int)` ABI that `signal` expects,
        // and it lives for the whole duration of the program.
        // `sighandler_t` is the integer type libc uses to carry handler addresses, hence the
        // function-to-integer cast.
        unsafe {
            libc::signal(libc::SIGINT, signal_handler as libc::sighandler_t);
        }
    });
}

/// Call this to start tracking `SIGINT`s.
///
/// This is a no-op on Windows and Wasm: no handler is installed on these targets, so nothing
/// in this module ever records a `SIGINT` there.
#[cfg(any(target_os = "windows", target_arch = "wasm32"))]
pub fn track_sigint() {}

/// Tells whether a `SIGINT` has been recorded since tracking was started.
///
/// Tracking is started by [`track_sigint`]. This function calls it too, so calling it
/// explicitly beforehand is only required to record a `SIGINT` that arrives before the
/// first query.
pub fn was_sigint_ever_caught() -> bool {
    use std::sync::atomic::Ordering;

    // Safety net for callers that never called `track_sigint` themselves: only the very
    // first SIGINT can go unnoticed, anything sent after this point will be tracked.
    track_sigint();

    SIGINT_RECEIVED.load(Ordering::Relaxed)
}

0 comments on commit 8721eb8

Please sign in to comment.