Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(configurable shutdown duration): make shutdown duration configurable #17479

Merged
merged 14 commits into from
Jun 1, 2023
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 19 additions & 15 deletions lib/vector-common/src/shutdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ impl SourceShutdownCoordinator {
///
/// Panics if this coordinator has had its triggers removed (ie
/// has been taken over with `Self::takeover_source`).
pub fn shutdown_all(self, deadline: Instant) -> impl Future<Output = ()> {
pub fn shutdown_all(self, deadline: Option<Instant>) -> impl Future<Output = ()> {
let mut complete_futures = Vec::new();

let shutdown_begun_triggers = self.shutdown_begun_triggers;
Expand Down Expand Up @@ -275,7 +275,7 @@ impl SourceShutdownCoordinator {
shutdown_complete_tripwire,
shutdown_force_trigger,
id.clone(),
deadline,
Some(deadline),
)
}

Expand All @@ -297,23 +297,27 @@ impl SourceShutdownCoordinator {
shutdown_complete_tripwire: Tripwire,
shutdown_force_trigger: Trigger,
id: ComponentKey,
deadline: Instant,
deadline: Option<Instant>,
) -> impl Future<Output = bool> {
async move {
// Call `shutdown_force_trigger.disable()` on drop.
let shutdown_force_trigger = DisabledTrigger::new(shutdown_force_trigger);

let fut = shutdown_complete_tripwire.then(tripwire_handler);
if timeout_at(deadline, fut).await.is_ok() {
shutdown_force_trigger.into_inner().disable();
true
if let Some(deadline) = deadline {
// Call `shutdown_force_trigger.disable()` on drop.
let shutdown_force_trigger = DisabledTrigger::new(shutdown_force_trigger);
if timeout_at(deadline, fut).await.is_ok() {
shutdown_force_trigger.into_inner().disable();
true
} else {
error!(
"Source '{}' failed to shutdown before deadline. Forcing shutdown.",
id,
);
shutdown_force_trigger.into_inner().cancel();
false
}
} else {
error!(
"Source '{}' failed to shutdown before deadline. Forcing shutdown.",
id,
);
shutdown_force_trigger.into_inner().cancel();
false
fut.await;
true
}
}
.boxed()
Expand Down
13 changes: 12 additions & 1 deletion src/app.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#![allow(missing_docs)]
use std::{collections::HashMap, num::NonZeroUsize, path::PathBuf};
use std::{collections::HashMap, num::NonZeroUsize, path::PathBuf, time::Duration};

use exitcode::ExitCode;
use futures::StreamExt;
Expand Down Expand Up @@ -62,10 +62,19 @@ impl ApplicationConfig {
) -> Result<Self, ExitCode> {
let config_paths = opts.config_paths_with_formats();

let graceful_shutdown_duration = if opts.no_graceful_shutdown_limit {
None
} else {
Some(Duration::from_secs(u64::from(
opts.graceful_shutdown_limit_secs,
)))
};
DominicBurkart marked this conversation as resolved.
Show resolved Hide resolved

let config = load_configs(
&config_paths,
opts.watch_config,
opts.require_healthy,
graceful_shutdown_duration,
signal_handler,
)
.await?;
Expand Down Expand Up @@ -410,6 +419,7 @@ pub async fn load_configs(
config_paths: &[ConfigPath],
watch_config: bool,
require_healthy: Option<bool>,
graceful_shutdown_duration: Option<Duration>,
signal_handler: &mut SignalHandler,
) -> Result<Config, ExitCode> {
let config_paths = config::process_paths(config_paths).ok_or(exitcode::CONFIG)?;
Expand Down Expand Up @@ -440,6 +450,7 @@ pub async fn load_configs(
info!("Health checks are disabled.");
}
config.healthchecks.set_require_healthy(require_healthy);
config.graceful_shutdown_duration = graceful_shutdown_duration;

Ok(config)
}
Expand Down
23 changes: 22 additions & 1 deletion src/cli.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#![allow(missing_docs)]
use std::path::PathBuf;
use std::{num::NonZeroU64, path::PathBuf};

use clap::{ArgAction, CommandFactory, FromArgMatches, Parser};

Expand Down Expand Up @@ -159,6 +159,27 @@ pub struct RootOpts {
)]
pub internal_log_rate_limit: u64,

/// Set the duration in seconds to wait for graceful shutdown after SIGINT or SIGTERM are received.
/// After the duration has passed, Vector will force shutdown.
#[arg(
long,
default_value = "60",
env = "VECTOR_GRACEFUL_SHUTDOWN_LIMIT_SECS",
group = "graceful-shutdown-limit"
)]
pub graceful_shutdown_limit_secs: NonZeroU64,

/// Never time out while waiting for graceful shutdown after SIGINT or SIGTERM received. This is useful
/// when you would like for Vector to attempt to send data until terminated by a SIGKILL. Overrides/cannot
/// be set with --graceful-shutdown-limit-secs.
#[arg(
long,
default_value = "false",
env = "VECTOR_NO_GRACEFUL_SHUTDOWN_LIMIT",
group = "graceful-shutdown-limit"
)]
pub no_graceful_shutdown_limit: bool,

