Skip to content

Commit

Permalink
RetryableFileReader: respect user wishes
Browse files Browse the repository at this point in the history
  • Loading branch information
teh-cmc committed Oct 17, 2024
1 parent 330d43e commit 873e01b
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 14 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -5040,6 +5040,7 @@ version = "0.19.0-alpha.14+dev"
dependencies = [
"ahash",
"anyhow",
"crossbeam",
"image",
"notify",
"once_cell",
Expand All @@ -5049,6 +5050,7 @@ dependencies = [
"re_build_info",
"re_build_tools",
"re_chunk",
"re_crash_handler",
"re_log",
"re_log_encoding",
"re_log_types",
Expand Down
2 changes: 2 additions & 0 deletions crates/store/re_data_loader/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ default = []
[dependencies]
re_build_info.workspace = true
re_chunk.workspace = true
re_crash_handler.workspace = true # for signal handling
re_log_encoding = { workspace = true, features = ["decoder"] }
re_log_types.workspace = true
re_log.workspace = true
Expand All @@ -36,6 +37,7 @@ re_types = { workspace = true, features = ["image", "video"] }
ahash.workspace = true
anyhow.workspace = true
arrow2.workspace = true
crossbeam.workspace = true
image.workspace = true
notify.workspace = true
once_cell.workspace = true
Expand Down
51 changes: 37 additions & 14 deletions crates/store/re_data_loader/src/loader_rrd.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crossbeam::channel::Receiver;
use re_log_encoding::decoder::Decoder;

// ---
Expand Down Expand Up @@ -145,7 +146,9 @@ fn decode_and_stream<R: std::io::Read>(
#[cfg(not(target_arch = "wasm32"))]
struct RetryableFileReader {
reader: std::io::BufReader<std::fs::File>,
rx: std::sync::mpsc::Receiver<notify::Result<notify::Event>>,
rx_file_notifs: Receiver<notify::Result<notify::Event>>,
rx_ticker: Receiver<std::time::Instant>,

#[allow(dead_code)]
watcher: notify::RecommendedWatcher,
}
Expand All @@ -160,8 +163,13 @@ impl RetryableFileReader {
.with_context(|| format!("Failed to open file {filepath:?}"))?;
let reader = std::io::BufReader::new(file);

let (tx, rx) = std::sync::mpsc::channel();
let mut watcher = notify::recommended_watcher(tx)
re_crash_handler::sigint::track_sigint();
// 50ms is just a nice tradeoff: we just need the delay to not be perceptible by a human
// while not needlessly hammering the CPU.
let rx_ticker = crossbeam::channel::tick(std::time::Duration::from_millis(50));

let (tx_file_notifs, rx_file_notifs) = crossbeam::channel::unbounded();
let mut watcher = notify::recommended_watcher(tx_file_notifs)
.with_context(|| format!("failed to create file watcher for {filepath:?}"))?;

watcher
Expand All @@ -170,7 +178,8 @@ impl RetryableFileReader {

Ok(Self {
reader,
rx,
rx_file_notifs,
rx_ticker,
watcher,
})
}
Expand Down Expand Up @@ -198,16 +207,30 @@ impl std::io::Read for RetryableFileReader {
impl RetryableFileReader {
fn block_until_file_changes(&self) -> std::io::Result<usize> {
#[allow(clippy::disallowed_methods)]
match self.rx.recv() {
Ok(Ok(event)) => match event.kind {
notify::EventKind::Remove(_) => Err(std::io::Error::new(
std::io::ErrorKind::NotFound,
"file removed",
)),
_ => Ok(0),
},
Ok(Err(err)) => Err(std::io::Error::new(std::io::ErrorKind::Other, err)),
Err(err) => Err(std::io::Error::new(std::io::ErrorKind::Other, err)),
loop {
crossbeam::select! {
// Periodically check for SIGINT.
recv(self.rx_ticker) -> _ => {
if re_crash_handler::sigint::was_sigint_ever_caught() {
return Err(std::io::Error::new(std::io::ErrorKind::Interrupted, "SIGINT"));
}
}

// Otherwise check for file notifications.
recv(self.rx_file_notifs) -> res => {
return match res {
Ok(Ok(event)) => match event.kind {
notify::EventKind::Remove(_) => Err(std::io::Error::new(
std::io::ErrorKind::NotFound,
"file removed",
)),
_ => Ok(0),
},
Ok(Err(err)) => Err(std::io::Error::new(std::io::ErrorKind::Other, err)),
Err(err) => Err(std::io::Error::new(std::io::ErrorKind::Other, err)),
}
}
}
}
}
}
Expand Down

0 comments on commit 873e01b

Please sign in to comment.