Skip to content

Commit

Permalink
feat(make shutdown duration configurable): make shutdown duration configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
DominicBurkart committed May 24, 2023
1 parent 897e45d commit a89c3e4
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 42 deletions.
34 changes: 19 additions & 15 deletions lib/vector-common/src/shutdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ impl SourceShutdownCoordinator {
///
/// Panics if this coordinator has had its triggers removed (i.e.,
/// has been taken over with `Self::takeover_source`).
pub fn shutdown_all(self, deadline: Instant) -> impl Future<Output = ()> {
pub fn shutdown_all(self, deadline: Option<Instant>) -> impl Future<Output = ()> {
let mut complete_futures = Vec::new();

let shutdown_begun_triggers = self.shutdown_begun_triggers;
Expand Down Expand Up @@ -275,7 +275,7 @@ impl SourceShutdownCoordinator {
shutdown_complete_tripwire,
shutdown_force_trigger,
id.clone(),
deadline,
Some(deadline),
)
}

Expand All @@ -297,23 +297,27 @@ impl SourceShutdownCoordinator {
shutdown_complete_tripwire: Tripwire,
shutdown_force_trigger: Trigger,
id: ComponentKey,
deadline: Instant,
deadline: Option<Instant>,
) -> impl Future<Output = bool> {
async move {
// Call `shutdown_force_trigger.disable()` on drop.
let shutdown_force_trigger = DisabledTrigger::new(shutdown_force_trigger);

let fut = shutdown_complete_tripwire.then(tripwire_handler);
if timeout_at(deadline, fut).await.is_ok() {
shutdown_force_trigger.into_inner().disable();
true
if deadline.is_some() {
// Call `shutdown_force_trigger.disable()` on drop.
let shutdown_force_trigger = DisabledTrigger::new(shutdown_force_trigger);
if timeout_at(deadline.unwrap(), fut).await.is_ok() {
shutdown_force_trigger.into_inner().disable();
true
} else {
error!(
"Source '{}' failed to shutdown before deadline. Forcing shutdown.",
id,
);
shutdown_force_trigger.into_inner().cancel();
false
}
} else {
error!(
"Source '{}' failed to shutdown before deadline. Forcing shutdown.",
id,
);
shutdown_force_trigger.into_inner().cancel();
false
fut.await;
true
}
}
.boxed()
Expand Down
6 changes: 6 additions & 0 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,12 @@ pub struct RootOpts {
default_value = "5000"
)]
pub allocation_tracing_reporting_interval_ms: u64,

/// Set the duration in seconds to wait for graceful shutdown after SIGINT or SIGTERM is received.
/// After the duration has passed, Vector will force shutdown. Default value is 60 seconds. If set
/// to -1, Vector will never force shutdown.
#[arg(long, default_value = "60", env = "VECTOR_GRACEFUL_SHUTDOWN_DURATION", value_parser = clap::value_parser!(i64).range(-1..))]
pub graceful_shutdown_duration: i64,
}

impl RootOpts {
Expand Down
78 changes: 51 additions & 27 deletions src/topology/running.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use vector_common::trigger::DisabledTrigger;

use super::{TapOutput, TapResource};
use crate::{
cli::Opts,
config::{ComponentKey, Config, ConfigDiff, HealthcheckOptions, Inputs, OutputId, Resource},
event::EventArray,
shutdown::SourceShutdownCoordinator,
Expand Down Expand Up @@ -45,6 +46,7 @@ pub struct RunningTopology {
abort_tx: mpsc::UnboundedSender<()>,
watch: (WatchTx, WatchRx),
pub(crate) running: Arc<AtomicBool>,
graceful_shutdown_duration: Option<Duration>,
}

impl RunningTopology {
Expand All @@ -62,6 +64,22 @@ impl RunningTopology {
abort_tx,
watch: watch::channel(TapResource::default()),
running: Arc::new(AtomicBool::new(true)),
graceful_shutdown_duration: {
if let Ok(opts) = Opts::get_matches().map_err(|error| {
// Printing to stdout/err can itself fail; ignore it.
_ = error.print();
error!("could not access flags while instantiating RunningTopology");
()
}) {
match opts.root.graceful_shutdown_duration {
-1 => None,
seconds => Some(Duration::from_secs(seconds as u64)) // clap validator makes sure value is >= -1
}
} else {
// TODO should this be unreachable!() since the opts have already been validated?
Some(Duration::from_secs(60))
}
}
}
}

Expand Down Expand Up @@ -120,30 +138,34 @@ impl RunningTopology {
check_handles.entry(key).or_default().push(task);
}

// If we reach this, we will forcefully shutdown the sources.
let deadline = Instant::now() + Duration::from_secs(60);

// If we reach the deadline, this future will print out which components
// won't gracefully shutdown since we will start to forcefully shutdown
// the sources.
let mut check_handles2 = check_handles.clone();
let timeout = async move {
sleep_until(deadline).await;
// Remove all tasks that have shutdown.
check_handles2.retain(|_key, handles| {
retain(handles, |handle| handle.peek().is_none());
!handles.is_empty()
});
let remaining_components = check_handles2
.keys()
.map(|item| item.to_string())
.collect::<Vec<_>>()
.join(", ");
// If we reach this deadline, we will forcefully shut down the sources. If `None`, we will never force a shutdown.
let deadline = self.graceful_shutdown_duration.map(|grace_period| Instant::now() + grace_period);

error!(
components = ?remaining_components,
"Failed to gracefully shut down in time. Killing components."
);
let timeout = if deadline.is_some() {
// If we reach the deadline, this future will print out which components
// failed to shut down gracefully, since we will then start to forcefully
// shut down the sources.
let mut check_handles2 = check_handles.clone();
Box::pin(async move {
sleep_until(deadline.unwrap()).await;
// Remove all tasks that have shutdown.
check_handles2.retain(|_key, handles| {
retain(handles, |handle| handle.peek().is_none());
!handles.is_empty()
});
let remaining_components = check_handles2
.keys()
.map(|item| item.to_string())
.collect::<Vec<_>>()
.join(", ");

error!(
components = ?remaining_components,
"Failed to gracefully shut down in time. Killing components."
);
}) as future::BoxFuture<'static, ()>
} else {
Box::pin(future::pending()) as future::BoxFuture<'static, ()>
};

// Reports in intervals which components are still running.
Expand All @@ -163,10 +185,12 @@ impl RunningTopology {
.collect::<Vec<_>>()
.join(", ");

let time_remaining = match deadline.checked_duration_since(Instant::now()) {
Some(remaining) => format!("{} seconds left", remaining.as_secs()),
None => "overdue".to_string(),
};
let time_remaining = deadline.map(|d|
match d.checked_duration_since(Instant::now()) {
Some(remaining) => format!("{} seconds left", remaining.as_secs()),
None => "overdue".to_string(),
}
).unwrap_or("no time limit".to_string());

info!(
remaining_components = ?remaining_components,
Expand Down

0 comments on commit a89c3e4

Please sign in to comment.