Skip to content

Commit

Permalink
enhancement(file sink, aws_s3 sink, gcp_cloud_storage): configurable …
Browse files Browse the repository at this point in the history
…filename timezone (vectordotdev#18506)

* add TzOffset as File Sink configuration

* integrate TzOffset into File Sink

* apply tz offset to all log events in render_timestamp.

* added TzOffset tests

* adding chrono-tz for parsing timezones

* rename tz_offset to path_tz. timezones are safer than offsets

* update tz_offset references to path_tz

* cargo fmt

* remove unnecessary commented out code. fmt and generate-component-docs

* clippy suggestions and remove TryFrom<&str> - serde handles converting to String

* rename Template config option `path_tz` to `timezone`

* move `path_tz.rs` to `src/config`

preparing for applying the same to `aws_s3` sink for filename timezone

* update doc configuration description for path_tz

* fix wrong method name

* AWS and GCS filename timezone support

  * remove custom tz config
  * use VRL's timezone config
  * pass around SinkContext

* use TzOffset to pass down to request builders, since VRL's TimeZone cannot derive `Hash`

* make key_prefix timezone aware and use Option `or` syntax

* move tz to offset conversion codes to sink util

* remove empty line

* update timezone docs in vector-config

* get timezone and convert to offset in one go in FileSink

* just pass the sinkconfig directly

* vector_common to vector_lib

* configurable_component is in vector_lib now

* lookup to vector_lib

* fix aws s3 integration test. pass the context to build_processor in tests

* formatting

* add sinkcontext to FileSink in file tests

* key_prefix is already expected to be a template, so no `into` conversion is needed
  • Loading branch information
kates authored and AndrooTheChen committed Sep 23, 2024
1 parent a7d002b commit cf8e232
Show file tree
Hide file tree
Showing 14 changed files with 204 additions and 50 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ bollard = { version = "0.15.0", default-features = false, features = ["ssl", "ch
bytes = { version = "1.5.0", default-features = false, features = ["serde"] }
bytesize = { version = "1.3.0", default-features = false }
chrono = { version = "0.4.31", default-features = false, features = ["serde"] }
chrono-tz = { version = "0.8.3", default-features = false }
cidr-utils = { version = "0.5.11", default-features = false }
clap = { version = "4.4.8", default-features = false, features = ["derive", "error-context", "env", "help", "std", "string", "usage", "wrap_help"] }
colored = { version = "2.0.4", default-features = false }
Expand Down
4 changes: 2 additions & 2 deletions lib/vector-config/src/external/datetime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ impl Configurable for TimeZone {

fn metadata() -> Metadata {
let mut metadata = Metadata::default();
metadata.set_title("Timezone reference.");
metadata.set_description(r#"This can refer to any valid timezone as defined in the [TZ database][tzdb], or "local" which refers to the system local timezone.
metadata.set_title("Timezone to use for any date specifiers in template strings.");
metadata.set_description(r#"This can refer to any valid timezone as defined in the [TZ database][tzdb], or "local" which refers to the system local timezone. It will default to the [globally configured timezone](https://vector.dev/docs/reference/configuration/global-options/#timezone).
[tzdb]: https://en.wikipedia.org/wiki/List_of_tz_database_time_zones"#);
metadata.add_custom_attribute(CustomAttribute::kv(
Expand Down
31 changes: 24 additions & 7 deletions src/sinks/aws_s3/config.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::convert::TryInto;

use aws_sdk_s3::Client as S3Client;
use tower::ServiceBuilder;
use vector_lib::codecs::{
Expand All @@ -8,6 +6,7 @@ use vector_lib::codecs::{
};
use vector_lib::configurable::configurable_component;
use vector_lib::sink::VectorSink;
use vector_lib::TimeZone;

use super::sink::S3RequestOptions;
use crate::{
Expand All @@ -23,8 +22,8 @@ use crate::{
sink::S3Sink,
},
util::{
BatchConfig, BulkSizeBasedDefaultBatchSettings, Compression, ServiceBuilderExt,
TowerRequestConfig,
timezone_to_offset, BatchConfig, BulkSizeBasedDefaultBatchSettings, Compression,
ServiceBuilderExt, TowerRequestConfig,
},
Healthcheck,
},
Expand Down Expand Up @@ -136,6 +135,10 @@ pub struct S3SinkConfig {
skip_serializing_if = "crate::serde::skip_serializing_if_default"
)]
pub acknowledgements: AcknowledgementsConfig,

#[configurable(derived)]
#[serde(default)]
pub timezone: Option<TimeZone>,
}

pub(super) fn default_key_prefix() -> String {
Expand Down Expand Up @@ -163,6 +166,7 @@ impl GenerateConfig for S3SinkConfig {
tls: Some(TlsConfig::default()),
auth: AwsAuthentication::default(),
acknowledgements: Default::default(),
timezone: Default::default(),
})
.unwrap()
}
Expand All @@ -174,7 +178,7 @@ impl SinkConfig for S3SinkConfig {
async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
let service = self.create_service(&cx.proxy).await?;
let healthcheck = self.build_healthcheck(service.client())?;
let sink = self.build_processor(service)?;
let sink = self.build_processor(service, cx)?;
Ok((sink, healthcheck))
}

Expand All @@ -188,7 +192,11 @@ impl SinkConfig for S3SinkConfig {
}

impl S3SinkConfig {
pub fn build_processor(&self, service: S3Service) -> crate::Result<VectorSink> {
pub fn build_processor(
&self,
service: S3Service,
cx: SinkContext,
) -> crate::Result<VectorSink> {
// Build our S3 client/service, which is what we'll ultimately feed
// requests into in order to ship files to S3. We build this here in
// order to configure the client/service with retries, concurrency
Expand All @@ -198,16 +206,24 @@ impl S3SinkConfig {
.settings(request_limits, S3RetryLogic)
.service(service);

let offset = self
.timezone
.or(cx.globals.timezone)
.and_then(timezone_to_offset);

// Configure our partitioning/batching.
let batch_settings = self.batch.into_batcher_settings()?;
let key_prefix = self.key_prefix.clone().try_into()?;

let key_prefix = Template::try_from(self.key_prefix.clone())?.with_tz_offset(offset);

let ssekms_key_id = self
.options
.ssekms_key_id
.as_ref()
.cloned()
.map(|ssekms_key_id| Template::try_from(ssekms_key_id.as_str()))
.transpose()?;

let partitioner = S3KeyPartitioner::new(key_prefix, ssekms_key_id);

let transformer = self.encoding.transformer();
Expand All @@ -222,6 +238,7 @@ impl S3SinkConfig {
filename_append_uuid: self.filename_append_uuid,
encoder: (transformer, encoder),
compression: self.compression,
filename_tz_offset: offset,
};

let sink = S3Sink::new(service, request_options, partitioner, batch_settings);
Expand Down
20 changes: 11 additions & 9 deletions src/sinks/aws_s3/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ async fn s3_insert_message_into_with_flat_key_prefix() {
config.key_prefix = "test-prefix".to_string();
let prefix = config.key_prefix.clone();
let service = config.create_service(&cx.globals.proxy).await.unwrap();
let sink = config.build_processor(service).unwrap();
let sink = config.build_processor(service, cx).unwrap();

let (lines, events, receiver) = make_events_batch(100, 10);
run_and_assert_sink_compliance(sink, events, &AWS_SINK_TAGS).await;
Expand Down Expand Up @@ -95,7 +95,7 @@ async fn s3_insert_message_into_with_folder_key_prefix() {
config.key_prefix = "test-prefix/".to_string();
let prefix = config.key_prefix.clone();
let service = config.create_service(&cx.globals.proxy).await.unwrap();
let sink = config.build_processor(service).unwrap();
let sink = config.build_processor(service, cx).unwrap();

let (lines, events, receiver) = make_events_batch(100, 10);
run_and_assert_sink_compliance(sink, events, &AWS_SINK_TAGS).await;
Expand Down Expand Up @@ -132,7 +132,7 @@ async fn s3_insert_message_into_with_ssekms_key_id() {
config.options.ssekms_key_id = Some("alias/aws/s3".to_string());

let service = config.create_service(&cx.globals.proxy).await.unwrap();
let sink = config.build_processor(service).unwrap();
let sink = config.build_processor(service, cx).unwrap();

let (lines, events, receiver) = make_events_batch(100, 10);
run_and_assert_sink_compliance(sink, events, &AWS_SINK_TAGS).await;
Expand Down Expand Up @@ -170,7 +170,7 @@ async fn s3_rotate_files_after_the_buffer_size_is_reached() {
};
let prefix = config.key_prefix.clone();
let service = config.create_service(&cx.globals.proxy).await.unwrap();
let sink = config.build_processor(service).unwrap();
let sink = config.build_processor(service, cx).unwrap();

let (lines, _events) = random_lines_with_stream(100, 30, None);

Expand Down Expand Up @@ -229,7 +229,7 @@ async fn s3_gzip() {

let prefix = config.key_prefix.clone();
let service = config.create_service(&cx.globals.proxy).await.unwrap();
let sink = config.build_processor(service).unwrap();
let sink = config.build_processor(service, cx).unwrap();

let (lines, events, receiver) = make_events_batch(100, batch_size * batch_multiplier);
run_and_assert_sink_compliance(sink, events, &AWS_SINK_TAGS).await;
Expand Down Expand Up @@ -274,7 +274,7 @@ async fn s3_zstd() {

let prefix = config.key_prefix.clone();
let service = config.create_service(&cx.globals.proxy).await.unwrap();
let sink = config.build_processor(service).unwrap();
let sink = config.build_processor(service, cx).unwrap();

let (lines, events, receiver) = make_events_batch(100, batch_size * batch_multiplier);
run_and_assert_sink_compliance(sink, events, &AWS_SINK_TAGS).await;
Expand Down Expand Up @@ -336,7 +336,7 @@ async fn s3_insert_message_into_object_lock() {
let config = config(&bucket, 1000000);
let prefix = config.key_prefix.clone();
let service = config.create_service(&cx.globals.proxy).await.unwrap();
let sink = config.build_processor(service).unwrap();
let sink = config.build_processor(service, cx).unwrap();

let (lines, events, receiver) = make_events_batch(100, 10);
run_and_assert_sink_compliance(sink, events, &AWS_SINK_TAGS).await;
Expand Down Expand Up @@ -368,7 +368,7 @@ async fn acknowledges_failures() {
config.bucket = format!("BREAK{}IT", config.bucket);
let prefix = config.key_prefix.clone();
let service = config.create_service(&cx.globals.proxy).await.unwrap();
let sink = config.build_processor(service).unwrap();
let sink = config.build_processor(service, cx).unwrap();

let (_lines, events, receiver) = make_events_batch(1, 1);
run_and_assert_sink_error(sink, events, &COMPONENT_ERROR_TAGS).await;
Expand Down Expand Up @@ -434,11 +434,12 @@ async fn s3_flush_on_exhaustion() {
tls: Default::default(),
auth: Default::default(),
acknowledgements: Default::default(),
timezone: Default::default(),
}
};
let prefix = config.key_prefix.clone();
let service = config.create_service(&cx.globals.proxy).await.unwrap();
let sink = config.build_processor(service).unwrap();
let sink = config.build_processor(service, cx).unwrap();

let (lines, _events) = random_lines_with_stream(100, 2, None); // only generate two events (less than batch size)

Expand Down Expand Up @@ -517,6 +518,7 @@ fn config(bucket: &str, batch_size: usize) -> S3SinkConfig {
tls: Default::default(),
auth: Default::default(),
acknowledgements: Default::default(),
timezone: Default::default(),
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/sinks/aws_s3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ mod sink;

mod integration_tests;

pub use self::config::S3SinkConfig;
pub use config::S3SinkConfig;
12 changes: 10 additions & 2 deletions src/sinks/aws_s3/sink.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::io;

use bytes::Bytes;
use chrono::Utc;
use chrono::{FixedOffset, Utc};
use uuid::Uuid;
use vector_lib::codecs::encoding::Framer;
use vector_lib::event::Finalizable;
Expand Down Expand Up @@ -32,6 +32,7 @@ pub struct S3RequestOptions {
pub api_options: S3Options,
pub encoder: (Transformer, Encoder<Framer>),
pub compression: Compression,
pub filename_tz_offset: Option<FixedOffset>,
}

impl RequestBuilder<(S3PartitionKey, Vec<Event>)> for S3RequestOptions {
Expand Down Expand Up @@ -76,7 +77,14 @@ impl RequestBuilder<(S3PartitionKey, Vec<Event>)> for S3RequestOptions {
payload: EncodeResult<Self::Payload>,
) -> Self::Request {
let filename = {
let formatted_ts = Utc::now().format(self.filename_time_format.as_str());
let formatted_ts = match self.filename_tz_offset {
Some(offset) => Utc::now()
.with_timezone(&offset)
.format(self.filename_time_format.as_str()),
None => Utc::now()
.with_timezone(&chrono::Utc)
.format(self.filename_time_format.as_str()),
};

self.filename_append_uuid
.then(|| format!("{}-{}", formatted_ts, Uuid::new_v4().hyphenated()))
Expand Down
32 changes: 24 additions & 8 deletions src/sinks/file/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use vector_lib::codecs::{
use vector_lib::configurable::configurable_component;
use vector_lib::{
internal_event::{CountByteSize, EventsSent, InternalEventHandle as _, Output, Registered},
EstimatedJsonEncodedSizeOf,
EstimatedJsonEncodedSizeOf, TimeZone,
};

use crate::{
Expand All @@ -33,9 +33,10 @@ use crate::{
internal_events::{
FileBytesSent, FileInternalMetricsConfig, FileIoError, FileOpen, TemplateRenderingError,
},
sinks::util::StreamSink,
sinks::util::{timezone_to_offset, StreamSink},
template::Template,
};

mod bytes_path;

use bytes_path::BytesPath;
Expand Down Expand Up @@ -84,6 +85,10 @@ pub struct FileSinkConfig {
)]
pub acknowledgements: AcknowledgementsConfig,

#[configurable(derived)]
#[serde(default)]
pub timezone: Option<TimeZone>,

#[configurable(derived)]
#[serde(default)]
internal_metrics: FileInternalMetricsConfig,
Expand All @@ -97,6 +102,7 @@ impl GenerateConfig for FileSinkConfig {
encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
compression: Default::default(),
acknowledgements: Default::default(),
timezone: Default::default(),
internal_metrics: Default::default(),
})
.unwrap()
Expand Down Expand Up @@ -181,9 +187,9 @@ impl OutFile {
impl SinkConfig for FileSinkConfig {
async fn build(
&self,
_cx: SinkContext,
cx: SinkContext,
) -> crate::Result<(super::VectorSink, super::Healthcheck)> {
let sink = FileSink::new(self)?;
let sink = FileSink::new(self, cx)?;
Ok((
super::VectorSink::from_event_streamsink(sink),
future::ok(()).boxed(),
Expand Down Expand Up @@ -211,13 +217,18 @@ pub struct FileSink {
}

impl FileSink {
pub fn new(config: &FileSinkConfig) -> crate::Result<Self> {
pub fn new(config: &FileSinkConfig, cx: SinkContext) -> crate::Result<Self> {
let transformer = config.encoding.transformer();
let (framer, serializer) = config.encoding.build(SinkType::StreamBased)?;
let encoder = Encoder::<Framer>::new(framer, serializer);

let offset = config
.timezone
.or(cx.globals.timezone)
.and_then(timezone_to_offset);

Ok(Self {
path: config.path.clone(),
path: config.path.clone().with_tz_offset(offset),
transformer,
encoder,
idle_timeout: config.idle_timeout,
Expand Down Expand Up @@ -464,6 +475,7 @@ mod tests {
encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
compression: Compression::None,
acknowledgements: Default::default(),
timezone: Default::default(),
internal_metrics: Default::default(),
};

Expand All @@ -487,6 +499,7 @@ mod tests {
encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
compression: Compression::Gzip,
acknowledgements: Default::default(),
timezone: Default::default(),
internal_metrics: Default::default(),
};

Expand All @@ -510,6 +523,7 @@ mod tests {
encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
compression: Compression::Zstd,
acknowledgements: Default::default(),
timezone: Default::default(),
internal_metrics: Default::default(),
};

Expand Down Expand Up @@ -538,6 +552,7 @@ mod tests {
encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
compression: Compression::None,
acknowledgements: Default::default(),
timezone: Default::default(),
internal_metrics: Default::default(),
};

Expand Down Expand Up @@ -617,6 +632,7 @@ mod tests {
encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
compression: Compression::None,
acknowledgements: Default::default(),
timezone: Default::default(),
internal_metrics: Default::default(),
};

Expand All @@ -626,7 +642,7 @@ mod tests {

let sink_handle = tokio::spawn(async move {
assert_sink_compliance(&FILE_SINK_TAGS, async move {
let sink = FileSink::new(&config).unwrap();
let sink = FileSink::new(&config, SinkContext::default()).unwrap();
VectorSink::from_event_streamsink(sink)
.run(Box::pin(rx.map(Into::into)))
.await
Expand Down Expand Up @@ -670,7 +686,7 @@ mod tests {

async fn run_assert_sink(config: FileSinkConfig, events: impl Iterator<Item = Event> + Send) {
assert_sink_compliance(&FILE_SINK_TAGS, async move {
let sink = FileSink::new(&config).unwrap();
let sink = FileSink::new(&config, SinkContext::default()).unwrap();
VectorSink::from_event_streamsink(sink)
.run(Box::pin(stream::iter(events.map(Into::into))))
.await
Expand Down
Loading

0 comments on commit cf8e232

Please sign in to comment.