Skip to content

Commit

Permalink
chore: Move finalizers and shutdown into vector-core (#13035)
Browse files Browse the repository at this point in the history
* chore: Move ordered finalizer handler to top-level

* chore: Move shutdown into lib/vector-core

* chore: Move finalizers into `lib/vector-core`
  • Loading branch information
bruceg authored Jun 8, 2022
1 parent 465a09b commit bcb2918
Show file tree
Hide file tree
Showing 19 changed files with 84 additions and 97 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions lib/vector-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ regex = { version = "1.5.6", default-features = false, features = ["std", "perf"
serde = { version = "1.0.137", default-features = false, features = ["derive", "rc"] }
serde_json = { version = "1.0.81", default-features = false }
snafu = { version = "0.7.1", default-features = false }
stream-cancel = { version = "0.8.1", default-features = false }
tokio = { version = "1.19.2", default-features = false }
tokio-stream = { version = "0.1", default-features = false, features = ["time"], optional = true }
tokio-util = { version = "0.7.0", default-features = false, features = ["time"] }
Expand Down
33 changes: 11 additions & 22 deletions src/sources/util/finalizer.rs → lib/vector-core/src/finalizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,13 @@ use crate::shutdown::ShutdownSignal;
/// event batch identifiers from a source in a single background task
/// *in the order they are received from the source*, using
/// `FinalizerSet`.
#[cfg(any(
feature = "sources-file",
feature = "sources-journald",
feature = "sources-kafka",
))]
pub(crate) type OrderedFinalizer<T> = FinalizerSet<T, FuturesOrdered<FinalizerFuture<T>>>;
pub type OrderedFinalizer<T> = FinalizerSet<T, FuturesOrdered<FinalizerFuture<T>>>;

/// The `UnorderedFinalizer` framework produces a stream of
/// acknowledged event batch identifiers from a source in a single
/// background task *in the order that finalization happens on the
/// event batches*, using `FinalizerSet`.
#[cfg(any(
feature = "sources-aws_sqs",
feature = "sources-splunk_hec",
feature = "sources-gcp_pubsub"
))]
pub(crate) type UnorderedFinalizer<T> = FinalizerSet<T, FuturesUnordered<FinalizerFuture<T>>>;
pub type UnorderedFinalizer<T> = FinalizerSet<T, FuturesUnordered<FinalizerFuture<T>>>;

