Skip to content

Commit

Permalink
feat(configurable shutdown duration): make shutdown duration configur…
Browse files Browse the repository at this point in the history
…able (vectordotdev#17479)

We want to make the graceful shutdown period configurable instead of
hardcoding it to sixty seconds.

Issues:
vectordotdev#9042 Remove 60s hard
cutoff period for graceful shutdowns
vectordotdev#12831 Want to adjust
graceful shutdown time

This is my first PR in Vector, so I am not sure whether this is the correct approach:
- Are the ergonomics (-1 for no timeout, 0+ for timeout durations) good?
- Are there any test recommendations beyond manual testing?

---------

Co-authored-by: Bruce Guenter <[email protected]>
  • Loading branch information
DominicBurkart and bruceg authored Jun 1, 2023
1 parent 7a4f1f7 commit 23ed0e3
Show file tree
Hide file tree
Showing 12 changed files with 130 additions and 53 deletions.
34 changes: 19 additions & 15 deletions lib/vector-common/src/shutdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ impl SourceShutdownCoordinator {
///
/// Panics if this coordinator has had its triggers removed (ie
/// has been taken over with `Self::takeover_source`).
pub fn shutdown_all(self, deadline: Instant) -> impl Future<Output = ()> {
pub fn shutdown_all(self, deadline: Option<Instant>) -> impl Future<Output = ()> {
let mut complete_futures = Vec::new();

let shutdown_begun_triggers = self.shutdown_begun_triggers;
Expand Down Expand Up @@ -275,7 +275,7 @@ impl SourceShutdownCoordinator {
shutdown_complete_tripwire,
shutdown_force_trigger,
id.clone(),
deadline,
Some(deadline),
)
}

Expand All @@ -297,23 +297,27 @@ impl SourceShutdownCoordinator {
shutdown_complete_tripwire: Tripwire,
shutdown_force_trigger: Trigger,
id: ComponentKey,
deadline: Instant,
deadline: Option<Instant>,
) -> impl Future<Output = bool> {
async move {
// Call `shutdown_force_trigger.disable()` on drop.
let shutdown_force_trigger = DisabledTrigger::new(shutdown_force_trigger);

let fut = shutdown_complete_tripwire.then(tripwire_handler);
if timeout_at(deadline, fut).await.is_ok() {
shutdown_force_trigger.into_inner().disable();
true
if let Some(deadline) = deadline {
// Call `shutdown_force_trigger.disable()` on drop.
let shutdown_force_trigger = DisabledTrigger::new(shutdown_force_trigger);
if timeout_at(deadline, fut).await.is_ok() {
shutdown_force_trigger.into_inner().disable();
true
} else {
error!(
"Source '{}' failed to shutdown before deadline. Forcing shutdown.",
id,
);
shutdown_force_trigger.into_inner().cancel();
false
}
} else {
error!(
"Source '{}' failed to shutdown before deadline. Forcing shutdown.",
id,
);
shutdown_force_trigger.into_inner().cancel();
false
fut.await;
true
}
}
.boxed()
Expand Down
8 changes: 7 additions & 1 deletion src/app.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#![allow(missing_docs)]
use std::{collections::HashMap, num::NonZeroUsize, path::PathBuf};
use std::{collections::HashMap, num::NonZeroUsize, path::PathBuf, time::Duration};

use exitcode::ExitCode;
use futures::StreamExt;
Expand Down Expand Up @@ -62,10 +62,14 @@ impl ApplicationConfig {
) -> Result<Self, ExitCode> {
let config_paths = opts.config_paths_with_formats();

let graceful_shutdown_duration = (!opts.no_graceful_shutdown_limit)
.then(|| Duration::from_secs(u64::from(opts.graceful_shutdown_limit_secs)));

let config = load_configs(
&config_paths,
opts.watch_config,
opts.require_healthy,
graceful_shutdown_duration,
signal_handler,
)
.await?;
Expand Down Expand Up @@ -410,6 +414,7 @@ pub async fn load_configs(
config_paths: &[ConfigPath],
watch_config: bool,
require_healthy: Option<bool>,
graceful_shutdown_duration: Option<Duration>,
signal_handler: &mut SignalHandler,
) -> Result<Config, ExitCode> {
let config_paths = config::process_paths(config_paths).ok_or(exitcode::CONFIG)?;
Expand Down Expand Up @@ -440,6 +445,7 @@ pub async fn load_configs(
info!("Health checks are disabled.");
}
config.healthchecks.set_require_healthy(require_healthy);
config.graceful_shutdown_duration = graceful_shutdown_duration;

Ok(config)
}
Expand Down
24 changes: 23 additions & 1 deletion src/cli.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#![allow(missing_docs)]
use std::path::PathBuf;
use std::{num::NonZeroU64, path::PathBuf};

use clap::{ArgAction, CommandFactory, FromArgMatches, Parser};

Expand Down Expand Up @@ -159,6 +159,28 @@ pub struct RootOpts {
)]
pub internal_log_rate_limit: u64,

/// Set the duration in seconds to wait for graceful shutdown after SIGINT or SIGTERM is
/// received. After the duration has passed, Vector will force shutdown. To never force
/// shutdown, use `--no-graceful-shutdown-limit`.
#[arg(
long,
default_value = "60",
env = "VECTOR_GRACEFUL_SHUTDOWN_LIMIT_SECS",
group = "graceful-shutdown-limit"
)]
pub graceful_shutdown_limit_secs: NonZeroU64,

/// Never time out while waiting for graceful shutdown after SIGINT or SIGTERM is received.
/// This is useful when you would like Vector to attempt to send data until terminated
/// by a SIGKILL. Overrides, and cannot be set together with, `--graceful-shutdown-limit-secs`.
#[arg(
long,
default_value = "false",
env = "VECTOR_NO_GRACEFUL_SHUTDOWN_LIMIT",
group = "graceful-shutdown-limit"
)]
pub no_graceful_shutdown_limit: bool,

/// Set runtime allocation tracing
#[cfg(feature = "allocation-tracing")]
#[arg(long, env = "ALLOCATION_TRACING", default_value = "false")]
Expand Down
11 changes: 10 additions & 1 deletion src/config/builder.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#[cfg(feature = "enterprise")]
use std::collections::BTreeMap;
use std::path::Path;
use std::{path::Path, time::Duration};

use indexmap::IndexMap;
#[cfg(feature = "enterprise")]
Expand Down Expand Up @@ -78,6 +78,13 @@ pub struct ConfigBuilder {
/// All configured secrets backends.
#[serde(default)]
pub secret: IndexMap<ComponentKey, SecretBackends>,

/// The duration in seconds to wait for graceful shutdown after SIGINT or SIGTERM is received.
/// After the duration has passed, Vector will force shutdown. The default value is 60 seconds.
/// This value can be set using a
/// [CLI argument](crate::cli::RootOpts::graceful_shutdown_limit_secs).
#[serde(default, skip)]
#[doc(hidden)]
pub graceful_shutdown_duration: Option<Duration>,
}

#[cfg(feature = "enterprise")]
Expand Down Expand Up @@ -195,6 +202,7 @@ impl From<Config> for ConfigBuilder {
transforms,
tests,
secret,
graceful_shutdown_duration,
hash: _,
} = config;

Expand Down Expand Up @@ -225,6 +233,7 @@ impl From<Config> for ConfigBuilder {
provider: None,
tests,
secret,
graceful_shutdown_duration,
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/config/compiler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ pub fn compile(mut builder: ConfigBuilder) -> Result<(Config, Vec<String>), Vec<
tests,
provider: _,
secret,
graceful_shutdown_duration,
} = builder;

let graph = match Graph::new(&sources, &transforms, &sinks, schema) {
Expand Down Expand Up @@ -111,6 +112,7 @@ pub fn compile(mut builder: ConfigBuilder) -> Result<(Config, Vec<String>), Vec<
transforms,
tests,
secret,
graceful_shutdown_duration,
};

config.propagate_acknowledgements()?;
Expand Down
2 changes: 2 additions & 0 deletions src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::{
hash::Hash,
net::SocketAddr,
path::PathBuf,
time::Duration,
};

use indexmap::IndexMap;
Expand Down Expand Up @@ -105,6 +106,7 @@ pub struct Config {
pub enrichment_tables: IndexMap<ComponentKey, EnrichmentTableOuter>,
tests: Vec<TestDefinition>,
secret: IndexMap<ComponentKey, SecretBackends>,
pub graceful_shutdown_duration: Option<Duration>,
}

impl Config {
Expand Down
2 changes: 1 addition & 1 deletion src/sources/journald.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1108,7 +1108,7 @@ mod tests {

sleep(Duration::from_millis(100)).await;
shutdown
.shutdown_all(Instant::now() + Duration::from_secs(1))
.shutdown_all(Some(Instant::now() + Duration::from_secs(1)))
.await;

timeout(Duration::from_secs(1), rx.collect()).await.unwrap()
Expand Down
4 changes: 2 additions & 2 deletions src/sources/statsd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ mod test {
// everything that was in up without having to know the exact count.
sleep(Duration::from_millis(250)).await;
shutdown
.shutdown_all(Instant::now() + Duration::from_millis(100))
.shutdown_all(Some(Instant::now() + Duration::from_millis(100)))
.await;

// Read all the events into a `MetricState`, which handles normalizing metrics and tracking
Expand Down Expand Up @@ -579,7 +579,7 @@ mod test {
// everything that was in up without having to know the exact count.
sleep(Duration::from_millis(250)).await;
shutdown
.shutdown_all(Instant::now() + Duration::from_millis(100))
.shutdown_all(Some(Instant::now() + Duration::from_millis(100)))
.await;
}
}
6 changes: 3 additions & 3 deletions src/sources/syslog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1153,7 +1153,7 @@ mod test {

// Shutdown the source, and make sure we've got all the messages we sent in.
shutdown
.shutdown_all(Instant::now() + Duration::from_millis(100))
.shutdown_all(Some(Instant::now() + Duration::from_millis(100)))
.await;
shutdown_complete.await;

Expand Down Expand Up @@ -1230,7 +1230,7 @@ mod test {
sleep(Duration::from_secs(1)).await;

shutdown
.shutdown_all(Instant::now() + Duration::from_millis(100))
.shutdown_all(Some(Instant::now() + Duration::from_millis(100)))
.await;
shutdown_complete.await;

Expand Down Expand Up @@ -1307,7 +1307,7 @@ mod test {

// Shutdown the source, and make sure we've got all the messages we sent in.
shutdown
.shutdown_all(Instant::now() + Duration::from_millis(100))
.shutdown_all(Some(Instant::now() + Duration::from_millis(100)))
.await;
shutdown_complete.await;

Expand Down
66 changes: 38 additions & 28 deletions src/topology/running.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ pub struct RunningTopology {
abort_tx: mpsc::UnboundedSender<()>,
watch: (WatchTx, WatchRx),
pub(crate) running: Arc<AtomicBool>,
graceful_shutdown_duration: Option<Duration>,
}

impl RunningTopology {
Expand All @@ -54,14 +55,15 @@ impl RunningTopology {
inputs_tap_metadata: HashMap::new(),
outputs: HashMap::new(),
outputs_tap_metadata: HashMap::new(),
config,
shutdown_coordinator: SourceShutdownCoordinator::default(),
detach_triggers: HashMap::new(),
source_tasks: HashMap::new(),
tasks: HashMap::new(),
abort_tx,
watch: watch::channel(TapResource::default()),
running: Arc::new(AtomicBool::new(true)),
graceful_shutdown_duration: config.graceful_shutdown_duration,
config,
}
}

Expand Down Expand Up @@ -120,30 +122,36 @@ impl RunningTopology {
check_handles.entry(key).or_default().push(task);
}

// If we reach this, we will forcefully shutdown the sources.
let deadline = Instant::now() + Duration::from_secs(60);

// If we reach the deadline, this future will print out which components
// won't gracefully shutdown since we will start to forcefully shutdown
// the sources.
let mut check_handles2 = check_handles.clone();
let timeout = async move {
sleep_until(deadline).await;
// Remove all tasks that have shutdown.
check_handles2.retain(|_key, handles| {
retain(handles, |handle| handle.peek().is_none());
!handles.is_empty()
});
let remaining_components = check_handles2
.keys()
.map(|item| item.to_string())
.collect::<Vec<_>>()
.join(", ");
// If we reach this deadline, we will forcefully shut down the sources. If `None`, we will
// never force a shutdown.
let deadline = self
.graceful_shutdown_duration
.map(|grace_period| Instant::now() + grace_period);

error!(
components = ?remaining_components,
"Failed to gracefully shut down in time. Killing components."
);
let timeout = if let Some(deadline) = deadline {
// If we reach the deadline, this future will print out which components
// won't gracefully shutdown since we will start to forcefully shutdown
// the sources.
let mut check_handles2 = check_handles.clone();
Box::pin(async move {
sleep_until(deadline).await;
// Remove all tasks that have shutdown.
check_handles2.retain(|_key, handles| {
retain(handles, |handle| handle.peek().is_none());
!handles.is_empty()
});
let remaining_components = check_handles2
.keys()
.map(|item| item.to_string())
.collect::<Vec<_>>()
.join(", ");

error!(
components = ?remaining_components,
"Failed to gracefully shut down in time. Killing components."
);
}) as future::BoxFuture<'static, ()>
} else {
Box::pin(future::pending()) as future::BoxFuture<'static, ()>
};

// Reports in intervals which components are still running.
Expand All @@ -163,10 +171,12 @@ impl RunningTopology {
.collect::<Vec<_>>()
.join(", ");

let time_remaining = match deadline.checked_duration_since(Instant::now()) {
Some(remaining) => format!("{} seconds left", remaining.as_secs()),
None => "overdue".to_string(),
};
let time_remaining = deadline
.map(|d| match d.checked_duration_since(Instant::now()) {
Some(remaining) => format!("{} seconds left", remaining.as_secs()),
None => "overdue".to_string(),
})
.unwrap_or("no time limit".to_string());

info!(
remaining_components = ?remaining_components,
Expand Down
Loading

0 comments on commit 23ed0e3

Please sign in to comment.