diff --git a/Cargo.lock b/Cargo.lock index aa64e84c3ca83..71b8f51cb47ec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3614,6 +3614,16 @@ dependencies = [ "version_check", ] +[[package]] +name = "gethostname" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0176e0459c2e4a1fe232f984bca6890e681076abb9934f6cea7c326f3fc47818" +dependencies = [ + "libc", + "windows-targets 0.48.0", +] + [[package]] name = "getrandom" version = "0.1.16" @@ -9715,6 +9725,7 @@ dependencies = [ "flate2", "futures 0.3.28", "futures-util", + "gethostname", "glob", "goauth", "governor", diff --git a/Cargo.toml b/Cargo.toml index b871f0e5fc058..1e548b3e5949a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -335,6 +335,9 @@ heim = { git = "https://github.com/vectordotdev/heim.git", branch = "update-nix" # make sure to update the external docs when the Lua version changes mlua = { version = "0.8.9", default-features = false, features = ["lua54", "send", "vendored"], optional = true } +# MEZMO: added dependency for s3-sink file consolidation +gethostname = "0.4.3" + [target.'cfg(windows)'.dependencies] windows-service = "0.6.0" diff --git a/src/sinks/aws_s3/config.rs b/src/sinks/aws_s3/config.rs index b4315a2847911..0c5c124fc6b37 100644 --- a/src/sinks/aws_s3/config.rs +++ b/src/sinks/aws_s3/config.rs @@ -33,6 +33,12 @@ use crate::{ tls::TlsConfig, }; +// MEZMO: added dependencies for s3-sink file consolidation +use crate::sinks::aws_s3::file_consolidator_async::{ + FileConsolidationConfig, FileConsolidatorAsync, +}; +use gethostname::gethostname; + /// Configuration for the `aws_s3` sink. #[configurable_component(sink("aws_s3"))] #[derive(Clone, Debug)] @@ -133,6 +139,11 @@ pub struct S3SinkConfig { skip_serializing_if = "crate::serde::skip_serializing_if_default" )] pub acknowledgements: AcknowledgementsConfig, + + // MEZMO: added configuration for s3-sink file consolidation + #[configurable(derived)] + #[serde(default)] + pub file_consolidation_config: FileConsolidationConfig, } pub(super) fn default_key_prefix() -> String { @@ -160,6 +171,7 @@ impl GenerateConfig for S3SinkConfig { tls: Some(TlsConfig::default()), auth: AwsAuthentication::default(), acknowledgements: Default::default(), + file_consolidation_config: Default::default(), }) .unwrap() } @@ -228,7 +240,15 @@ impl S3SinkConfig { compression: self.compression, }; - let sink = S3Sink::new(service, request_options, partitioner, batch_settings); + // MEZMO: added new file consolidation process for S3 sinks + let consolidation_process = self.build_consolidation_process(cx.proxy); + let sink = S3Sink::new( + service, + request_options, + partitioner, + batch_settings, + consolidation_process, + ); Ok(VectorSink::from_event_streamsink(sink)) } @@ -244,6 +264,45 @@ impl S3SinkConfig { pub async fn create_service(&self, proxy: &ProxyConfig) -> crate::Result { s3_common::config::create_service(&self.region, &self.auth, proxy, &self.tls).await } + + // MEZMO: added process to define setup for s3-sink file consolidation + fn build_consolidation_process(&self, proxy: ProxyConfig) -> Option { + // we can perform consolidation assuming that the process itself is requested via the configuration + // we only want to handle this process on the primary instance of the statefulset + // so we don't have to worry about contention between instances of sinks + let host_name = gethostname().into_string().unwrap(); + if !host_name.ends_with("-0") || !self.file_consolidation_config.enabled { + info!( + message = "S3 sink file 
consolidation process disabled", + host_name, + config.enabled = self.file_consolidation_config.enabled, + ); + return None; + } else { + info!( + message = "S3 sink file consolidation enabled", + host_name, + config.enabled = self.file_consolidation_config.enabled, + ); + } + + // build the S3 client and config so we can return a new FileConsolidator + let region_or_endpoint = &self.region; + let endpoint = region_or_endpoint.endpoint().unwrap_or_default(); + let region = region_or_endpoint.region(); + + let consolidator = FileConsolidatorAsync::new( + self.auth.clone(), + region.clone(), + endpoint.clone(), + proxy.clone(), + self.tls.clone(), + self.file_consolidation_config, + self.bucket.clone(), + self.key_prefix.clone(), + ); + Some(consolidator) + } } #[cfg(test)] diff --git a/src/sinks/aws_s3/file_consolidation_processor.rs b/src/sinks/aws_s3/file_consolidation_processor.rs new file mode 100644 index 0000000000000..0f9e38e2cef7b --- /dev/null +++ b/src/sinks/aws_s3/file_consolidation_processor.rs @@ -0,0 +1,865 @@ +use bytes::{Bytes, BytesMut}; +use std::{ + io::Read, + time::{SystemTime, UNIX_EPOCH}, +}; + +use flate2::read::GzDecoder; +use std::io; +use std::io::Cursor; + +use aws_sdk_s3::{ + model::{CompletedMultipartUpload, CompletedPart}, + types::ByteStream, + Client as S3Client, Error, +}; + +use aws_smithy_types::DateTime as AwsDateTime; +use base64::prelude::{Engine as _, BASE64_STANDARD}; +use md5::Digest; + +const TWO_HOURS_IN_SECONDS: u64 = 2 * 60 * 60; +const MULTIPART_FILE_MIN_MB_I32: i32 = 5 * 1024 * 1024; +const ONE_MEGABYTE_USIZE: usize = 1024 * 1024; + +// handles consolidating the small files within AWS into much larger files +#[derive(Debug)] +pub struct FileConsolidationProcessor<'a> { + s3_client: &'a S3Client, + bucket: String, + key_prefix: String, + requested_size_bytes: i32, +} + +// handles consolidating all the smaller files generated into larger files of the bucket +// a few assumptions: +// 1. the files are tagged with mezmo keys as found in @get_files_to_consolidate +// to keep us from accidentally messing with files the customer had in the bucket +// 2. 
the files themselves aren't huge (currently the sink limits to 10 MB files) +// so no file size containts are enforced locally for memory issues across the instance +impl<'a> FileConsolidationProcessor<'a> { + pub const fn new( + s3_client: &'a S3Client, + bucket: String, + key_prefix: String, + requested_size_bytes: i32, + ) -> Self { + FileConsolidationProcessor { + s3_client, + bucket, + key_prefix, + requested_size_bytes, + } + } + + pub async fn run(self) { + // retrieve the files list from s3 that we can process + let mut files_to_consolidate: Vec = match get_files_to_consolidate( + self.s3_client, + self.bucket.clone(), + self.key_prefix.clone(), + ) + .await + { + Ok(f) => f, + Err(e) => { + error!( + ?e, + "bucket={}, prefix={}, Failed to retrieve files to consolidate", + self.bucket.clone(), + self.key_prefix.clone(), + ); + let empty_files: Vec = Vec::new(); + empty_files + } + }; + + // if we have no files to combine + if files_to_consolidate.len() <= 1 { + return; + } + + // break the files into groups so we can generate a file of the + // size requested by the customer + while !files_to_consolidate.is_empty() { + // keep track of the processed files to delete + let mut files_to_delete: Vec = Vec::new(); + let mut completed_parts: Vec = Vec::new(); + + let upload_file_parts: Vec = + splice_files_list(self.requested_size_bytes, &mut files_to_consolidate); + + // build the new file properties and expiration time + // make sure the process hasn't ran so fast (really should just unit tests) + // that we accidentally overwrite a merge file. + let mut time_since_epoch: u64; + let mut new_file_key: String; + + loop { + time_since_epoch = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs(); + new_file_key = + format!("{}{}_merged.log", self.key_prefix.clone(), time_since_epoch); + + match self + .s3_client + .get_object() + .bucket(self.bucket.clone()) + .key(new_file_key.clone()) + .send() + .await + { + Ok(_data) => { + info!( + "bucket={}, Merged file already exists, file={}", + self.bucket.clone(), + new_file_key.clone(), + ); + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + } + Err(_e) => { + // the file doesn't exist, break the loop and move on. + break; + } + }; + } + info!( + "bucket={}, Starting consolidated file={}", + self.bucket.clone(), + new_file_key.clone(), + ); + + let tags = { + let mut tagging = url::form_urlencoded::Serializer::new(String::new()); + tagging.append_pair("mezmo_pipeline_merged", "true"); + tagging.finish() + }; + + let content_type = "text/x-log".to_owned(); + + //calculate the size of all the files + //we make no assumptions about compression here if the file is gzip'd + let mut total_bytes_of_all_files: i32 = 0; + for record in upload_file_parts.iter() { + total_bytes_of_all_files += record.size; + } + + // there is a mimimum size of a multipart upload so we'll just upload a single file + // if the directory has less than that amount. 
+ if total_bytes_of_all_files <= MULTIPART_FILE_MIN_MB_I32 { + let bytes = match download_all_files_as_bytes( + self.s3_client, + self.bucket.clone(), + &upload_file_parts, + &mut files_to_delete, + ) + .await + { + Ok(data) => data, + Err(e) => { + error!( + ?e, + "bucket={}, Failed to download files", + self.bucket.clone(), + ); + continue; + } + }; + + if bytes.is_empty() { + info!( + "bucket={}, Failed to download files={:?}", + self.bucket.clone(), + upload_file_parts, + ); + continue; + } + + let content_md5 = BASE64_STANDARD.encode(md5::Md5::digest(bytes.clone())); + match self + .s3_client + .put_object() + .body(bytes_to_bytestream(bytes)) + .bucket(self.bucket.clone()) + .key(new_file_key.clone()) + .set_content_type(Some(content_type)) + .set_tagging(Some(tags)) + .content_md5(content_md5) + .send() + .await + { + Ok(f) => { + info!( + "bucket={}, Successfully put single consolidated file={} for files={:?}", + self.bucket.clone(), + new_file_key.clone(), + upload_file_parts + ); + f + } + Err(e) => { + error!( + ?e, + "bucket={}, Failed to put single consolidated file={}", + self.bucket.clone(), + new_file_key.clone(), + ); + continue; + } + }; + } else { + // use a multi-part upload for all the files that we need to process + // set an expiration time for the file in case things go awry (vector dies, sink reloads, etc) + // with the expiration time, the file will be auto-deleted if necessary. + // we'll build the files up to the maximum size requested + let expires = time_since_epoch + (TWO_HOURS_IN_SECONDS); + let aws_time = AwsDateTime::from_secs(expires as i64); + let multi_part_upload = match self + .s3_client + .create_multipart_upload() + .bucket(self.bucket.clone()) + .key(new_file_key.clone()) + .set_content_type(Some(content_type)) + .set_tagging(Some(tags)) + .set_expires(Some(aws_time)) + .send() + .await + { + Ok(m) => { + info!( + "bucket={}, Successfully created multipart doc for file={}", + self.bucket.clone(), + new_file_key.clone(), + ); + m + } + Err(e) => { + error!( + ?e, + "bucket={}, Failed to invoke multipart upload file={}", + self.bucket.clone(), + new_file_key.clone(), + ); + continue; + } + }; + + let upload_id: String = multi_part_upload.upload_id().unwrap().to_owned(); + + // The minimum file size for upload parts and copy parts 5 MB, so we'll manually consolidate + // small files into one larger to fill the void. 
Using 5 MB files will allow us to achieve + // 50 GB total over 10_000 parts + let mut part_num: i32 = 0; + let mut buf = BytesMut::with_capacity(0); + let mut buf_files: Vec = Vec::new(); + + for file in &upload_file_parts { + // if we've got a plaintext file that meets the multipart min, then we can straight copy it via sdk + if !file.compressed && file.size >= MULTIPART_FILE_MIN_MB_I32 { + part_num += 1; + + let source = format!("{}/{}", self.bucket.clone(), file.key.clone()); + let encoded_source = urlencoding::encode(&source); + + let copy = match self + .s3_client + .upload_part_copy() + .bucket(self.bucket.clone()) + .key(new_file_key.clone()) + .upload_id(upload_id.clone()) + .copy_source(encoded_source) + .part_number(part_num) + .send() + .await + { + Ok(c) => { + info!( + "bucket={}, upload_id={}, Copied part={} ({}) for file={}", + self.bucket.clone(), + upload_id.clone(), + part_num, + file.key.clone(), + new_file_key.clone() + ); + c + } + Err(e) => { + error!( + ?e, + "bucket={}, upload_id={}, Failed to put copy part file={}", + self.bucket.clone(), + upload_id.clone(), + file.key.clone(), + ); + part_num -= 1; + continue; + } + }; + + // keep track of the part for completion and deletion + completed_parts.push( + CompletedPart::builder() + .e_tag(copy.copy_part_result().unwrap().e_tag().unwrap()) + .part_number(part_num) + .build(), + ); + + files_to_delete.push(file.key.clone()); + continue; + } + + // if the file is compressed, we need to pull and decompress it + // so we can join it with other files + // if its less than 5 megs, we need to pull it too to consolidate with other files + let vector = + match download_file_as_vec(self.s3_client, self.bucket.clone(), file).await + { + Ok(v) => { + info!("bucket={}, Downloaded file={:?}", self.bucket.clone(), file); + v + } + Err(e) => { + error!( + ?e, + "bucket={}, Failed to download file={}", + self.bucket.clone(), + file.key.clone(), + ); + continue; + } + }; + + buf.extend_from_slice(&vector); + buf.extend_from_slice(b"\n"); //newline between file + buf_files.push(file.key.clone()); + + // if we've got the minimum for a multipart chunk, send it on to the server + if buf.len() as i32 >= MULTIPART_FILE_MIN_MB_I32 { + part_num += 1; + + // cloning the buffer so its not moved + let body = bytes_to_bytestream(Bytes::from(buf.clone())); + let upload = match self + .s3_client + .upload_part() + .bucket(self.bucket.clone()) + .key(new_file_key.clone()) + .upload_id(upload_id.clone()) + .part_number(part_num) + .body(body) + .send() + .await + { + Ok(u) => { + info!( + "bucket={}, upload_id={}, Uploaded part={} for file={}", + self.bucket.clone(), + upload_id.clone(), + part_num, + new_file_key.clone(), + ); + u + } + Err(e) => { + error!( + ?e, + "bucket={}, upload_id={}, Failed to upload new part={} for file={}", + self.bucket.clone(), + upload_id.clone(), + part_num, + new_file_key.clone(), + ); + part_num -= 1; + continue; + } + }; + + // keep track of the part for completion + completed_parts.push( + CompletedPart::builder() + .e_tag(upload.e_tag().unwrap()) + .part_number(part_num) + .build(), + ); + + for file in &buf_files { + files_to_delete.push(file.clone()) + } + + // reset the buffer by clearing the memory + buf = BytesMut::with_capacity(0); + buf_files.clear(); + } + } + + // there's still data in the buffer, so make that the final part. 
+ // the final upload part doesn't have to be the 5 MB min + if !buf.is_empty() { + part_num += 1; + + let upload = match self + .s3_client + .upload_part() + .bucket(self.bucket.clone()) + .key(new_file_key.clone()) + .upload_id(upload_id.clone()) + .part_number(part_num) + .body(bytes_to_bytestream(Bytes::from(buf))) + .send() + .await + { + Ok(u) => { + info!( + "bucket={}, upload_id={}, Uploaded part={} for file={}", + self.bucket.clone(), + upload_id.clone(), + part_num, + new_file_key.clone(), + ); + u + } + Err(e) => { + error!( + ?e, + "bucket={}, upload_id={}, Failed to upload new part={} for file={}", + self.bucket.clone(), + upload_id.clone(), + part_num, + new_file_key.clone() + ); + continue; + } + }; + + // keep track of the part for completion + completed_parts.push( + CompletedPart::builder() + .e_tag(upload.e_tag().unwrap()) + .part_number(part_num) + .build(), + ); + + for file in &buf_files { + files_to_delete.push(file.clone()) + } + } + + // time to mark the entire file as complete + match self + .s3_client + .complete_multipart_upload() + .bucket(self.bucket.clone()) + .key(new_file_key.clone()) + .upload_id(upload_id.clone()) + .multipart_upload( + CompletedMultipartUpload::builder() + .set_parts(Some(completed_parts)) + .build(), + ) + .send() + .await + { + Ok(u) => { + info!( + "bucket={}, upload_id={}, Completed multipart upload for file={}", + self.bucket.clone(), + upload_id.clone(), + new_file_key.clone() + ); + u + } + Err(e) => { + error!(?e, "bucket={}, upload_id={}, Failed to complete multipart upload for file={}", self.bucket.clone(), upload_id.clone(), new_file_key.clone()); + + // completing the file didn't work out, so abort it completely. + match self + .s3_client + .abort_multipart_upload() + .bucket(self.bucket.clone()) + .key(new_file_key.clone()) + .upload_id(upload_id.clone()) + .send() + .await + { + Ok(_v) => info!("bucket={}, upload_id={}, Aborted multipart upload for file={}", self.bucket.clone(), upload_id.clone(), new_file_key.clone()), + Err(e) => error!(?e, "bucket={}, upload_id={}, Failed to abort multipart upload for file={}", self.bucket.clone(), upload_id.clone(), new_file_key.clone()), + }; + + continue; + } + }; + } + + // remove all the files from S3 that have been merged into the larger file + for file in files_to_delete { + match self + .s3_client + .delete_object() + .bucket(self.bucket.clone()) + .key(file.clone()) + .send() + .await + { + Ok(_result) => { + info!( + message = format!( + "File={} removed from bucket={} after merge successful file consolidation", + file.clone(), + self.bucket.clone() + ) + ) + } + Err(e) => error!( + ?e, + "bucket={}, Failed to delete merged file={}", + self.bucket.clone(), + file.clone() + ), + }; + } // end else multipart logic + } // end files to consolidate loop + } +} + +// helper class for the files that we're consolidating into a single file +#[derive(Debug)] +pub struct ConsolidationFile { + pub compressed: bool, + pub size: i32, + pub key: String, +} + +impl ConsolidationFile { + pub const fn new(compressed: bool, size: i32, key: String) -> ConsolidationFile { + ConsolidationFile { + compressed, + size, + key, + } + } +} + +fn bytes_to_bytestream(buf: Bytes) -> ByteStream { + ByteStream::from(buf) +} + +/* + handles taking in a list of files and grabbing however many + files which combined is the requested size. + @requested_size_bytes: the total size of data requested + @files: the list of files to pick from. 
+ @@returns: a vector of consolidation files +*/ +fn splice_files_list( + requested_size_bytes: i32, + files: &mut Vec, +) -> Vec { + let mut total_bytes: i32 = 0; + for i in 0..files.len() { + total_bytes += files[i].size; + + if total_bytes >= requested_size_bytes { + return files.drain(0..i + 1).collect(); + } + } + + std::mem::take(files) +} + +/* + Handles reading the s3 bucket and evaluating the files + which can be merged into larger files + @client: the s3 client + @bucket: the s3 bucket + @key_prefix: the prefix path for the files + @@returns: Vector, the files which can be merged. +*/ +pub async fn get_files_to_consolidate( + client: &S3Client, + bucket: String, + key_prefix: String, +) -> Result, Error> { + let list_result = client + .list_objects_v2() + .bucket(bucket.clone()) + .prefix(key_prefix.clone()) + .send() + .await?; + + if list_result.contents.is_none() { + info!( + "bucket={}, prefix={}, No files found", + bucket.clone(), + key_prefix.clone(), + ); + let v: Vec = Vec::new(); + return Ok(v); + } + + let mut sorted_objects = list_result.contents().unwrap().to_vec(); + sorted_objects.sort_by_key(|x| x.last_modified().unwrap().secs()); + + let mut files_to_consolidate: Vec = Vec::new(); + for key_object in sorted_objects { + let key = key_object.key().unwrap(); + + let tag_result = client + .get_object_tagging() + .bucket(bucket.clone()) + .key(key) + .send() + .await?; + + // this file is the result of a previous merge + let mut mezmo_merged_file = false; + // this file wasn't produced by the mezmo s3 process + let mut mezmo_produced_file = false; + // not breaking down standard json files as we don't want to load download + // the whole file into memory. We're trying to straight memory copy here. + let mut can_combine = false; + + let tags = tag_result.tag_set().unwrap_or_default(); + for tag in tags.iter() { + match tag.key().unwrap_or_default() { + "mezmo_pipeline_merged" => mezmo_merged_file = true, + "mezmo_pipeline_s3_sink" => mezmo_produced_file = true, + "mezmo_pipeline_s3_type" => match tag.value().unwrap() { + "ndjson" => can_combine = true, + "text" => can_combine = true, + "json" => can_combine = false, + _ => can_combine = false, + }, + _ => info!(message = "unrecognized tag:".to_owned() + tag.key().unwrap()), + } + } + + // scroll through the tags and determine if we can even combine the file + if mezmo_merged_file || !mezmo_produced_file || !can_combine { + continue; + } + + // figure out the object size and keys + match client + .head_object() + .bucket(bucket.clone()) + .key(key) + .send() + .await + { + Ok(head) => { + let compressed = head.content_encoding().unwrap_or_default() == "gzip"; + let size = head.content_length() as i32; + let key = key.to_string(); + + files_to_consolidate.push(ConsolidationFile::new(compressed, size, key)); + } + Err(e) => error!(?e, "bucket={}, Failed to head file={}", bucket.clone(), key), + }; + } // end retrieving objects and sorting + + Ok(files_to_consolidate) +} + +/* + Handles downloading the byte data from all the provided files. + Internally handles failures to retrieve a file by only populating + the files_to_delete with files which were successfully downloaded. + If the file is compressed, handles also decompressing the document + via gzip compression. 
+ @client: the s3 client + @bucket: the s3 bucket + @files_to_delete: populated with the files which were successfully downloaded + @@returns: Bytes, the byte data representing all the downloaded files +*/ +async fn download_all_files_as_bytes( + client: &S3Client, + bucket: String, + files: &[ConsolidationFile], + files_to_delete: &mut Vec, +) -> Result { + let mut buf = BytesMut::with_capacity(0); + for file in files.iter() { + let b: Bytes = match download_bytes(client, bucket.clone(), file.key.clone()).await { + Ok(b) => b, + Err(e) => { + error!( + ?e, + "bucket={}, Failed to download file={}", + bucket.clone(), + file.key.clone(), + ); + continue; + } + }; + + if file.compressed { + let decompressed = decompress_gzip(&b); + buf.extend_from_slice(&decompressed); + } else { + buf.extend_from_slice(&b); + } + + //add a newline as a separator + buf.extend_from_slice(b"\n"); + + // file downloaded successfully so mark it for potential deletion + files_to_delete.push(file.key.clone()); + } + + Ok(Bytes::from(buf)) +} + +/* + Handles downloading the byte data from the provided file and returns + the vector representation of the bytes. + If the file is compressed, handles also decompressing the document + via gzip compression. + @client: the s3 client + @bucket: the s3 bucket + @files_to_delete: populated with the files which were successfully downloaded + @@returns: Bytes, the byte data representing all the downloaded files +*/ +async fn download_file_as_vec( + client: &S3Client, + bucket: String, + file: &ConsolidationFile, +) -> Result, Error> { + let b: Bytes = download_bytes(client, bucket.clone(), file.key.clone()).await?; + + let mut buf = BytesMut::with_capacity(0); + if file.compressed { + let decompressed = decompress_gzip(&b); + buf.extend_from_slice(&decompressed); + } else { + buf.extend_from_slice(&b); + } + + Ok(buf.to_vec()) +} + +/* + Handles gzip decompression of the bytes provided. 
+ @bytes: the byte representation of the file + @@returns: the vector representing the decompressed bytes +*/ +fn decompress_gzip(bytes: &Bytes) -> Vec { + //place the bytes into a buffer that'll decode gzip + let cursor = Cursor::new(bytes); + let in_gz = GzDecoder::new(cursor); + let mut in_buf = io::BufReader::with_capacity(ONE_MEGABYTE_USIZE, in_gz); + + // https://web.mit.edu/rust-lang_v1.25/arch/amd64_ubuntu1404/share/doc/rust/html/std/io/struct.BufReader.html + let mut vec: Vec = Vec::new(); + _ = in_buf.read_to_end(&mut vec); + vec +} + +/* + Handles retrieval of the s3 document from storage + @client: the s3 client + @bucket: the s3 bucket + @key: the file key + @@returns: the byte data of the file +*/ +async fn download_bytes(client: &S3Client, bucket: String, key: String) -> Result { + let object = client.get_object().bucket(bucket).key(key).send().await?; + + let body = match object.body.collect().await { + Ok(body) => body.into_bytes(), + Err(e) => return Err(Error::Unhandled(Box::new(e))), + }; + + Ok(body) +} + +#[cfg(test)] +mod tests { + use bytes::{Bytes, BytesMut}; + use flate2::read::GzEncoder; + use flate2::Compression; + use std::io::Read; + + use crate::sinks::aws_s3::file_consolidation_processor::decompress_gzip; + use crate::sinks::aws_s3::file_consolidation_processor::splice_files_list; + use crate::sinks::aws_s3::file_consolidation_processor::ConsolidationFile; + + #[test] + fn splice_empty_list() { + let mut files: Vec = Vec::new(); + + let result = splice_files_list(1000, &mut files); + assert_eq!(files.len(), 0); + assert_eq!(result.len(), 0); + } + + #[test] + fn splice_single_item() { + let mut files: Vec = Vec::new(); + for i in 0..1 { + files.push(ConsolidationFile { + compressed: false, + size: 10, + key: i.to_string().to_owned(), + }); + } + + let result = splice_files_list(9, &mut files); + assert_eq!(files.len(), 0); + assert_eq!(result.len(), 1); + } + + #[test] + fn splice_partial_list() { + let mut files: Vec = Vec::new(); + for i in 0..10 { + files.push(ConsolidationFile { + compressed: false, + size: 10, + key: i.to_string().to_owned(), + }); + } + + let result = splice_files_list(40, &mut files); + assert_eq!(files.len(), 6); + assert_eq!(result.len(), 4); + } + + #[test] + fn splice_entire_list() { + let mut files: Vec = Vec::new(); + for i in 0..10 { + files.push(ConsolidationFile { + compressed: false, + size: 10, + key: i.to_string().to_owned(), + }); + } + + let result = splice_files_list(1000, &mut files); + assert_eq!(files.len(), 0); + assert_eq!(result.len(), 10); + } + + #[test] + fn decompress_document_data() { + let hello_world = "hello world".to_owned(); + + // compress the text + let mut ret_vec = [0; 100]; + let mut bytestring = hello_world.as_bytes(); + let mut gz = GzEncoder::new(&mut bytestring, Compression::fast()); + let count = gz.read(&mut ret_vec).unwrap(); + let vec = ret_vec[0..count].to_vec(); + + let mut bytes_mut = BytesMut::with_capacity(0); + bytes_mut.extend_from_slice(&vec); + let bytes = Bytes::from(bytes_mut); + + //decompress + let decompressed = decompress_gzip(&bytes); + let s = std::str::from_utf8(&decompressed).unwrap(); + assert_eq!(hello_world, s); + } +} diff --git a/src/sinks/aws_s3/file_consolidator_async.rs b/src/sinks/aws_s3/file_consolidator_async.rs new file mode 100644 index 0000000000000..1b3e33ed2640d --- /dev/null +++ b/src/sinks/aws_s3/file_consolidator_async.rs @@ -0,0 +1,214 @@ +use vector_config::configurable_component; + +use crate::aws::AwsAuthentication; +use crate::config::ProxyConfig; 
+use crate::tls::TlsConfig; +use aws_smithy_http::endpoint::Endpoint; +use aws_types::region::Region; + +use crate::sinks::aws_s3::file_consolidation_processor::FileConsolidationProcessor; +use crate::{aws::create_client, common::s3::S3ClientBuilder}; +use tokio::task::JoinHandle; + +/// File Consolidation +/// Depending on the configuration of the sink and the throughput of data, +/// S3 may receive hundreds and thousands of files. This is unmanageable from +/// the customer perspective. Instead of increasing the memory or disk footprint +/// locally, allow everything to process and later on combine all the files +/// +/// Assumption(s): +/// 1. All files within the bucket directory are of the same format configured +/// to the sink +#[configurable_component] +#[derive(Clone, Debug, Copy)] +#[serde(deny_unknown_fields)] +pub struct FileConsolidationConfig { + /// boolean indicating if the consolidation process is enabled + pub enabled: bool, + + /// Indicates the file consolidation should occur every 'X' milliseconds + pub process_every_ms: u64, + + /// Indicates the size of the consolidation file that is produced + pub requested_size_bytes: i32, +} + +impl Default for FileConsolidationConfig { + fn default() -> Self { + Self { + enabled: false, + process_every_ms: 600000, // 10 min + requested_size_bytes: 500000000, // 500 MB + } + } +} + +// handles consolidating the small files within AWS into much larger files +#[derive(Debug, Default)] +pub struct FileConsolidatorAsync { + auth: AwsAuthentication, + region: Option, + endpoint: Option, + proxy: ProxyConfig, + tls_options: Option, + file_consolidation_config: FileConsolidationConfig, + bucket: String, + key_prefix: String, + join_handle: Option>, +} + +impl AsRef for FileConsolidatorAsync { + fn as_ref(&self) -> &FileConsolidatorAsync { + self + } +} + +impl FileConsolidatorAsync { + #[allow(clippy::too_many_arguments)] + pub const fn new( + auth: AwsAuthentication, + region: Option, + endpoint: Option, + proxy: ProxyConfig, + tls_options: Option, + file_consolidation_config: FileConsolidationConfig, + bucket: String, + key_prefix: String, + ) -> FileConsolidatorAsync { + FileConsolidatorAsync { + auth, + region, + endpoint, + proxy, + tls_options, + file_consolidation_config, + bucket, + key_prefix, + join_handle: None, + } + } + + pub fn start(&mut self) -> bool { + if self.join_handle.is_some() { + info!( + message = + "bucket={}, prefix={}, Thread for S3 file consolidation already in progress", + bucket = self.bucket, + key_prefix = self.key_prefix, + ); + return false; + } + + info!( + message = "bucket={}, prefix={}, Initiating thread for S3 file consolidation", + bucket = self.bucket, + key_prefix = self.key_prefix, + ); + + const TEN_MINUTES_MS: u64 = 10 * 60 * 1000; + + let process_every_ms = if self.file_consolidation_config.process_every_ms > 0 { + self.file_consolidation_config.process_every_ms + } else { + TEN_MINUTES_MS + }; + + let box_bucket = Box::new(self.bucket.clone()); + let box_key_prefix = Box::new(self.key_prefix.clone()); + let box_auth = Box::new(self.auth.clone()); + let box_region = Box::new(self.region.clone()); + let box_endpoint = Box::new(self.endpoint.clone()); + let box_proxy = Box::new(self.proxy.clone()); + let box_tls = Box::new(self.tls_options.clone()); + let box_requested_size_bytes = + Box::new(self.file_consolidation_config.requested_size_bytes); + + let spawned = tokio::spawn(async move { + let client = match create_client::( + &box_auth, + (*box_region).clone(), + (*box_endpoint).clone(), + 
&box_proxy, + &box_tls, + true, + ) + .await + { + Ok(c) => c, + Err(e) => { + error!( + ?e, + "bucket={}, key_prefix={} Failed to create s3 client for consolidation", + (*box_bucket).clone(), + (*box_key_prefix).clone() + ); + return; + } + }; + + loop { + let start_time = tokio::time::Instant::now(); + + info!( + message = "bucket={}, prefix={}, Starting S3 file consolidation", + bucket = (*box_bucket).clone(), + key_prefix = (*box_key_prefix).clone(), + ); + + let processor = FileConsolidationProcessor::new( + &client, + (*box_bucket).clone(), + (*box_key_prefix).clone(), + *box_requested_size_bytes, + ); + + processor.run().await; + info!( + message = "bucket={}, prefix={}, Completed S3 file consolidation", + bucket = (*box_bucket).clone(), + key_prefix = (*box_key_prefix).clone(), + ); + + // determine how long this action took to complete and await + // the duration necessary to restart on the requested interval + let elapsed = start_time.elapsed().as_millis(); + let diff = process_every_ms - elapsed as u64; + if diff > 0 { + info!( + message = + "bucket={}, prefix={}, processing time={} ms, restarting in {} ms", + bucket = (*box_bucket).clone(), + key_prefix = (*box_key_prefix).clone(), + elapsed, + diff + ); + + tokio::time::sleep(tokio::time::Duration::from_millis(diff)).await; + } + } + }); + + self.join_handle = Some(spawned); + true + } + + pub fn stop(&mut self) -> bool { + info!( + message = "Triggering shutdown for S3 file consolidation", + bucket = self.bucket, + key_prefix = self.key_prefix, + ); + + if let Some(h) = self.join_handle.take() { + h.abort(); + } + + info!( + message = "Shutdown for S3 file consolidation complete", + bucket = self.bucket, + key_prefix = self.key_prefix, + ); + + true + } +} diff --git a/src/sinks/aws_s3/integration_tests.rs b/src/sinks/aws_s3/integration_tests.rs index 17d54ffa710ba..f7299e0dd0e67 100644 --- a/src/sinks/aws_s3/integration_tests.rs +++ b/src/sinks/aws_s3/integration_tests.rs @@ -363,7 +363,8 @@ async fn s3_healthchecks_invalid_bucket() { .is_err()); } -async fn client() -> S3Client { +// MEZMO: upgraded function from private to pub for s3-sink file consolidation +pub async fn client() -> S3Client { let auth = AwsAuthentication::test_auth(); let region = RegionOrEndpoint::with_both("minio", s3_address()); let proxy = ProxyConfig::default(); @@ -400,6 +401,7 @@ fn config(bucket: &str, batch_size: usize) -> S3SinkConfig { tls: Default::default(), auth: Default::default(), acknowledgements: Default::default(), + file_consolidation_config: Default::default(), //MEZMO: new property for s3-sink file consolidation } } diff --git a/src/sinks/aws_s3/integration_tests_mezmo.rs b/src/sinks/aws_s3/integration_tests_mezmo.rs index b6a6aeea153e1..14d023e8cf56b 100644 --- a/src/sinks/aws_s3/integration_tests_mezmo.rs +++ b/src/sinks/aws_s3/integration_tests_mezmo.rs @@ -1,20 +1,22 @@ #![cfg(all(test, feature = "aws-s3-integration-tests"))] use crate::mezmo::reshape_log_event_by_message; +use crate::tls::TlsConfig; use assay::assay; -use bytes::Bytes; +use bytes::{Bytes, BytesMut}; use codecs::decoding::format::Deserializer; use codecs::decoding::format::JsonDeserializerConfig; use codecs::{encoding::FramingConfig, JsonSerializerConfig, MetricTagValues}; use futures::Stream; use similar_asserts::assert_eq; use tokio_stream::StreamExt; +use vector_core::config::proxy::ProxyConfig; use vector_core::config::LogNamespace; use vector_core::event::{BatchNotifier, BatchStatus, BatchStatusReceiver, Event, EventArray}; use super::S3SinkConfig; use 
crate::{ - aws::RegionOrEndpoint, + aws::{AwsAuthentication, RegionOrEndpoint}, config::SinkContext, sinks::{ s3_common::config::S3Options, @@ -26,7 +28,15 @@ use crate::{ }, }; -use super::integration_tests::{create_bucket, get_keys, get_lines, get_object, s3_address}; +use super::file_consolidation_processor::{get_files_to_consolidate, FileConsolidationProcessor}; +use super::file_consolidator_async::{FileConsolidationConfig, FileConsolidatorAsync}; +use super::integration_tests::{ + client, create_bucket, get_keys, get_lines, get_object, s3_address, +}; +use aws_sdk_s3::types::ByteStream; +use flate2::read::GzEncoder; +use std::io::Read; +use std::{thread, time}; #[assay( env = [ @@ -133,6 +143,546 @@ async fn s3_message_objects_not_reshaped_because_of_env() { } } +#[tokio::test] +async fn s3_file_consolidator_run() { + let _cx = SinkContext::new_test(); + let bucket = uuid::Uuid::new_v4().to_string(); + + create_bucket(&bucket, false).await; + + let auth = AwsAuthentication::test_auth(); + let region = RegionOrEndpoint::with_both("minio", s3_address()); + let proxy = ProxyConfig::default(); + let tls_options: Option = None; + + let mut fc = FileConsolidatorAsync::new( + auth, + region.region(), + region.endpoint().unwrap(), + proxy, + tls_options, + FileConsolidationConfig { + enabled: true, + process_every_ms: 10, + requested_size_bytes: 512000000, + }, + bucket.clone(), + "/".to_owned(), + ); + + let started = fc.start(); + assert_eq!(started, true, "started true"); + + thread::sleep(time::Duration::from_millis(1000)); + + let stopped = fc.stop(); + assert_eq!(stopped, true, "stopped true"); +} + +#[tokio::test] +async fn s3_file_consolidation_process_no_files() { + let _cx = SinkContext::new_test(); + + let s3_client = client().await; + let bucket = uuid::Uuid::new_v4().to_string(); + let key_prefix = "/".to_owned(); + let requested_size_bytes: i32 = 10 * 1024 * 1024; + + create_bucket(&bucket, false).await; + + let fcp = FileConsolidationProcessor::new(&s3_client, bucket, key_prefix, requested_size_bytes); + fcp.run().await; +} + +#[tokio::test] +async fn s3_file_consolidation_process_no_tagged_files() { + let _cx = SinkContext::new_test(); + + let s3_client = client().await; + let bucket = uuid::Uuid::new_v4().to_string(); + let key_prefix = "/".to_string(); + let requested_size_bytes: i32 = 10 * 1024 * 1024; + let content_type = "text/x-log".to_string(); + + create_bucket(&bucket, false).await; + put_file( + "file_no_tags.log".to_string(), + Bytes::from("unit test".as_bytes()), + key_prefix.clone(), + bucket.clone(), + content_type.clone(), + None, + None, + ) + .await; + + let keys = get_keys(&bucket, key_prefix.clone()).await; + assert_eq!(keys.len(), 1); + + // runs without errors + let fcp = FileConsolidationProcessor::new(&s3_client, bucket, key_prefix, requested_size_bytes); + fcp.run().await; +} + +#[tokio::test] +async fn s3_file_consolidation_process_tag_filters() { + let _cx = SinkContext::new_test(); + + let s3_client = client().await; + let bucket = uuid::Uuid::new_v4().to_string(); + let key_prefix = "/".to_string(); + let requested_size_bytes: i32 = 1024 * 1024; + let content_type = "text/x-log".to_string(); + + create_bucket(&bucket, false).await; + + let pipeline_merged_tags = + generate_tags("mezmo_pipeline_merged".to_string(), "true".to_string()); + let pipeline_custom_tags = generate_tags("random_tag".to_string(), "true".to_string()); + let mezmo_pipeline_s3_type_ndjson_tags = + generate_tags("mezmo_pipeline_s3_type".to_string(), "ndjson".to_string()); + let 
mezmo_pipeline_s3_type_text_tags = + generate_tags("mezmo_pipeline_s3_type".to_string(), "text".to_string()); + let mezmo_pipeline_s3_type_json_tags = + generate_tags("mezmo_pipeline_s3_type".to_string(), "json".to_string()); + let mezmo_pipeline_s3_type_unknown_tags = + generate_tags("mezmo_pipeline_s3_type".to_string(), "".to_string()); + + put_file( + "previous_merge.log".to_string(), + Bytes::from("file from previous merge".as_bytes()), + key_prefix.clone(), + bucket.clone(), + content_type.clone(), + None, + pipeline_merged_tags, + ) + .await; + put_file( + "s3_sink.log".to_string(), + Bytes::from("file from s3 sink".as_bytes()), + key_prefix.clone(), + bucket.clone(), + content_type.clone(), + None, + pipeline_custom_tags, + ) + .await; + put_file( + "s3_type_ndjson.log".to_string(), + Bytes::from("file with ndjson".as_bytes()), + key_prefix.clone(), + bucket.clone(), + content_type.clone(), + None, + mezmo_pipeline_s3_type_ndjson_tags, + ) + .await; + put_file( + "s3_type_text.log".to_string(), + Bytes::from("file with text".as_bytes()), + key_prefix.clone(), + bucket.clone(), + content_type.clone(), + None, + mezmo_pipeline_s3_type_text_tags, + ) + .await; + put_file( + "s3_type_json.log".to_string(), + Bytes::from("file with json".as_bytes()), + key_prefix.clone(), + bucket.clone(), + content_type.clone(), + None, + mezmo_pipeline_s3_type_json_tags, + ) + .await; + put_file( + "s3_type_unknown.log".to_string(), + Bytes::from("file with who knows what".as_bytes()), + key_prefix.clone(), + bucket.clone(), + content_type.clone(), + None, + mezmo_pipeline_s3_type_unknown_tags, + ) + .await; + + let keys = get_keys(&bucket, key_prefix.clone()).await; + assert_eq!(keys.len(), 6); + + // only s3 created files with ndjson and text will be merged + match get_files_to_consolidate(&s3_client, bucket.clone(), key_prefix.clone()).await { + Ok(files) => { + assert_eq!(files.len(), 2); + assert_eq!(files[0].size, 16); + assert_eq!(files[0].key, "/s3_type_ndjson.log"); + + assert_eq!(files[1].size, 14); + assert_eq!(files[1].key, "/s3_type_text.log"); + } + Err(err) => panic!("Retrieving files should not error: {}", err), + }; + + let fcp = FileConsolidationProcessor::new( + &s3_client, + bucket.clone(), + key_prefix.clone(), + requested_size_bytes, + ); + fcp.run().await; + + // validate we're down to 5 files now since 2 of them were merged + let keys = get_keys(&bucket, key_prefix.clone()).await; + assert_eq!(keys.len(), 5); + + // untouched files + assert!(keys.contains(&"/s3_sink.log".to_string())); + assert!(keys.contains(&"/s3_type_json.log".to_string())); + assert!(keys.contains(&"/s3_type_unknown.log".to_string())); + assert!(keys.contains(&"/previous_merge.log".to_string())); + + // the new file that should contain the text of the docs + if let Some(k) = keys.into_iter().find(|s| s.ends_with("_merged.log")) { + let obj = get_object(&bucket, k).await; + assert_eq!(obj.content_encoding, Some("identity".to_string())); + assert_eq!(obj.content_type, Some("text/x-log".to_string())); + assert_eq!(obj.content_length, 32); // contents plus newlines + + let response_lines = get_lines(obj).await; + assert_eq!(response_lines.len(), 2); + assert!(response_lines.contains(&"file with ndjson".to_string())); + assert!(response_lines.contains(&"file with text".to_string())); + } else { + panic!("did not find the merged file as expected"); + } +} + +#[tokio::test] +async fn s3_file_consolidation_compressed_files() { + let _cx = SinkContext::new_test(); + + let s3_client = client().await; + let bucket = 
uuid::Uuid::new_v4().to_string(); + let key_prefix = "/".to_string(); + let requested_size_bytes: i32 = 20 * 1024 * 1024; + let content_type = "text/x-log".to_string(); + + create_bucket(&bucket, false).await; + + // create some text lines and compress them + let ndjson = "{\"property\":\"fkcurxdqnnybrcutaogcvzvdttjzlcavsonfhuianreijaqfpaojjmolsibjzjvcphrjxzorjtvlbphepgfzy\"}".to_owned(); + let text = "ozsggnwocqbrtuzwzudhakpibrkfnewnnuoeyopbmshpgcjicrmgasucmizjqycsvjladptmhtygwwystocxsphnyckeijpyfbvy".to_owned(); + + let compressed_ndjson = compress_text(&ndjson); + let compressed_text = compress_text(&text); + + let mezmo_pipeline_s3_type_ndjson_tags = + generate_tags("mezmo_pipeline_s3_type".to_string(), "ndjson".to_string()); + let mezmo_pipeline_s3_type_text_tags = + generate_tags("mezmo_pipeline_s3_type".to_string(), "text".to_string()); + + put_file( + "ndjson.log".to_string(), + compressed_ndjson, + key_prefix.clone(), + bucket.clone(), + content_type.clone(), + Some("gzip".to_string()), + mezmo_pipeline_s3_type_ndjson_tags, + ) + .await; + put_file( + "text.log".to_string(), + compressed_text, + key_prefix.clone(), + bucket.clone(), + content_type.clone(), + Some("gzip".to_string()), + mezmo_pipeline_s3_type_text_tags, + ) + .await; + + let keys = get_keys(&bucket, key_prefix.clone()).await; + assert_eq!(keys.len(), 2); + + // only s3 created files with ndjson and text will be merged + match get_files_to_consolidate(&s3_client, bucket.clone(), key_prefix.clone()).await { + Ok(files) => { + assert_eq!(files.len(), 2); + assert_eq!(files[0].size, 91); + assert_eq!(files[0].key, "/ndjson.log"); + + assert_eq!(files[1].size, 85); + assert_eq!(files[1].key, "/text.log"); + } + Err(err) => panic!("Retrieving files should not error: {}", err), + }; + + let fcp = FileConsolidationProcessor::new( + &s3_client, + bucket.clone(), + key_prefix.clone(), + requested_size_bytes, + ); + fcp.run().await; + + // validate we're down to 1 files now since 2 of them were merged + let keys = get_keys(&bucket, key_prefix.clone()).await; + assert_eq!(keys.len(), 1); + + // the new file that should contain the text of the docs uncompressed + if let Some(k) = keys.into_iter().find(|s| s.ends_with("_merged.log")) { + let obj = get_object(&bucket, k).await; + assert_eq!(obj.content_encoding, Some("identity".to_string())); + assert_eq!(obj.content_type, Some("text/x-log".to_string())); + assert_eq!(obj.content_length, 202); // decompressed plus newlines + + let response_lines = get_lines(obj).await; + assert_eq!(response_lines.len(), 2); + assert!(response_lines.contains(&ndjson)); + assert!(response_lines.contains(&text)); + } else { + panic!("did not find the merged file as expected"); + } +} + +#[tokio::test] +async fn s3_file_consolidation_multiple_consolidated_files() { + let _cx = SinkContext::new_test(); + + let s3_client = client().await; + let bucket = uuid::Uuid::new_v4().to_string(); + let key_prefix = "/".to_string(); + let requested_size_bytes: i32 = 1024 * 1024; + let content_type = "text/x-log".to_string(); + + create_bucket(&bucket, false).await; + + let hundred_bytes_1 = "{\"property\":\"fkcurxdqnnybrcutaogcvzvdttjzlcavsonfhuianreijaqfpaojjmolsibjzjvcphrjxzorjtvlbphepgfzy\"}"; + for i in 0..5 { + let mut five_hundred_kb = BytesMut::new(); + for _x in 0..5000 { + five_hundred_kb.extend_from_slice(hundred_bytes_1.as_bytes()); + } + + let mezmo_pipeline_s3_type_ndjson_tags = + generate_tags("mezmo_pipeline_s3_type".to_string(), "ndjson".to_string()); + let filename = format!("{}_generated.log", 
i); + put_file( + filename, + Bytes::from(five_hundred_kb.to_vec()), + key_prefix.clone(), + bucket.clone(), + content_type.clone(), + None, + mezmo_pipeline_s3_type_ndjson_tags, + ) + .await; + } + + let keys = get_keys(&bucket, key_prefix.clone()).await; + assert_eq!(keys.len(), 5); + + match get_files_to_consolidate(&s3_client, bucket.clone(), key_prefix.clone()).await { + Ok(files) => { + assert_eq!(files.len(), 5); + for file in files.iter() { + assert_eq!(file.size, 500000); + } + } + Err(err) => panic!("Retrieving files should not error: {}", err), + }; + + let fcp = FileConsolidationProcessor::new( + &s3_client, + bucket.clone(), + key_prefix.clone(), + requested_size_bytes, + ); + fcp.run().await; + + // validate we're down to 2 files now + let keys = get_keys(&bucket, key_prefix.clone()).await; + assert_eq!(keys.len(), 2); + + let mut i = 0; + for k in keys.iter() { + assert!(k.ends_with("_merged.log")); + + let obj = get_object(&bucket, k.to_string()).await; + assert_eq!(obj.content_encoding, Some("identity".to_string())); + assert_eq!(obj.content_type, Some("text/x-log".to_string())); + + // the file has either 3 or 2 lines + let response_lines = get_lines(obj).await; + let lc = response_lines.len(); + assert!(lc == 2 || lc == 3); + i += lc; + } + + // all five lines are found between the 2 files + assert_eq!(i, 5); +} + +#[tokio::test] +async fn s3_file_consolidation_large_files() { + let _cx = SinkContext::new_test(); + + let s3_client = client().await; + let bucket = uuid::Uuid::new_v4().to_string(); + let key_prefix = "/".to_string(); + let requested_size_bytes: i32 = 20 * 1024 * 1024; + let content_type = "text/x-log".to_string(); + + create_bucket(&bucket, false).await; + + let hundred_bytes = "ozsggnwocqbrtuzwzudhakpibrkfnewnnuoeyopbmshpgcjicrmgasucmizjqycsvjladptmhtygwwystocxsphnyckeijpyfbvy"; + + // build about 8 MB worth of small files so we can flush a single part + for i in 0..15 { + let mut five_hundred_kb = BytesMut::new(); + for _x in 0..5000 { + five_hundred_kb.extend_from_slice(hundred_bytes.as_bytes()); + five_hundred_kb.extend_from_slice(b"\n"); + } + + let mezmo_pipeline_s3_type_text_tags = + generate_tags("mezmo_pipeline_s3_type".to_string(), "text".to_string()); + let filename = format!("{}_generated.log", i); + put_file( + filename, + Bytes::from(five_hundred_kb.to_vec()), + key_prefix.clone(), + bucket.clone(), + content_type.clone(), + None, + mezmo_pipeline_s3_type_text_tags, + ) + .await; + } + + // a small compressed file to show decompression during upload parts. 
+ let compressed_text = { + let mut compressed = BytesMut::new(); + for _i in 0..10 { + compressed.extend_from_slice(hundred_bytes.as_bytes()); + compressed.extend_from_slice(b"\n"); + } + + let str = String::from_utf8(compressed.to_vec()).unwrap(); + compress_text(&str) + }; + + // create a 6 MB file to go over the threshold and use the upload_copy_part + let mut six_megs_uncompressed = BytesMut::new(); + for _i in 0..60000 { + six_megs_uncompressed.extend_from_slice(hundred_bytes.as_bytes()); + six_megs_uncompressed.extend_from_slice(b"\n"); + } + + let mezmo_pipeline_s3_type_text_tags = + generate_tags("mezmo_pipeline_s3_type".to_string(), "text".to_string()); + + put_file( + "some_compressed_data.log".to_string(), + compressed_text, + key_prefix.clone(), + bucket.clone(), + content_type.clone(), + Some("gzip".to_string()), + mezmo_pipeline_s3_type_text_tags.clone(), + ) + .await; + put_file( + "6MB_uncompressed.log".to_string(), + Bytes::from(six_megs_uncompressed), + key_prefix.clone(), + bucket.clone(), + content_type.clone(), + None, + mezmo_pipeline_s3_type_text_tags.clone(), + ) + .await; + + let keys = get_keys(&bucket, key_prefix.clone()).await; + assert_eq!(keys.len(), 17); + + let fcp = FileConsolidationProcessor::new( + &s3_client, + bucket.clone(), + key_prefix.clone(), + requested_size_bytes, + ); + fcp.run().await; + + // validate we're down to 1 file + let keys = get_keys(&bucket, key_prefix.clone()).await; + assert_eq!(keys.len(), 1); + + // the new file that should contain the text of the docs + if let Some(k) = keys.into_iter().find(|s| s.ends_with("_merged.log")) { + let obj = get_object(&bucket, k).await; + assert_eq!(obj.content_encoding, Some("identity".to_string())); + assert_eq!(obj.content_type, Some("text/x-log".to_string())); + + // 15 files of 5000 lines + // 1 file of 10 lines + // 1 file of 60000 lines + // newlines between each added file + let response_lines = get_lines(obj).await; + assert_eq!(response_lines.len(), 135026); + } else { + panic!("did not find the merged file as expected"); + } +} + +async fn put_file( + file_name: String, + content: Bytes, + prefix: String, + bucket: String, + content_type: String, + content_encoding: Option, + tags: Option, +) { + _ = client() + .await + .put_object() + .body(ByteStream::from(content)) + .bucket(bucket.clone()) + .key(format!("{}{}", prefix.clone(), file_name)) + .set_content_type(Some(content_type.clone())) + .set_content_encoding(content_encoding.clone()) + .set_tagging(tags) + .send() + .await; +} + +fn compress_text(value: &String) -> Bytes { + let mut ret_vec = [0; 1000000]; + let mut bytestring = value.as_bytes(); + let mut gz = GzEncoder::new(&mut bytestring, flate2::Compression::fast()); + let count = gz.read(&mut ret_vec).unwrap(); + let vec = ret_vec[0..count].to_vec(); + + let mut bytes_mut = BytesMut::with_capacity(0); + bytes_mut.extend_from_slice(&vec); + Bytes::from(bytes_mut) +} + +fn generate_tags(key: String, value: String) -> Option { + let tags = { + let mut tagging = url::form_urlencoded::Serializer::new(String::new()); + tagging.append_pair(&key, &value); + tagging.append_pair("mezmo_pipeline_s3_sink", "true"); + tagging.finish() + }; + + Some(tags) +} + fn json_config(bucket: &str, batch_size: usize) -> S3SinkConfig { let mut batch = BatchConfig::default(); batch.max_events = Some(batch_size); @@ -157,6 +707,7 @@ fn json_config(bucket: &str, batch_size: usize) -> S3SinkConfig { tls: Default::default(), auth: Default::default(), acknowledgements: Default::default(), + 
file_consolidation_config: Default::default(), } } diff --git a/src/sinks/aws_s3/mod.rs b/src/sinks/aws_s3/mod.rs index 421a4730b5e82..2d1183f222da0 100644 --- a/src/sinks/aws_s3/mod.rs +++ b/src/sinks/aws_s3/mod.rs @@ -5,3 +5,7 @@ mod integration_tests; mod integration_tests_mezmo; pub use self::config::S3SinkConfig; + +// MEZMO: added files for s3-sink file consolidation +pub mod file_consolidation_processor; +pub mod file_consolidator_async; diff --git a/src/sinks/datadog_archives.rs b/src/sinks/datadog_archives.rs index d794283bd9f77..e55917384e53d 100644 --- a/src/sinks/datadog_archives.rs +++ b/src/sinks/datadog_archives.rs @@ -395,7 +395,14 @@ impl DatadogArchivesSinkConfig { self.encoding.clone(), ); - let sink = S3Sink::new(service, request_builder, partitioner, batcher_settings); + // MEZMO: added file consolidation processing which is optional, so None parameter added + let sink = S3Sink::new( + service, + request_builder, + partitioner, + batcher_settings, + None, + ); Ok(VectorSink::from_event_streamsink(sink)) } diff --git a/src/sinks/s3_common/sink.rs b/src/sinks/s3_common/sink.rs index 08e33674a02ad..7ea02dce91869 100644 --- a/src/sinks/s3_common/sink.rs +++ b/src/sinks/s3_common/sink.rs @@ -18,12 +18,17 @@ use crate::{ }; use super::partitioner::{S3KeyPartitioner, S3PartitionKey}; +// MEZMO: added dependency for s3-sink file consolidation +use crate::sinks::aws_s3::file_consolidator_async::FileConsolidatorAsync; pub struct S3Sink { service: Svc, request_builder: RB, partitioner: S3KeyPartitioner, batcher_settings: BatcherSettings, + + // MEZMO: added property for s3-sink file consolidation + file_consolidator: Option, } impl S3Sink { @@ -32,12 +37,14 @@ impl S3Sink { request_builder: RB, partitioner: S3KeyPartitioner, batcher_settings: BatcherSettings, + file_consolidator: Option, ) -> Self { Self { partitioner, service, request_builder, batcher_settings, + file_consolidator, } } } @@ -59,7 +66,12 @@ where let builder_limit = NonZeroUsize::new(64); let request_builder = self.request_builder; - input + // MEZMO: added file consolidation processing + // initiate the file consolidation process if necessary + let mut file_consolidator = self.file_consolidator.unwrap_or_default(); + file_consolidator.start(); + + let result = input .batched_partitioned(partitioner, settings) .filter_map(|(key, batch)| async move { key.map(move |k| (k, batch)) }) .request_builder(builder_limit, request_builder) @@ -74,7 +86,13 @@ where }) .into_driver(self.service) .run() - .await + .await; + + // MEZMO: added file consolidation processing + //stop the file consolidation process if necessary + file_consolidator.stop(); + + result } }
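
Note on the gating in build_consolidation_process (src/sinks/aws_s3/config.rs): consolidation is only started on the first replica of the StatefulSet, identified by a hostname ending in "-0", so multiple sink instances never contend over the same merge. A minimal standalone sketch of that check, using the gethostname crate the patch adds (the function name here is illustrative, not part of the patch):

use gethostname::gethostname;

// Sketch: only the first StatefulSet replica (pod name "<name>-0") runs the
// consolidation task, and only when the feature is enabled in the sink config.
fn should_run_consolidation(enabled: bool) -> bool {
    let host = gethostname().into_string().unwrap_or_default();
    enabled && host.ends_with("-0")
}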
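
get_files_to_consolidate (src/sinks/aws_s3/file_consolidation_processor.rs) selects candidates by object tags. A condensed sketch of that decision over a plain map, with the SDK's Tag type elided and the helper name hypothetical:

use std::collections::HashMap;

// Sketch of the tag filter: only objects written by the Mezmo S3 sink, not
// already the output of a previous merge, and containing line-oriented data
// (ndjson or text) are eligible; structured json files are left alone.
fn is_eligible_for_consolidation(tags: &HashMap<String, String>) -> bool {
    let produced_by_sink = tags.contains_key("mezmo_pipeline_s3_sink");
    let already_merged = tags.contains_key("mezmo_pipeline_merged");
    let line_oriented = matches!(
        tags.get("mezmo_pipeline_s3_type").map(String::as_str),
        Some("ndjson") | Some("text")
    );
    produced_by_sink && !already_merged && line_oriented
}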
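
The background task spawned in FileConsolidatorAsync::start reruns the processor on a fixed interval, sleeping for the configured period minus the time the pass took. A self-contained sketch of that scheduling pattern (names are illustrative, not the patch's API; saturating_sub is used here as an assumption so a pass that outlasts the interval cannot underflow the unsigned subtraction):

use std::future::Future;
use tokio::time::{sleep, Duration, Instant};

// Sketch: run `work` repeatedly, waiting out whatever remains of the
// configured interval after each pass completes.
async fn run_on_interval<F, Fut>(interval_ms: u64, mut work: F)
where
    F: FnMut() -> Fut,
    Fut: Future<Output = ()>,
{
    loop {
        let started = Instant::now();
        work().await;

        let elapsed_ms = started.elapsed().as_millis() as u64;
        let remaining_ms = interval_ms.saturating_sub(elapsed_ms);
        if remaining_ms > 0 {
            sleep(Duration::from_millis(remaining_ms)).await;
        }
    }
}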