From 6961855d4e1bf1224cb910761c63af02c0180a37 Mon Sep 17 00:00:00 2001 From: Rebecca Turner Date: Tue, 21 Nov 2023 11:35:08 -0800 Subject: [PATCH] Interrupt reloads when new files change This will give a snappier UX in general. Note that now, many hooks can be canceled or mismatched (e.g. the before-reload hooks can run but the after-reload hooks get canceled). --- src/command_ext.rs | 47 ------------------- src/event_filter.rs | 2 +- src/ghci/manager.rs | 108 ++++++++++++++++++++++++++++++++++++++------ src/ghci/mod.rs | 54 ++++++++++++++++++++-- 4 files changed, 144 insertions(+), 67 deletions(-) diff --git a/src/command_ext.rs b/src/command_ext.rs index 03085620..1935cd73 100644 --- a/src/command_ext.rs +++ b/src/command_ext.rs @@ -1,13 +1,5 @@ use std::process::Command as StdCommand; -use command_group::AsyncCommandGroup; -use command_group::AsyncGroupChild; -use miette::Context; -use miette::IntoDiagnostic; -use nix::sys::signal::pthread_sigmask; -use nix::sys::signal::SigSet; -use nix::sys::signal::SigmaskHow; -use nix::sys::signal::Signal; use tokio::process::Command; /// Extension trait for commands. @@ -35,42 +27,3 @@ impl CommandExt for StdCommand { shell_words::join(tokens) } } - -pub trait SpawnExt { - /// The type of spawned processes. - type Child; - - /// Spawn the command, but do not inherit `SIGINT` signals from the calling process. - fn spawn_group_without_inheriting_sigint(&mut self) -> miette::Result; -} - -impl SpawnExt for Command { - type Child = AsyncGroupChild; - - fn spawn_group_without_inheriting_sigint(&mut self) -> miette::Result { - spawn_without_inheriting_sigint(|| { - self.group_spawn() - .into_diagnostic() - .wrap_err_with(|| format!("Failed to start `{}`", self.display())) - }) - } -} - -fn spawn_without_inheriting_sigint( - spawn: impl FnOnce() -> miette::Result, -) -> miette::Result { - // See: https://github.com/rust-lang/rust/pull/100737#issuecomment-1445257548 - let mut old_signal_mask = SigSet::empty(); - pthread_sigmask( - SigmaskHow::SIG_SETMASK, - Some(&SigSet::from_iter(std::iter::once(Signal::SIGINT))), - Some(&mut old_signal_mask), - ) - .into_diagnostic()?; - - let result = spawn(); - - pthread_sigmask(SigmaskHow::SIG_SETMASK, Some(&old_signal_mask), None).into_diagnostic()?; - - result -} diff --git a/src/event_filter.rs b/src/event_filter.rs index 8c60ce52..e080e27e 100644 --- a/src/event_filter.rs +++ b/src/event_filter.rs @@ -10,7 +10,7 @@ use notify_debouncer_full::DebouncedEvent; /// A set of filesystem events that `ghci` will need to respond to. Due to the way that `ghci` is, /// we need to divide these into a few different classes so that we can respond appropriately. -#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)] +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] pub enum FileEvent { /// Existing files that are modified, or new files that are created. /// diff --git a/src/ghci/manager.rs b/src/ghci/manager.rs index 82d6a02f..c6aa5fc1 100644 --- a/src/ghci/manager.rs +++ b/src/ghci/manager.rs @@ -7,6 +7,7 @@ use miette::miette; use miette::Context; use miette::IntoDiagnostic; use tokio::sync::mpsc; +use tokio::sync::oneshot; use tokio::sync::Mutex; use tracing::instrument; @@ -15,9 +16,10 @@ use crate::shutdown::ShutdownHandle; use super::Ghci; use super::GhciOpts; +use super::GhciReloadKind; /// An event sent to [`Ghci`]. -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum GhciEvent { /// Reload the `ghci` session. Reload { @@ -26,6 +28,23 @@ pub enum GhciEvent { }, } +impl GhciEvent { + /// When we interrupt an event to reload, add the file events together so that we don't lose + /// work. + fn merge(&mut self, other: GhciEvent) { + match (self, other) { + ( + GhciEvent::Reload { events }, + GhciEvent::Reload { + events: other_events, + }, + ) => { + events.extend(other_events); + } + } + } +} + /// Start the [`Ghci`] subsystem. #[instrument(skip_all, level = "debug")] pub async fn run_ghci( @@ -51,22 +70,38 @@ pub async fn run_ghci( } let ghci = Arc::new(Mutex::new(ghci)); + // The event to respond to. If we interrupt a reload, we may begin the loop with `Some(_)` in + // here. + let mut maybe_event = None; loop { - // Wait for filesystem events. - let event = tokio::select! { - _ = handle.on_shutdown_requested() => { - ghci.lock().await.stop().await.wrap_err("Failed to quit ghci")?; - break; - } - ret = receiver.recv() => { - ret.ok_or_else(|| miette!("ghci event channel closed"))? + let mut event = match maybe_event.take() { + Some(event) => event, + None => { + // If we don't already have an event to respond to, wait for filesystem events. + let event = tokio::select! { + _ = handle.on_shutdown_requested() => { + ghci.lock().await.stop().await.wrap_err("Failed to quit ghci")?; + break; + } + ret = receiver.recv() => { + ret.ok_or_else(|| miette!("ghci event channel closed"))? + } + }; + tracing::debug!(?event, "Received ghci event from watcher"); + event } }; - tracing::debug!(?event, "Received ghci event from watcher"); + // This channel notifies us what kind of reload is triggered, which we can use to inform + // our decision to interrupt the reload or not. + let (reload_sender, reload_receiver) = oneshot::channel(); // Dispatch the event. We spawn it into a new task so it can run in parallel to any // shutdown requests. - let mut task = Box::pin(tokio::task::spawn(dispatch(ghci.clone(), event))); + let mut task = Box::pin(tokio::task::spawn(dispatch( + ghci.clone(), + event.clone(), + reload_sender, + ))); tokio::select! { _ = handle.on_shutdown_requested() => { // Cancel any in-progress reloads. This releases the lock so we don't block here. @@ -74,6 +109,23 @@ pub async fn run_ghci( ghci.lock().await.stop().await.wrap_err("Failed to quit ghci")?; break; } + Some(new_event) = receiver.recv() => { + tracing::debug!(?new_event, "Received ghci event from watcher while reloading"); + if should_interrupt(reload_receiver).await { + // Merge the events together so we don't lose progress. + // Then, the next iteration of the loop will pick up the `maybe_event` value + // and respond immediately. + event.merge(new_event); + maybe_event = Some(event); + + // Cancel the in-progress reload. This releases the `ghci` lock to prevent a deadlock. + task.abort(); + + // Send a SIGINT to interrupt the reload. + // NB: This may take a couple seconds to register. + ghci.lock().await.send_sigint().await?; + } + } ret = &mut task => { ret.into_diagnostic()??; tracing::debug!("Finished dispatching ghci event"); @@ -84,12 +136,40 @@ pub async fn run_ghci( Ok(()) } -#[instrument(level = "debug", skip(ghci))] -async fn dispatch(ghci: Arc>, event: GhciEvent) -> miette::Result<()> { +#[instrument(level = "debug", skip(ghci, reload_sender))] +async fn dispatch( + ghci: Arc>, + event: GhciEvent, + reload_sender: oneshot::Sender, +) -> miette::Result<()> { match event { GhciEvent::Reload { events } => { - ghci.lock().await.reload(events).await?; + ghci.lock().await.reload(events, reload_sender).await?; } } Ok(()) } + +/// Should we interrupt a reload with a new event? +#[instrument(level = "debug", skip_all)] +async fn should_interrupt(reload_receiver: oneshot::Receiver) -> bool { + let reload_kind = match reload_receiver.await { + Ok(kind) => kind, + Err(err) => { + tracing::debug!("Failed to receive reload kind from ghci: {err}"); + return false; + } + }; + + match reload_kind { + GhciReloadKind::None | GhciReloadKind::Restart => { + // Nothing to do, wait for the task to finish. + tracing::debug!(?reload_kind, "Not interrupting reload"); + false + } + GhciReloadKind::Reload => { + tracing::debug!(?reload_kind, "Interrupting reload"); + true + } + } +} diff --git a/src/ghci/mod.rs b/src/ghci/mod.rs index 48e1563e..e740fb68 100644 --- a/src/ghci/mod.rs +++ b/src/ghci/mod.rs @@ -1,5 +1,6 @@ //! The core [`Ghci`] session struct. +use command_group::AsyncCommandGroup; use nix::sys::signal; use nix::sys::signal::Signal; use std::borrow::Borrow; @@ -11,6 +12,7 @@ use std::path::Path; use std::process::ExitStatus; use std::process::Stdio; use std::time::Instant; +use tokio::sync::oneshot; use tokio::task::JoinHandle; use aho_corasick::AhoCorasick; @@ -59,7 +61,6 @@ use crate::buffers::LINE_BUFFER_CAPACITY; use crate::cli::HookOpts; use crate::cli::Opts; use crate::clonable_command::ClonableCommand; -use crate::command_ext::SpawnExt; use crate::event_filter::FileEvent; use crate::format_bulleted_list; use crate::haskell_source_file::is_haskell_source_file; @@ -198,7 +199,10 @@ impl Ghci { .stdout(Stdio::piped()) .kill_on_drop(true); - command.spawn_group_without_inheriting_sigint()? + command + .group_spawn() + .into_diagnostic() + .wrap_err_with(|| format!("Failed to start {}", command.display()))? }; let process_group_id = Pid::from_raw( @@ -400,11 +404,19 @@ impl Ghci { /// Reload this `ghci` session to include the given modified and removed paths. /// /// This may fully restart the `ghci` process. + /// + /// NOTE: We interrupt reloads when applicable, so this function may be canceled and dropped at + /// any `await` point! #[instrument(skip_all, level = "debug")] - pub async fn reload(&mut self, events: BTreeSet) -> miette::Result<()> { + pub async fn reload( + &mut self, + events: BTreeSet, + kind_sender: oneshot::Sender, + ) -> miette::Result<()> { let actions = self.get_reload_actions(events).await?; + let _ = kind_sender.send(actions.kind()); - if !actions.needs_restart.is_empty() { + if actions.needs_restart() { tracing::info!( "Restarting ghci:\n{}", format_bulleted_list(&actions.needs_restart) @@ -677,11 +689,16 @@ impl Ghci { NormalPath::new(path, &self.search_paths.cwd) } - #[instrument(skip_all, level = "trace")] + #[instrument(skip_all, level = "debug")] async fn send_sigint(&mut self) -> miette::Result<()> { + let start_instant = Instant::now(); signal::killpg(self.process_group_id, Signal::SIGINT) .into_diagnostic() .wrap_err("Failed to send `Ctrl-C` (`SIGINT`) to ghci session")?; + self.stdout + .prompt(crate::incremental_reader::FindAt::Anywhere) + .await?; + tracing::debug!("Interrupted ghci in {:.2?}", start_instant.elapsed()); Ok(()) } @@ -764,4 +781,31 @@ impl ReloadActions { fn needs_add_or_reload(&self) -> bool { !self.needs_add.is_empty() || !self.needs_reload.is_empty() } + + /// Is a session restart needed? + fn needs_restart(&self) -> bool { + !self.needs_restart.is_empty() + } + + /// Get the kind of reload we'll perform. + fn kind(&self) -> GhciReloadKind { + if self.needs_restart() { + GhciReloadKind::Restart + } else if self.needs_add_or_reload() { + GhciReloadKind::Reload + } else { + GhciReloadKind::None + } + } +} + +/// How a [`Ghci`] session responds to a reload event. +#[derive(Debug)] +pub enum GhciReloadKind { + /// Noop. No actions needed. + None, + /// Reload and/or add modules. Can be interrupted. + Reload, + /// Restart the whole session. Cannot be interrupted. + Restart, }