Skip to content

Commit

Permalink
feat(s3 sink): add file consolidation
Browse files Browse the repository at this point in the history
summary: the existing s3 sink writes many small files
for the customer in order to lessen the memory and disk
footprints of the process. The sheer volume of files can
be overwhelming depending on what the customer users to
process the files. As such, we're adding a process to
run as a separate thread once the sink is started. Upon
configuration, this process will run on a timer and
attempt to create files roughly of the requested size.
As a safety net, the process checks for specific tags
on the files to make sure we're able to concatenate the
data safely.

ref: LOG-18535
  • Loading branch information
dominic-mcallister-logdna committed Dec 18, 2023
1 parent f7ae702 commit 1484d29
Show file tree
Hide file tree
Showing 10 changed files with 1,742 additions and 8 deletions.
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,9 @@ heim = { git = "https://github.com/vectordotdev/heim.git", branch = "update-nix"
# make sure to update the external docs when the Lua version changes
mlua = { version = "0.8.9", default-features = false, features = ["lua54", "send", "vendored"], optional = true }

# MEZMO: added dependency for s3-sink file consolidation
gethostname = "0.4.3"

[target.'cfg(windows)'.dependencies]
windows-service = "0.6.0"

Expand Down
61 changes: 60 additions & 1 deletion src/sinks/aws_s3/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ use crate::{
tls::TlsConfig,
};

// MEZMO: added dependencies for s3-sink file consolidation
use crate::sinks::aws_s3::file_consolidator_async::{
FileConsolidationConfig, FileConsolidatorAsync,
};
use gethostname::gethostname;

/// Configuration for the `aws_s3` sink.
#[configurable_component(sink("aws_s3"))]
#[derive(Clone, Debug)]
Expand Down Expand Up @@ -133,6 +139,11 @@ pub struct S3SinkConfig {
skip_serializing_if = "crate::serde::skip_serializing_if_default"
)]
pub acknowledgements: AcknowledgementsConfig,

// MEZMO: added configuration for s3-sink file consolidation
#[configurable(derived)]
#[serde(default)]
pub file_consolidation_config: FileConsolidationConfig,
}

pub(super) fn default_key_prefix() -> String {
Expand Down Expand Up @@ -160,6 +171,7 @@ impl GenerateConfig for S3SinkConfig {
tls: Some(TlsConfig::default()),
auth: AwsAuthentication::default(),
acknowledgements: Default::default(),
file_consolidation_config: Default::default(),
})
.unwrap()
}
Expand Down Expand Up @@ -228,7 +240,15 @@ impl S3SinkConfig {
compression: self.compression,
};

let sink = S3Sink::new(service, request_options, partitioner, batch_settings);
// MEZMO: added new file consolidation process for S3 sinks
let consolidation_process = self.build_consolidation_process(cx.proxy);
let sink = S3Sink::new(
service,
request_options,
partitioner,
batch_settings,
consolidation_process,
);

Ok(VectorSink::from_event_streamsink(sink))
}
Expand All @@ -244,6 +264,45 @@ impl S3SinkConfig {
pub async fn create_service(&self, proxy: &ProxyConfig) -> crate::Result<S3Service> {
s3_common::config::create_service(&self.region, &self.auth, proxy, &self.tls).await
}

// MEZMO: added process to define setup for s3-sink file consolidation
fn build_consolidation_process(&self, proxy: ProxyConfig) -> Option<FileConsolidatorAsync> {
// we can perform consolidation assuming that the process itself is requested via the configuration
// we only want to handle this process on the primary instance of the statefulset
// so we don't have to worry about contention between instances of sinks
let host_name = gethostname().into_string().unwrap();
if !host_name.ends_with("-0") || !self.file_consolidation_config.enabled {
info!(
message = "S3 sink file consolidation process disabled",
host_name,
config.enabled = self.file_consolidation_config.enabled,
);
return None;
} else {
info!(
message = "S3 sink file consolidation enabled",
host_name,
config.enabled = self.file_consolidation_config.enabled,
);
}

// build the S3 client and config so we can return a new FileConsolidator
let region_or_endpoint = &self.region;
let endpoint = region_or_endpoint.endpoint().unwrap_or_default();
let region = region_or_endpoint.region();

let consolidator = FileConsolidatorAsync::new(
self.auth.clone(),
region.clone(),
endpoint.clone(),
proxy.clone(),
self.tls.clone(),
self.file_consolidation_config,
self.bucket.clone(),
self.key_prefix.clone(),
);
Some(consolidator)
}
}

#[cfg(test)]
Expand Down
Loading

0 comments on commit 1484d29

Please sign in to comment.