/// Set runtime allocation tracing
#[cfg(feature = "allocation-tracing")]
#[arg(long, env = "ALLOCATION_TRACING", default_value = "false")]
Expand Down
11 changes: 10 additions & 1 deletion src/config/builder.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#[cfg(feature = "enterprise")]
use std::collections::BTreeMap;
use std::path::Path;
use std::{path::Path, time::Duration};

use indexmap::IndexMap;
#[cfg(feature = "enterprise")]
Expand Down Expand Up @@ -78,6 +78,13 @@ pub struct ConfigBuilder {
/// All configured secrets backends.
#[serde(default)]
pub secret: IndexMap<ComponentKey, SecretBackends>,

/// The duration in seconds to wait for graceful shutdown after SIGINT or SIGTERM are received.
/// After the duration has passed, Vector will force shutdown. Default value is 60 seconds. This
/// value can be set using a [cli arg](crate::cli::RootOpts::graceful_shutdown_duration).
#[serde(default, skip)]
#[doc(hidden)]
pub graceful_shutdown_duration: Option<Duration>,
}

#[cfg(feature = "enterprise")]
Expand Down Expand Up @@ -195,6 +202,7 @@ impl From<Config> for ConfigBuilder {
transforms,
tests,
secret,
graceful_shutdown_duration,
hash: _,
} = config;

Expand Down Expand Up @@ -225,6 +233,7 @@ impl From<Config> for ConfigBuilder {
provider: None,
tests,
secret,
graceful_shutdown_duration,
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/config/compiler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ pub fn compile(mut builder: ConfigBuilder) -> Result<(Config, Vec<String>), Vec<
tests,
provider: _,
secret,
graceful_shutdown_duration,
} = builder;

let graph = match Graph::new(&sources, &transforms, &sinks, schema) {
Expand Down Expand Up @@ -111,6 +112,7 @@ pub fn compile(mut builder: ConfigBuilder) -> Result<(Config, Vec<String>), Vec<
transforms,
tests,
secret,
graceful_shutdown_duration,
};

config.propagate_acknowledgements()?;
Expand Down
2 changes: 2 additions & 0 deletions src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::{
hash::Hash,
net::SocketAddr,
path::PathBuf,
time::Duration,
};

use indexmap::IndexMap;
Expand Down Expand Up @@ -103,6 +104,7 @@ pub struct Config {
pub enrichment_tables: IndexMap<ComponentKey, EnrichmentTableOuter>,
tests: Vec<TestDefinition>,
secret: IndexMap<ComponentKey, SecretBackends>,
pub graceful_shutdown_duration: Option<Duration>,
}

