Skip to content

Commit

Permalink
Interrupt reloads when new files change
Browse files Browse the repository at this point in the history
This will give a snappier UX in general.

Note that now, many hooks can be canceled or mismatched (e.g. the
before-reload hooks can run but the after-reload hooks get canceled).
  • Loading branch information
9999years committed Nov 21, 2023
1 parent a02b0d4 commit 6961855
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 67 deletions.
47 changes: 0 additions & 47 deletions src/command_ext.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,5 @@
use std::process::Command as StdCommand;

use command_group::AsyncCommandGroup;
use command_group::AsyncGroupChild;
use miette::Context;
use miette::IntoDiagnostic;
use nix::sys::signal::pthread_sigmask;
use nix::sys::signal::SigSet;
use nix::sys::signal::SigmaskHow;
use nix::sys::signal::Signal;
use tokio::process::Command;

/// Extension trait for commands.
Expand Down Expand Up @@ -35,42 +27,3 @@ impl CommandExt for StdCommand {
shell_words::join(tokens)
}
}

pub trait SpawnExt {
/// The type of spawned processes.
type Child;

/// Spawn the command, but do not inherit `SIGINT` signals from the calling process.
fn spawn_group_without_inheriting_sigint(&mut self) -> miette::Result<Self::Child>;
}

impl SpawnExt for Command {
type Child = AsyncGroupChild;

fn spawn_group_without_inheriting_sigint(&mut self) -> miette::Result<Self::Child> {
spawn_without_inheriting_sigint(|| {
self.group_spawn()
.into_diagnostic()
.wrap_err_with(|| format!("Failed to start `{}`", self.display()))
})
}
}

fn spawn_without_inheriting_sigint<T>(
spawn: impl FnOnce() -> miette::Result<T>,
) -> miette::Result<T> {
// See: https://github.com/rust-lang/rust/pull/100737#issuecomment-1445257548
let mut old_signal_mask = SigSet::empty();
pthread_sigmask(
SigmaskHow::SIG_SETMASK,
Some(&SigSet::from_iter(std::iter::once(Signal::SIGINT))),
Some(&mut old_signal_mask),
)
.into_diagnostic()?;

let result = spawn();

pthread_sigmask(SigmaskHow::SIG_SETMASK, Some(&old_signal_mask), None).into_diagnostic()?;

result
}
2 changes: 1 addition & 1 deletion src/event_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use notify_debouncer_full::DebouncedEvent;

/// A set of filesystem events that `ghci` will need to respond to. Due to the way that `ghci` is,
/// we need to divide these into a few different classes so that we can respond appropriately.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum FileEvent {
/// Existing files that are modified, or new files that are created.
///
Expand Down
108 changes: 94 additions & 14 deletions src/ghci/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use miette::miette;
use miette::Context;
use miette::IntoDiagnostic;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::sync::Mutex;
use tracing::instrument;

Expand All @@ -15,9 +16,10 @@ use crate::shutdown::ShutdownHandle;

use super::Ghci;
use super::GhciOpts;
use super::GhciReloadKind;

/// An event sent to [`Ghci`].
#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum GhciEvent {
/// Reload the `ghci` session.
Reload {
Expand All @@ -26,6 +28,23 @@ pub enum GhciEvent {
},
}

impl GhciEvent {
/// When we interrupt an event to reload, add the file events together so that we don't lose
/// work.
fn merge(&mut self, other: GhciEvent) {
match (self, other) {
(
GhciEvent::Reload { events },
GhciEvent::Reload {
events: other_events,
},
) => {
events.extend(other_events);
}
}
}
}

