Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Interrupt reloads when new files change #158

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 0 additions & 47 deletions src/command_ext.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,5 @@
use std::process::Command as StdCommand;

use command_group::AsyncCommandGroup;
use command_group::AsyncGroupChild;
use miette::Context;
use miette::IntoDiagnostic;
use nix::sys::signal::pthread_sigmask;
use nix::sys::signal::SigSet;
use nix::sys::signal::SigmaskHow;
use nix::sys::signal::Signal;
use tokio::process::Command;

/// Extension trait for commands.
Expand Down Expand Up @@ -35,42 +27,3 @@ impl CommandExt for StdCommand {
shell_words::join(tokens)
}
}

pub trait SpawnExt {
/// The type of spawned processes.
type Child;

/// Spawn the command, but do not inherit `SIGINT` signals from the calling process.
fn spawn_group_without_inheriting_sigint(&mut self) -> miette::Result<Self::Child>;
}

impl SpawnExt for Command {
type Child = AsyncGroupChild;

fn spawn_group_without_inheriting_sigint(&mut self) -> miette::Result<Self::Child> {
spawn_without_inheriting_sigint(|| {
self.group_spawn()
.into_diagnostic()
.wrap_err_with(|| format!("Failed to start `{}`", self.display()))
})
}
}

fn spawn_without_inheriting_sigint<T>(
spawn: impl FnOnce() -> miette::Result<T>,
) -> miette::Result<T> {
// See: https://github.com/rust-lang/rust/pull/100737#issuecomment-1445257548
let mut old_signal_mask = SigSet::empty();
pthread_sigmask(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It turns out that this doesn't just stop ghci from inheriting SIGINT from ghciwatch, it also stops ghci from receiving SIGINT whatsoever.

This is a little messier, I guess — when users press Ctrl-C the underlying ghci session will see that as well — but then ghciwatch will shut down, so it doesn't matter much in the end.

SigmaskHow::SIG_SETMASK,
Some(&SigSet::from_iter(std::iter::once(Signal::SIGINT))),
Some(&mut old_signal_mask),
)
.into_diagnostic()?;

let result = spawn();

pthread_sigmask(SigmaskHow::SIG_SETMASK, Some(&old_signal_mask), None).into_diagnostic()?;

result
}
2 changes: 1 addition & 1 deletion src/event_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use notify_debouncer_full::DebouncedEvent;

/// A set of filesystem events that `ghci` will need to respond to. Due to the way that `ghci` is,
/// we need to divide these into a few different classes so that we can respond appropriately.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum FileEvent {
/// Existing files that are modified, or new files that are created.
///
Expand Down
108 changes: 94 additions & 14 deletions src/ghci/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use miette::miette;
use miette::Context;
use miette::IntoDiagnostic;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::sync::Mutex;
use tracing::instrument;

Expand All @@ -15,9 +16,10 @@ use crate::shutdown::ShutdownHandle;

use super::Ghci;
use super::GhciOpts;
use super::GhciReloadKind;

/// An event sent to [`Ghci`].
#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum GhciEvent {
/// Reload the `ghci` session.
Reload {
Expand All @@ -26,6 +28,23 @@ pub enum GhciEvent {
},
}

impl GhciEvent {
/// When we interrupt an event to reload, add the file events together so that we don't lose
/// work.
fn merge(&mut self, other: GhciEvent) {
match (self, other) {
(
GhciEvent::Reload { events },
GhciEvent::Reload {
events: other_events,
},
) => {
events.extend(other_events);
}
}
}
}

