Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve watcher #125

Merged
merged 9 commits into from
Apr 30, 2024
397 changes: 181 additions & 216 deletions Cargo.lock

Large diffs are not rendered by default.

1,280 changes: 756 additions & 524 deletions Cargo.nix

Large diffs are not rendered by default.

11 changes: 9 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ edition = "2021"
# before updating dependencies: we want to keep the rustc compatible with 1.41.0 for now (start of 2020).
[dependencies]
# central features
notify = "= 5.0.0-pre.1"
notify = "6.1.1"
atomicwrites = "0.2.5"
crossbeam-channel = "0.3.8"
crossbeam-channel = "0.5.12"
nix = "0.20.0"
regex = "1.4.3"
tempfile = "3.1.0"
Expand All @@ -26,7 +26,11 @@ thiserror = "1.0"

# TODO: update to 0.3
structopt.version = "0.2"

# TODO: update to 0.3
structopt.default-features = false

# TODO: update to 0.3
structopt.features = [
# "default",
"suggestions",
Expand Down Expand Up @@ -55,11 +59,14 @@ lazy_static = "1.4.0"
md5 = "0.7.0"
vec1 = ">= 1.1.0, <1.7.0"
human-panic = { path = "vendor/human-panic" }
notify-debouncer-full = "0.3.1"

[dev-dependencies]
# 1.0.0 requires at least rust 1.50
proptest.version = "0.10.1"
# 1.0.0 requires at least rust 1.50
proptest.default-features = false
# 1.0.0 requires at least rust 1.50
proptest.features = [
"std",
# reenable if proptest kills the test runner
Expand Down
2 changes: 1 addition & 1 deletion build.ninja
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ build run-ci-checks : run-ci-checks ./shell.nix
rule build-ci-files
command = eval $$(nix-build $in -A writeConfig)

build .github/workflows/ci.yml : build-ci-files ./.github/workflows/ci.nix ./Cargo.nix
build .github/workflows/ci.yml : build-ci-files ./.github/workflows/ci.nix | ./Cargo.nix

# update Cargo.nix from Cargo.toml
rule update-cargo-nix
Expand Down
32 changes: 15 additions & 17 deletions src/build_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,10 @@ impl<'a> BuildLoop<'a> {
nix_gc_root_user_dir: project::NixGcRootUserDir,
logger: slog::Logger,
) -> anyhow::Result<BuildLoop<'a>> {
let mut watch = Watch::new(logger.clone()).map_err(|err| anyhow!(err))?;
let watch = Watch::new(&logger).map_err(|err| anyhow!(err))?;
watch
.extend(vec![WatchPathBuf::Normal(
.add_to_watch_tx
.send(vec![WatchPathBuf::Normal(
project.nix_file.as_absolute_path().to_owned(),
)])
.with_context(|| {
Expand Down Expand Up @@ -201,7 +202,7 @@ impl<'a> BuildLoop<'a> {
rx_ping: chan::Receiver<()>,
) -> crate::Never {
let mut current_build = BuildState::NotRunning;
let rx_watcher = self.watch.rx.clone();
let rx_watcher = self.watch.watch_events_rx.clone();

loop {
debug!(self.logger, "looping build_loop";
Expand Down Expand Up @@ -247,19 +248,13 @@ impl<'a> BuildLoop<'a> {

// watcher found file change
recv(rx_watcher) -> msg => match msg {
Ok(msg) => {
match self.watch.process_watch_events(msg) {
Some(changed) => {
// TODO: this is not a started, this is just a scheduled!
send_event(Event::Started {
nix_file: self.project.nix_file.clone(),
reason: Reason::FilesChanged(changed)
});
self.start_or_schedule_build(&mut current_build)
},
// No relevant file events
None => {}
}
Ok(changed) => {
// TODO: this is not a started, this is just a scheduled!
send_event(Event::Started {
nix_file: self.project.nix_file.clone(),
reason: Reason::FilesChanged(changed)
});
self.start_or_schedule_build(&mut current_build)
},
Err(chan::RecvError) =>
debug!(self.logger, "notify chan was disconnected"; "project" => &self.project.nix_file)
Expand Down Expand Up @@ -340,7 +335,10 @@ impl<'a> BuildLoop<'a> {
debug!(self.logger, "paths reduced"; "from" => original_paths_len, "to" => paths.len());

// add all new (reduced) nix sources to the input source watchlist
self.watch.extend(paths.into_iter().collect::<Vec<_>>())?;
self.watch
.add_to_watch_tx
.send(paths.into_iter().collect::<Vec<_>>())
.map_err(BuildError::io)?;

// root the result
self.project
Expand Down
6 changes: 3 additions & 3 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,7 @@ in {}
&crate::NixFile::from(cas.file_from_string(&nix_drv)?),
&cas,
&NixOptions::empty(),
&crate::logging::test_logger(),
&crate::logging::test_logger("non_utf8_nix_output"),
)
.expect("should not crash!");
Ok(())
Expand All @@ -610,7 +610,7 @@ in {}
&d,
&cas,
&NixOptions::empty(),
&crate::logging::test_logger(),
&crate::logging::test_logger("gracefully_handle_failing_build"),
) {
} else {
assert!(
Expand Down Expand Up @@ -673,7 +673,7 @@ dir-as-source = ./dir;
&NixFile::from(AbsPathBuf::new(shell).unwrap()),
&cas,
&NixOptions::empty(),
&crate::logging::test_logger(),
&crate::logging::test_logger("no_unnecessary_files_or_directories_watched"),
)
.unwrap();
let ends_with = |end| {
Expand Down
4 changes: 2 additions & 2 deletions src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ pub fn root(verbosity: Verbosity) -> slog::Logger {
}

/// Logger that can be used in tests
pub fn test_logger() -> slog::Logger {
lorri_logger(slog::Level::Trace)
pub fn test_logger(name: &str) -> slog::Logger {
lorri_logger(slog::Level::Trace).new(slog::o!("lorri_test" => name.to_string()))
}

fn lorri_logger(level: slog::Level) -> slog::Logger {
Expand Down
6 changes: 3 additions & 3 deletions src/nix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ impl<'a> CallOpts<'a> {
/// import <nixpkgs> {}
/// "#)
/// .attribute("hello")
/// .path(&lorri::logging::test_logger())
/// .path(&lorri::logging::test_logger("doctest_path_1"))
/// .unwrap()
/// ;
///
Expand Down Expand Up @@ -270,7 +270,7 @@ impl<'a> CallOpts<'a> {
/// let paths = nix::CallOpts::expression(r#"
/// { inherit (import <nixpkgs> {}) hello git; }
/// "#)
/// .path(&lorri::logging::test_logger());
/// .path(&lorri::logging::test_logger("doctest_path_2"));
///
/// match paths {
/// Err(BuildError::Output { .. }) => {},
Expand Down Expand Up @@ -310,7 +310,7 @@ impl<'a> CallOpts<'a> {
/// let (paths, gc_root) = nix::CallOpts::expression(r#"
/// { inherit (import <nixpkgs> {}) hello git; }
/// "#)
/// .paths(&lorri::logging::test_logger())
/// .paths(&lorri::logging::test_logger("doctest_paths"))
/// .unwrap();
/// let mut paths = paths
/// .into_iter()
Expand Down
78 changes: 73 additions & 5 deletions src/run_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,21 @@ pub struct Async<Res> {
result_chan: chan::Receiver<Res>,
/// whether the thread should linger (aka detach) after this Async is dropped
linger: bool,
/// Stop signal that gets sent to the thread that potentially signed up for it
stop_signal_tx: Option<chan::Sender<StopSignal>>,
}

/// A stop signal sent to the async thread when it should stop what it’s doing.
pub struct StopSignal();

impl<Res> Drop for Async<Res> {
fn drop(&mut self) {
// send the thread the stop signal if it requested it
if let Some(stop_signal) = self.stop_signal_tx.as_ref() {
stop_signal
.send(StopSignal())
.expect("The async thread was not ready to receive the stop message it requested")
}
// when the Async should not linger, we have to wait for it to finish here.
if !self.linger {
self.thread
Expand All @@ -26,6 +37,10 @@ impl<Res> Drop for Async<Res> {
}
}

/// This value should be returned from a thread when the stop signal has been received. Do not construct (TODO: how to prevent other modules from constructing?)
#[derive(Debug)]
pub struct StopReceived();

impl<Res: Send + 'static> Async<Res> {
/// Create a new Async that runs a function in a thread.
///
Expand All @@ -38,7 +53,25 @@ impl<Res: Send + 'static> Async<Res> {
F: std::panic::UnwindSafe,
F: Send + 'static,
{
Self::run_inner(logger, f, false)
Self::run_inner(logger, f, false, None)
}

/// Create a new Async that runs a function in a thread.
///
/// You can read the result either by blocking
/// or by using the `chan` method to get a channel that receives exactly
/// one result as soon as the the function is done.
///
/// The function *should* listen on the stop signal channel
/// so that it can abort and be joined when it is requested to.
pub fn run_with_stop_signal<F>(logger: &slog::Logger, f: F) -> Self
where
F: FnOnce(chan::Receiver<StopSignal>) -> Res,
F: std::panic::UnwindSafe,
F: Send + 'static,
{
let (stop_signal_tx, stop_signal_rx) = chan::bounded(1);
Self::run_inner(logger, || f(stop_signal_rx), false, Some(stop_signal_tx))
}

/// Create a new Async that runs a function in a thread.
Expand All @@ -56,10 +89,15 @@ impl<Res: Send + 'static> Async<Res> {
F: std::panic::UnwindSafe,
F: Send + 'static,
{
Self::run_inner(logger, f, true)
Self::run_inner(logger, f, true, None)
}

fn run_inner<F>(logger: &slog::Logger, f: F, linger: bool) -> Self
fn run_inner<F>(
logger: &slog::Logger,
f: F,
linger: bool,
stop_signal_tx: Option<chan::Sender<StopSignal>>,
) -> Self
where
F: FnOnce() -> Res,
F: std::panic::UnwindSafe,
Expand All @@ -80,6 +118,7 @@ impl<Res: Send + 'static> Async<Res> {
thread,
result_chan: rx,
linger,
stop_signal_tx,
}
}

Expand Down Expand Up @@ -112,13 +151,20 @@ impl<Res: Send + 'static> Async<Res> {

#[cfg(test)]
mod tests {
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};

use super::*;

#[test]
fn test_chan_drop_order() {
// we make the async just block on a channel which we can control from outside
let (tx, rx) = chan::bounded(1);
let a = Async::run(&crate::logging::test_logger(), move || rx.recv());
let a = Async::run(&crate::logging::test_logger("chan_drop_order"), move || {
rx.recv()
});
let c = a.chan();
// nothing has been sent to the thread yet, so timeout
assert_eq!(
Expand All @@ -138,7 +184,10 @@ mod tests {
#[test]
fn test_chan_block_still_works() {
// check that even after getting a channel the blocking still works
let a = Async::run(&crate::logging::test_logger(), move || 42);
let a = Async::run(
&crate::logging::test_logger("chan_block_still_works"),
move || 42,
);
let c = a.chan();
assert_eq!(a.block(), 42);
// would be disconnected, because the result was already retrieved by the block
Expand All @@ -151,4 +200,23 @@ mod tests {
);
// At least you can’t block twice, because the .block() call consumes the Async
}

#[test]
/// Checks that the stop signal is sent as expected, and allows the thread to be joined.
fn test_stop_signal() {
let was_stopped = Arc::new(AtomicBool::new(false));
let was_stopped2 = was_stopped.clone();

let a = Async::run_with_stop_signal(
&crate::logging::test_logger("stop_signal"),
move |stop_signal_rx| {
chan::select! {
recv(stop_signal_rx) -> _ => { was_stopped2.store(true, Ordering::Relaxed) }
}
},
);

drop(a);
assert_eq!(was_stopped.load(Ordering::Relaxed), true)
}
}
Loading
Loading