impl Config {
Expand Down
2 changes: 1 addition & 1 deletion src/sources/journald.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1108,7 +1108,7 @@ mod tests {

sleep(Duration::from_millis(100)).await;
shutdown
.shutdown_all(Instant::now() + Duration::from_secs(1))
.shutdown_all(Some(Instant::now() + Duration::from_secs(1)))
.await;

timeout(Duration::from_secs(1), rx.collect()).await.unwrap()
Expand Down
4 changes: 2 additions & 2 deletions src/sources/statsd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ mod test {
// everything that was in up without having to know the exact count.
sleep(Duration::from_millis(250)).await;
shutdown
.shutdown_all(Instant::now() + Duration::from_millis(100))
.shutdown_all(Some(Instant::now() + Duration::from_millis(100)))
.await;

// Read all the events into a `MetricState`, which handles normalizing metrics and tracking
Expand Down Expand Up @@ -578,7 +578,7 @@ mod test {
// everything that was in up without having to know the exact count.
sleep(Duration::from_millis(250)).await;
shutdown
.shutdown_all(Instant::now() + Duration::from_millis(100))
.shutdown_all(Some(Instant::now() + Duration::from_millis(100)))
.await;
}
}
6 changes: 3 additions & 3 deletions src/sources/syslog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1152,7 +1152,7 @@ mod test {

// Shutdown the source, and make sure we've got all the messages we sent in.
shutdown
.shutdown_all(Instant::now() + Duration::from_millis(100))
.shutdown_all(Some(Instant::now() + Duration::from_millis(100)))
.await;
shutdown_complete.await;

Expand Down Expand Up @@ -1229,7 +1229,7 @@ mod test {
sleep(Duration::from_secs(1)).await;

shutdown
.shutdown_all(Instant::now() + Duration::from_millis(100))
.shutdown_all(Some(Instant::now() + Duration::from_millis(100)))
.await;
shutdown_complete.await;

Expand Down Expand Up @@ -1306,7 +1306,7 @@ mod test {

// Shutdown the source, and make sure we've got all the messages we sent in.
shutdown
.shutdown_all(Instant::now() + Duration::from_millis(100))
.shutdown_all(Some(Instant::now() + Duration::from_millis(100)))
.await;
shutdown_complete.await;

Expand Down
66 changes: 38 additions & 28 deletions src/topology/running.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ pub struct RunningTopology {
abort_tx: mpsc::UnboundedSender<()>,
watch: (WatchTx, WatchRx),
pub(crate) running: Arc<AtomicBool>,
graceful_shutdown_duration: Option<Duration>,
}

impl RunningTopology {
Expand All @@ -54,14 +55,15 @@ impl RunningTopology {
inputs_tap_metadata: HashMap::new(),
outputs: HashMap::new(),
outputs_tap_metadata: HashMap::new(),
config,
shutdown_coordinator: SourceShutdownCoordinator::default(),
detach_triggers: HashMap::new(),
source_tasks: HashMap::new(),
tasks: HashMap::new(),
abort_tx,
watch: watch::channel(TapResource::default()),
running: Arc::new(AtomicBool::new(true)),
graceful_shutdown_duration: config.graceful_shutdown_duration,
config,
}
}

Expand Down Expand Up @@ -120,30 +122,36 @@ impl RunningTopology {
check_handles.entry(key).or_default().push(task);
}

// If we reach this, we will forcefully shutdown the sources.
let deadline = Instant::now() + Duration::from_secs(60);

// If we reach the deadline, this future will print out which components
// won't gracefully shutdown since we will start to forcefully shutdown
// the sources.
let mut check_handles2 = check_handles.clone();
let timeout = async move {
sleep_until(deadline).await;
// Remove all tasks that have shutdown.
check_handles2.retain(|_key, handles| {
retain(handles, |handle| handle.peek().is_none());
!handles.is_empty()
});
let remaining_components = check_handles2
.keys()
.map(|item| item.to_string())
.collect::<Vec<_>>()
.join(", ");
// If we reach this, we will forcefully shutdown the sources. If None, we will never force shutdown.
let deadline = self
.graceful_shutdown_duration
.map(|grace_period| Instant::now() + grace_period);

error!(
components = ?remaining_components,
"Failed to gracefully shut down in time. Killing components."
);
let timeout = if let Some(deadline) = deadline {
// If we reach the deadline, this future will print out which components
// won't gracefully shutdown since we will start to forcefully shutdown
// the sources.
let mut check_handles2 = check_handles.clone();
Box::pin(async move {
sleep_until(deadline).await;
// Remove all tasks that have shutdown.
check_handles2.retain(|_key, handles| {
retain(handles, |handle| handle.peek().is_none());
!handles.is_empty()
});
let remaining_components = check_handles2
.keys()
.map(|item| item.to_string())
.collect::<Vec<_>>()
.join(", ");

error!(
components = ?remaining_components,
"Failed to gracefully shut down in time. Killing components."
);
}) as future::BoxFuture<'static, ()>
} else {
Box::pin(future::pending()) as future::BoxFuture<'static, ()>
};

// Reports in intervals which components are still running.
Expand All @@ -163,10 +171,12 @@ impl RunningTopology {
.collect::<Vec<_>>()
.join(", ");

let time_remaining = match deadline.checked_duration_since(Instant::now()) {
Some(remaining) => format!("{} seconds left", remaining.as_secs()),
None => "overdue".to_string(),
};
let time_remaining = deadline
.map(|d| match d.checked_duration_since(Instant::now()) {
Some(remaining) => format!("{} seconds left", remaining.as_secs()),
None => "overdue".to_string(),
})
.unwrap_or("no time limit".to_string());

info!(
remaining_components = ?remaining_components,
Expand Down