/// Start the [`Ghci`] subsystem.
#[instrument(skip_all, level = "debug")]
pub async fn run_ghci(
Expand All @@ -51,29 +70,62 @@ pub async fn run_ghci(
}

let ghci = Arc::new(Mutex::new(ghci));
// The event to respond to. If we interrupt a reload, we may begin the loop with `Some(_)` in
// here.
let mut maybe_event = None;
loop {
// Wait for filesystem events.
let event = tokio::select! {
_ = handle.on_shutdown_requested() => {
ghci.lock().await.stop().await.wrap_err("Failed to quit ghci")?;
break;
}
ret = receiver.recv() => {
ret.ok_or_else(|| miette!("ghci event channel closed"))?
let mut event = match maybe_event.take() {
Some(event) => event,
None => {
// If we don't already have an event to respond to, wait for filesystem events.
let event = tokio::select! {
_ = handle.on_shutdown_requested() => {
ghci.lock().await.stop().await.wrap_err("Failed to quit ghci")?;
break;
}
ret = receiver.recv() => {
ret.ok_or_else(|| miette!("ghci event channel closed"))?
}
};
tracing::debug!(?event, "Received ghci event from watcher");
event
}
};
tracing::debug!(?event, "Received ghci event from watcher");

// This channel notifies us what kind of reload is triggered, which we can use to inform
// our decision to interrupt the reload or not.
let (reload_sender, reload_receiver) = oneshot::channel();
// Dispatch the event. We spawn it into a new task so it can run in parallel to any
// shutdown requests.
let mut task = Box::pin(tokio::task::spawn(dispatch(ghci.clone(), event)));
let mut task = Box::pin(tokio::task::spawn(dispatch(
ghci.clone(),
event.clone(),
reload_sender,
)));
tokio::select! {
_ = handle.on_shutdown_requested() => {
// Cancel any in-progress reloads. This releases the lock so we don't block here.
task.abort();
ghci.lock().await.stop().await.wrap_err("Failed to quit ghci")?;
break;
}
Some(new_event) = receiver.recv() => {
tracing::debug!(?new_event, "Received ghci event from watcher while reloading");
if should_interrupt(reload_receiver).await {
// Merge the events together so we don't lose progress.
// Then, the next iteration of the loop will pick up the `maybe_event` value
// and respond immediately.
event.merge(new_event);
maybe_event = Some(event);

// Cancel the in-progress reload. This releases the `ghci` lock to prevent a deadlock.
task.abort();

// Send a SIGINT to interrupt the reload.
// NB: This may take a couple seconds to register.
ghci.lock().await.send_sigint().await?;
}
}
ret = &mut task => {
ret.into_diagnostic()??;
tracing::debug!("Finished dispatching ghci event");
Expand All @@ -84,12 +136,40 @@ pub async fn run_ghci(
Ok(())
}

#[instrument(level = "debug", skip(ghci))]
async fn dispatch(ghci: Arc<Mutex<Ghci>>, event: GhciEvent) -> miette::Result<()> {
#[instrument(level = "debug", skip(ghci, reload_sender))]
async fn dispatch(
ghci: Arc<Mutex<Ghci>>,
event: GhciEvent,
reload_sender: oneshot::Sender<GhciReloadKind>,
) -> miette::Result<()> {
match event {
GhciEvent::Reload { events } => {
ghci.lock().await.reload(events).await?;
ghci.lock().await.reload(events, reload_sender).await?;
}
}
Ok(())
}

/// Should we interrupt a reload with a new event?
#[instrument(level = "debug", skip_all)]
async fn should_interrupt(reload_receiver: oneshot::Receiver<GhciReloadKind>) -> bool {
let reload_kind = match reload_receiver.await {
Ok(kind) => kind,
Err(err) => {
tracing::debug!("Failed to receive reload kind from ghci: {err}");
return false;
}
};

match reload_kind {
GhciReloadKind::None | GhciReloadKind::Restart => {
// Nothing to do, wait for the task to finish.
tracing::debug!(?reload_kind, "Not interrupting reload");
false
}
GhciReloadKind::Reload => {
tracing::debug!(?reload_kind, "Interrupting reload");
true
}
}
}
54 changes: 49 additions & 5 deletions src/ghci/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! The core [`Ghci`] session struct.
use command_group::AsyncCommandGroup;
use nix::sys::signal;
use nix::sys::signal::Signal;
use std::borrow::Borrow;
Expand All @@ -11,6 +12,7 @@ use std::path::Path;
use std::process::ExitStatus;
use std::process::Stdio;
use std::time::Instant;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;

use aho_corasick::AhoCorasick;
Expand Down Expand Up @@ -59,7 +61,6 @@ use crate::buffers::LINE_BUFFER_CAPACITY;
use crate::cli::HookOpts;
use crate::cli::Opts;
use crate::clonable_command::ClonableCommand;
use crate::command_ext::SpawnExt;
use crate::event_filter::FileEvent;
use crate::format_bulleted_list;
use crate::haskell_source_file::is_haskell_source_file;
Expand Down Expand Up @@ -198,7 +199,10 @@ impl Ghci {
.stdout(Stdio::piped())
.kill_on_drop(true);

command.spawn_group_without_inheriting_sigint()?
command
.group_spawn()
.into_diagnostic()
.wrap_err_with(|| format!("Failed to start {}", command.display()))?
};

let process_group_id = Pid::from_raw(
Expand Down Expand Up @@ -400,11 +404,19 @@ impl Ghci {
/// Reload this `ghci` session to include the given modified and removed paths.
///
/// This may fully restart the `ghci` process.
///
/// NOTE: We interrupt reloads when applicable, so this function may be canceled and dropped at
/// any `await` point!
#[instrument(skip_all, level = "debug")]
pub async fn reload(&mut self, events: BTreeSet<FileEvent>) -> miette::Result<()> {
pub async fn reload(
&mut self,
events: BTreeSet<FileEvent>,
kind_sender: oneshot::Sender<GhciReloadKind>,
) -> miette::Result<()> {
let actions = self.get_reload_actions(events).await?;
let _ = kind_sender.send(actions.kind());

if !actions.needs_restart.is_empty() {
if actions.needs_restart() {
tracing::info!(
"Restarting ghci:\n{}",
format_bulleted_list(&actions.needs_restart)
Expand Down Expand Up @@ -677,11 +689,16 @@ impl Ghci {
NormalPath::new(path, &self.search_paths.cwd)
}

#[instrument(skip_all, level = "trace")]
#[instrument(skip_all, level = "debug")]
async fn send_sigint(&mut self) -> miette::Result<()> {
let start_instant = Instant::now();
signal::killpg(self.process_group_id, Signal::SIGINT)
.into_diagnostic()
.wrap_err("Failed to send `Ctrl-C` (`SIGINT`) to ghci session")?;
self.stdout
.prompt(crate::incremental_reader::FindAt::Anywhere)
.await?;
tracing::debug!("Interrupted ghci in {:.2?}", start_instant.elapsed());
Ok(())
}

Expand Down Expand Up @@ -764,4 +781,31 @@ impl ReloadActions {
fn needs_add_or_reload(&self) -> bool {
!self.needs_add.is_empty() || !self.needs_reload.is_empty()
}

/// Is a session restart needed?
fn needs_restart(&self) -> bool {
!self.needs_restart.is_empty()
}

/// Get the kind of reload we'll perform.
fn kind(&self) -> GhciReloadKind {
if self.needs_restart() {
GhciReloadKind::Restart
} else if self.needs_add_or_reload() {
GhciReloadKind::Reload
} else {
GhciReloadKind::None
}
}
}

/// How a [`Ghci`] session responds to a reload event.
#[derive(Debug)]
pub enum GhciReloadKind {
/// Noop. No actions needed.
None,
/// Reload and/or add modules. Can be interrupted.
Reload,
/// Restart the whole session. Cannot be interrupted.
Restart,
}

0 comments on commit 6961855

Please sign in to comment.