/// Start the [`Ghci`] subsystem.
#[instrument(skip_all, level = "debug")]
pub async fn run_ghci(
Expand All @@ -51,29 +70,62 @@ pub async fn run_ghci(
}

let ghci = Arc::new(Mutex::new(ghci));
// The event to respond to. If we interrupt a reload, we may begin the loop with `Some(_)` in
// here.
let mut maybe_event = None;
loop {
// Wait for filesystem events.
let event = tokio::select! {
_ = handle.on_shutdown_requested() => {
ghci.lock().await.stop().await.wrap_err("Failed to quit ghci")?;
break;
}
ret = receiver.recv() => {
ret.ok_or_else(|| miette!("ghci event channel closed"))?
let mut event = match maybe_event.take() {
Some(event) => event,
None => {
// If we don't already have an event to respond to, wait for filesystem events.
let event = tokio::select! {
_ = handle.on_shutdown_requested() => {
ghci.lock().await.stop().await.wrap_err("Failed to quit ghci")?;
break;
}
ret = receiver.recv() => {
ret.ok_or_else(|| miette!("ghci event channel closed"))?
}
};
tracing::debug!(?event, "Received ghci event from watcher");
event
}
};
tracing::debug!(?event, "Received ghci event from watcher");

// This channel notifies us what kind of reload is triggered, which we can use to inform
// our decision to interrupt the reload or not.
let (reload_sender, reload_receiver) = oneshot::channel();
// Dispatch the event. We spawn it into a new task so it can run in parallel to any
// shutdown requests.
let mut task = Box::pin(tokio::task::spawn(dispatch(ghci.clone(), event)));
let mut task = Box::pin(tokio::task::spawn(dispatch(
ghci.clone(),
event.clone(),
reload_sender,
)));
tokio::select! {
_ = handle.on_shutdown_requested() => {
// Cancel any in-progress reloads. This releases the lock so we don't block here.
task.abort();
ghci.lock().await.stop().await.wrap_err("Failed to quit ghci")?;
break;
}
Some(new_event) = receiver.recv() => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be worth explicitly matching on the Reload constructor of GhciEvent so that if you later add more constructors this will become a non-exhaustive pattern match.

tracing::debug!(?new_event, "Received ghci event from watcher while reloading");
if should_interrupt(reload_receiver).await {
// Merge the events together so we don't lose progress.
// Then, the next iteration of the loop will pick up the `maybe_event` value
// and respond immediately.
event.merge(new_event);
maybe_event = Some(event);

// Cancel the in-progress reload. This releases the `ghci` lock to prevent a deadlock.
task.abort();

// Send a SIGINT to interrupt the reload.
// NB: This may take a couple seconds to register.
ghci.lock().await.send_sigint().await?;
}
}
ret = &mut task => {
ret.into_diagnostic()??;
tracing::debug!("Finished dispatching ghci event");
Expand All @@ -84,12 +136,40 @@ pub async fn run_ghci(
Ok(())
}

#[instrument(level = "debug", skip(ghci))]
async fn dispatch(ghci: Arc<Mutex<Ghci>>, event: GhciEvent) -> miette::Result<()> {
#[instrument(level = "debug", skip(ghci, reload_sender))]
async fn dispatch(
ghci: Arc<Mutex<Ghci>>,
event: GhciEvent,
reload_sender: oneshot::Sender<GhciReloadKind>,
) -> miette::Result<()> {
match event {
GhciEvent::Reload { events } => {
ghci.lock().await.reload(events).await?;
ghci.lock().await.reload(events, reload_sender).await?;
}
}
Ok(())
}

/// Should we interrupt a reload with a new event?
#[instrument(level = "debug", skip_all)]
async fn should_interrupt(reload_receiver: oneshot::Receiver<GhciReloadKind>) -> bool {
let reload_kind = match reload_receiver.await {
Ok(kind) => kind,
Err(err) => {
tracing::debug!("Failed to receive reload kind from ghci: {err}");
return false;
}
};

match reload_kind {
GhciReloadKind::None | GhciReloadKind::Restart => {
// Nothing to do, wait for the task to finish.
tracing::debug!(?reload_kind, "Not interrupting reload");
false
}
GhciReloadKind::Reload => {
tracing::debug!(?reload_kind, "Interrupting reload");
true
}
}
}
54 changes: 49 additions & 5 deletions src/ghci/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! The core [`Ghci`] session struct.

use command_group::AsyncCommandGroup;
use nix::sys::signal;
use nix::sys::signal::Signal;
use std::borrow::Borrow;
Expand All @@ -11,6 +12,7 @@ use std::path::Path;
use std::process::ExitStatus;
use std::process::Stdio;
use std::time::Instant;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;

use aho_corasick::AhoCorasick;
Expand Down Expand Up @@ -59,7 +61,6 @@ use crate::buffers::LINE_BUFFER_CAPACITY;
use crate::cli::HookOpts;
use crate::cli::Opts;
use crate::clonable_command::ClonableCommand;
use crate::command_ext::SpawnExt;
use crate::event_filter::FileEvent;
use crate::format_bulleted_list;
use crate::haskell_source_file::is_haskell_source_file;
Expand Down Expand Up @@ -198,7 +199,10 @@ impl Ghci {
.stdout(Stdio::piped())
.kill_on_drop(true);

command.spawn_group_without_inheriting_sigint()?
command
.group_spawn()
.into_diagnostic()
.wrap_err_with(|| format!("Failed to start {}", command.display()))?
};

let process_group_id = Pid::from_raw(
Expand Down Expand Up @@ -400,11 +404,19 @@ impl Ghci {
/// Reload this `ghci` session to include the given modified and removed paths.
///
/// This may fully restart the `ghci` process.
///
/// NOTE: We interrupt reloads when applicable, so this function may be canceled and dropped at
/// any `await` point!
#[instrument(skip_all, level = "debug")]
pub async fn reload(&mut self, events: BTreeSet<FileEvent>) -> miette::Result<()> {
pub async fn reload(
&mut self,
events: BTreeSet<FileEvent>,
kind_sender: oneshot::Sender<GhciReloadKind>,
) -> miette::Result<()> {
let actions = self.get_reload_actions(events).await?;
let _ = kind_sender.send(actions.kind());

if !actions.needs_restart.is_empty() {
if actions.needs_restart() {
tracing::info!(
"Restarting ghci:\n{}",
format_bulleted_list(&actions.needs_restart)
Expand Down Expand Up @@ -677,11 +689,16 @@ impl Ghci {
NormalPath::new(path, &self.search_paths.cwd)
}

#[instrument(skip_all, level = "trace")]
#[instrument(skip_all, level = "debug")]
async fn send_sigint(&mut self) -> miette::Result<()> {
let start_instant = Instant::now();
signal::killpg(self.process_group_id, Signal::SIGINT)
.into_diagnostic()
.wrap_err("Failed to send `Ctrl-C` (`SIGINT`) to ghci session")?;
self.stdout
.prompt(crate::incremental_reader::FindAt::Anywhere)
.await?;
tracing::debug!("Interrupted ghci in {:.2?}", start_instant.elapsed());
Ok(())
}

Expand Down Expand Up @@ -764,4 +781,31 @@ impl ReloadActions {
fn needs_add_or_reload(&self) -> bool {
!self.needs_add.is_empty() || !self.needs_reload.is_empty()
}

/// Is a session restart needed?
fn needs_restart(&self) -> bool {
!self.needs_restart.is_empty()
}

/// Get the kind of reload we'll perform.
fn kind(&self) -> GhciReloadKind {
if self.needs_restart() {
GhciReloadKind::Restart
} else if self.needs_add_or_reload() {
GhciReloadKind::Reload
} else {
GhciReloadKind::None
}
}
}

/// How a [`Ghci`] session responds to a reload event.
#[derive(Debug)]
pub enum GhciReloadKind {
/// Noop. No actions needed.
None,
/// Reload and/or add modules. Can be interrupted.
Reload,
/// Restart the whole session. Cannot be interrupted.
Restart,
}