/// The `FinalizerSet` framework here is a mechanism for creating a
/// stream of acknowledged (finalized) event batch identifiers from a
Expand All @@ -38,7 +28,7 @@ pub(crate) type UnorderedFinalizer<T> = FinalizerSet<T, FuturesUnordered<Finaliz
/// stream of acknowledgements that comes out, extracting just the
/// identifier and sending that into the returned stream. The type `T`
/// is the source-specific data associated with each entry.
pub(crate) struct FinalizerSet<T, S> {
pub struct FinalizerSet<T, S> {
sender: Option<UnboundedSender<(BatchStatusReceiver, T)>>,
_phantom: PhantomData<S>,
}
Expand All @@ -50,7 +40,7 @@ where
{
/// Produce a finalizer set along with the output stream of
/// received acknowledged batch identifiers.
pub(crate) fn new(shutdown: ShutdownSignal) -> (Self, impl Stream<Item = (BatchStatus, T)>) {
pub fn new(shutdown: ShutdownSignal) -> (Self, impl Stream<Item = (BatchStatus, T)>) {
let (todo_tx, todo_rx) = mpsc::unbounded_channel();
(
Self {
Expand All @@ -70,8 +60,7 @@ where
/// stream of acknowledged identifiers. In the case the finalizer
/// is not to be used, a special empty stream is returned that is
/// always pending and so never wakes.
#[cfg(any(feature = "sources-gcp_pubsub", feature = "sources-kafka"))]
pub(crate) fn maybe_new(
pub fn maybe_new(
maybe: bool,
shutdown: ShutdownSignal,
) -> (
Expand All @@ -86,7 +75,7 @@ where
}
}

pub(crate) fn add(&self, entry: T, receiver: BatchStatusReceiver) {
pub fn add(&self, entry: T, receiver: BatchStatusReceiver) {
if let Some(sender) = &self.sender {
if let Err(error) = sender.send((receiver, entry)) {
error!(message = "FinalizerSet task ended prematurely.", %error);
Expand Down Expand Up @@ -115,7 +104,7 @@ where
let this = self.project();
if !*this.is_shutdown {
if this.shutdown.poll_unpin(ctx).is_ready() {
*this.is_shutdown = true
*this.is_shutdown = true;
}
// Only poll for new entries until shutdown is flagged.
// Loop over all the ready new entries at once.
Expand Down Expand Up @@ -153,7 +142,7 @@ where
}
}

pub(crate) trait FuturesSet<Fut: Future>: Stream<Item = Fut::Output> {
pub trait FuturesSet<Fut: Future>: Stream<Item = Fut::Output> {
fn is_empty(&self) -> bool;
fn push(&mut self, future: Fut);
}
Expand All @@ -164,7 +153,7 @@ impl<Fut: Future> FuturesSet<Fut> for FuturesOrdered<Fut> {
}

fn push(&mut self, future: Fut) {
Self::push(self, future)
Self::push(self, future);
}
}

Expand All @@ -174,12 +163,12 @@ impl<Fut: Future> FuturesSet<Fut> for FuturesUnordered<Fut> {
}

fn push(&mut self, future: Fut) {
Self::push(self, future)
Self::push(self, future);
}
}

#[pin_project::pin_project]
pub(crate) struct FinalizerFuture<T> {
pub struct FinalizerFuture<T> {
receiver: BatchStatusReceiver,
entry: Option<T>,
}
Expand Down
14 changes: 9 additions & 5 deletions lib/vector-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,24 @@
pub mod config;
pub mod event;
pub mod fanout;
pub mod finalizer;
pub mod metrics;
pub mod partition;
pub mod schema;
pub mod serde;
pub mod shutdown;
pub mod sink;
pub mod source;
pub mod stream;
#[cfg(test)]
mod test_util;
pub mod transform;
pub use vector_buffers as buffers;
pub mod partition;
pub mod serde;
pub mod stream;
pub mod time;
pub mod transform;
pub mod trigger;

use std::path::PathBuf;

pub use vector_buffers as buffers;
#[cfg(any(test, feature = "test"))]
pub use vector_common::event_test_util;
pub use vector_common::{byte_size_of::ByteSizeOf, internal_event};
Expand Down
74 changes: 47 additions & 27 deletions src/shutdown.rs → lib/vector-core/src/shutdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,22 @@ use futures::{future, ready, FutureExt};
use stream_cancel::{Trigger, Tripwire};
use tokio::time::{timeout_at, Instant};

use crate::{config::ComponentKey, stream::tripwire_handler, trigger::DisabledTrigger};
use crate::{config::ComponentKey, trigger::DisabledTrigger};

pub async fn tripwire_handler(closed: bool) {
futures::future::poll_fn(|_| {
if closed {
Poll::Ready(())
} else {
Poll::Pending
}
})
.await;
}

/// When this struct goes out of scope and its internal refcount goes to 0 it is a signal that its
/// corresponding Source has completed executing and may be cleaned up. It is the responsibility
/// of each Source to ensure that at least one copy of this handle remains alive for the entire
/// corresponding `Source` has completed executing and may be cleaned up. It is the responsibility
/// of each `Source` to ensure that at least one copy of this handle remains alive for the entire
/// lifetime of the Source.
#[derive(Clone, Debug)]
pub struct ShutdownSignalToken {
Expand All @@ -29,7 +40,7 @@ impl ShutdownSignalToken {
}
}

/// Passed to each Source to coordinate the global shutdown process.
/// Passed to each `Source` to coordinate the global shutdown process.
#[pin_project::pin_project]
#[derive(Clone, Debug)]
pub struct ShutdownSignal {
Expand Down Expand Up @@ -72,7 +83,6 @@ impl ShutdownSignal {
}
}

#[cfg(test)]
pub fn noop() -> Self {
let (trigger, tripwire) = Tripwire::new();
Self {
Expand All @@ -81,7 +91,6 @@ impl ShutdownSignal {
}
}

#[cfg(test)]
pub fn new_wired() -> (Trigger, ShutdownSignal, Tripwire) {
let (trigger_shutdown, tripwire) = Tripwire::new();
let (trigger, shutdown_done) = Tripwire::new();
Expand All @@ -100,7 +109,7 @@ pub struct SourceShutdownCoordinator {

impl SourceShutdownCoordinator {
/// Creates the necessary Triggers and Tripwires for coordinating shutdown of this Source and
/// stores them as needed. Returns the ShutdownSignal for this Source as well as a Tripwire
/// stores them as needed. Returns the `ShutdownSignal` for this Source as well as a Tripwire
/// that will be notified if the Source should be forcibly shut down.
pub fn register_source(
&mut self,
Expand All @@ -126,7 +135,11 @@ impl SourceShutdownCoordinator {
(shutdown_signal, force_shutdown_tripwire)
}

/// Takes ownership of all internal state for the given source from another ShutdownCoordinator.
/// Takes ownership of all internal state for the given source from another `ShutdownCoordinator`.
///
/// # Panics
///
/// Panics if the other coordinator already had its triggers removed.
pub fn takeover_source(&mut self, id: &ComponentKey, other: &mut Self) {
let existing = self.shutdown_begun_triggers.insert(
id.clone(),
Expand All @@ -137,12 +150,11 @@ impl SourceShutdownCoordinator {
)
}),
);
if existing.is_some() {
panic!(
"ShutdownCoordinator already has a shutdown_begin_trigger for source \"{}\"",
id
);
}
assert!(
!existing.is_some(),
"ShutdownCoordinator already has a shutdown_begin_trigger for source \"{}\"",
id
);

let existing = self.shutdown_force_triggers.insert(
id.clone(),
Expand All @@ -153,12 +165,11 @@ impl SourceShutdownCoordinator {
)
}),
);
if existing.is_some() {
panic!(
"ShutdownCoordinator already has a shutdown_force_trigger for source \"{}\"",
id
);
}
assert!(
!existing.is_some(),
"ShutdownCoordinator already has a shutdown_force_trigger for source \"{}\"",
id
);

let existing = self.shutdown_complete_tripwires.insert(
id.clone(),
Expand All @@ -172,18 +183,22 @@ impl SourceShutdownCoordinator {
)
}),
);
if existing.is_some() {
panic!(
"ShutdownCoordinator already has a shutdown_complete_tripwire for source \"{}\"",
id
);
}
assert!(
!existing.is_some(),
"ShutdownCoordinator already has a shutdown_complete_tripwire for source \"{}\"",
id
);
}

/// Sends a signal to begin shutting down to all sources, and returns a future that
/// resolves once all sources have either shut down completely, or have been sent the
/// force shutdown signal. The force shutdown signal will be sent to any sources that
/// don't cleanly shut down before the given `deadline`.
///
/// # Panics
///
/// Panics if this coordinator has had its triggers removed (ie
/// has been taken over with `Self::takeover_source`).
pub fn shutdown_all(self, deadline: Instant) -> impl Future<Output = ()> {
let mut complete_futures = Vec::new();

Expand Down Expand Up @@ -226,6 +241,11 @@ impl SourceShutdownCoordinator {
/// The returned future resolves to a bool that indicates if the source shut down cleanly before
/// the given `deadline`. If the result is false then that means the source failed to shut down
/// before `deadline` and had to be force-shutdown.
///
/// # Panics
///
/// Panics if this coordinator has had its triggers removed (ie
/// has been taken over with `Self::takeover_source`).
pub fn shutdown_source(
&mut self,
id: &ComponentKey,
Expand Down Expand Up @@ -336,7 +356,7 @@ mod test {
let deadline = Instant::now() + Duration::from_secs(1);
let shutdown_complete = shutdown.shutdown_source(&id, deadline);

// Since we never drop the ShutdownSignal the ShutdownCoordinator assumes the Source is
// Since we never drop the `ShutdownSignal` the `ShutdownCoordinator` assumes the Source is
// still running and must force shutdown.
let success = shutdown_complete.await;
assert!(!success);
Expand Down
2 changes: 1 addition & 1 deletion src/trigger.rs → lib/vector-core/src/trigger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ impl DisabledTrigger {
}

pub fn into_inner(mut self) -> Trigger {
self.trigger.take().unwrap()
self.trigger.take().unwrap_or_else(|| unreachable!())
}
}

Expand Down
5 changes: 1 addition & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ pub mod providers;
pub mod serde;
#[cfg(windows)]
pub mod service;
pub mod shutdown;
pub mod signal;
pub(crate) mod sink;
#[allow(unreachable_pub)]
Expand All @@ -78,7 +77,6 @@ pub mod source_sender;
#[allow(unreachable_pub)]
pub mod sources;
pub mod stats;
pub(crate) mod stream;
#[cfg(feature = "api-client")]
#[allow(unreachable_pub)]
mod tap;
Expand All @@ -95,7 +93,6 @@ pub mod topology;
pub mod trace;
#[allow(unreachable_pub)]
pub mod transforms;
pub mod trigger;
pub mod types;
pub mod udp;
pub mod unit_test;
Expand All @@ -105,7 +102,7 @@ pub mod validate;
pub mod vector_windows;

pub use source_sender::SourceSender;
pub use vector_core::{event, metrics, schema, Error, Result};
pub use vector_core::{event, metrics, schema, shutdown, Error, Result};

pub fn vector_version() -> impl std::fmt::Display {
#[cfg(feature = "nightly")]
Expand Down
2 changes: 1 addition & 1 deletion src/sinks/prometheus/exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ impl PrometheusExporter {
#[allow(clippy::print_stderr)]
Server::builder(hyper::server::accept::from_stream(listener.accept_stream()))
.serve(new_service)
.with_graceful_shutdown(tripwire.then(crate::stream::tripwire_handler))
.with_graceful_shutdown(tripwire.then(crate::shutdown::tripwire_handler))
.instrument(span)
.await
.map_err(|error| eprintln!("Server error: {}", error))?;
Expand Down
2 changes: 1 addition & 1 deletion src/sinks/util/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ where
let (trigger, tripwire) = Tripwire::new();
let server = Server::bind(&addr)
.serve(service)
.with_graceful_shutdown(tripwire.then(crate::stream::tripwire_handler))
.with_graceful_shutdown(tripwire.then(crate::shutdown::tripwire_handler))
.map_err(|error| panic!("Server error: {}", error));

(rx, trigger, server)
Expand Down
3 changes: 2 additions & 1 deletion src/sources/aws_sqs/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@ use aws_sdk_sqs::{
use chrono::{DateTime, TimeZone, Utc};
use futures::{FutureExt, StreamExt};
use tokio::{pin, select, time::Duration};
use vector_core::finalizer::UnorderedFinalizer;

use crate::{
codecs::Decoder,
event::{BatchNotifier, BatchStatus},
internal_events::{EndpointBytesReceived, SqsMessageDeleteError, StreamClosedError},
shutdown::ShutdownSignal,
sources::util::{self, finalizer::UnorderedFinalizer},
sources::util,
SourceSender,
};

Expand Down
3 changes: 2 additions & 1 deletion src/sources/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ use serde::{Deserialize, Serialize};
use snafu::{ResultExt, Snafu};
use tokio::{sync::oneshot, task::spawn_blocking};
use tracing::{Instrument, Span};
use vector_core::finalizer::OrderedFinalizer;

use super::util::{finalizer::OrderedFinalizer, EncodingConfig, MultilineConfig};
use super::util::{EncodingConfig, MultilineConfig};
use crate::{
config::{
log_schema, AcknowledgementsConfig, DataType, Output, SourceConfig, SourceContext,
Expand Down
Loading

0 comments on commit bcb2918

Please sign in to comment.