enhancement(file sink, aws_s3 sink, gcp_cloud_storage): configurable filename timezone #18506

Merged · 57 commits · Nov 14, 2023
Commits (57)
fe5873f
add TzOffset as File Sink configuration
kates Mar 30, 2023
797d9fa
integrate TzOffset into File Sink
kates Mar 30, 2023
086432d
Merge branch 'master' into tz-offset
kates Mar 30, 2023
b24aa2d
apply tz offset to all log event in render_timestamp.
kates Mar 30, 2023
1540a70
added TzOffset tests
kates Mar 31, 2023
3f9c10d
Merge branch 'master' into tz-offset
kates Mar 31, 2023
e123c9c
Merge branch 'master' into tz-offset
kates Apr 3, 2023
edb2682
adding chrono-tz for parsing timezones
kates Apr 3, 2023
5454523
rename tz_offset to path_tz. timezones are safer than offsets
kates Apr 3, 2023
535f577
update tz_offset references to path_tz
kates Apr 3, 2023
3495afa
cargo fmt
kates Apr 5, 2023
0e66609
Merge branch 'master' into tz-offset
kates Apr 5, 2023
076fa9b
Merge branch 'master' into tz-offset
kates Apr 9, 2023
7ff8624
remove unnecessary commented out code. fmt and generate-component-docs
kates Apr 10, 2023
86c64ab
Merge branch 'master' into tz-offset
kates Apr 12, 2023
e37fbfa
clippy suggestions and remove TryFrom<&str> - serde handles convertin…
kates Apr 12, 2023
2e2a432
Merge branch 'master' into tz-offset
kates Apr 13, 2023
af104ec
Merge branch 'master' into tz-offset
kates Apr 25, 2023
c022635
Merge branch 'master' into tz-offset
kates Apr 26, 2023
86a8a82
rename Template config option `path_tz` to `timezone`
kates Apr 26, 2023
ad76c17
move `path_tz.rs` to `src/config`
kates Apr 26, 2023
e3a4f94
Merge branch 'master' into tz-offset
kates Apr 28, 2023
5e8a2a6
update doc configuration description for path_tz
kates Apr 28, 2023
f01e339
Merge branch 'master' into tz-offset
kates May 2, 2023
21106bc
fix wrong method name
kates May 2, 2023
e2761b0
Merge branch 'master' into tz-offset
kates Sep 7, 2023
5e8ae7e
AWS and GCS filename timezone support
kates Sep 7, 2023
2ee2ea9
Merge branch 'master' into filename-timezone
kates Sep 8, 2023
0366297
Merge branch 'master' into filename-timezone
kates Sep 11, 2023
9e4530f
use TzOffset to pass down to request builders. VRL's TimeZone can't b…
kates Sep 11, 2023
03dda4c
make key_prefix timezone aware and use Option `or` syntax
kates Sep 14, 2023
6c5b2b3
Merge branch 'master' into filename-timezone
kates Sep 14, 2023
cc8310b
Merge branch 'master' into filename-timezone
kates Oct 5, 2023
2ab18ea
move tz to offset conversion codes to sink util
kates Oct 6, 2023
52930a5
Merge branch 'master' into filename-timezone
kates Oct 6, 2023
6723546
remove empty line
kates Oct 6, 2023
8a962f4
Merge branch 'master' into filename-timezone
kates Oct 18, 2023
0af758a
update timezone docs in vector-config
kates Oct 18, 2023
a7f38f1
Merge branch 'master' into filename-timezone
kates Oct 23, 2023
6667f6c
get timezone and convert to offset in one go in FileSink
kates Oct 23, 2023
a50d773
Merge branch 'master' into filename-timezone
kates Oct 25, 2023
a6f95d5
just pass the sinkconfig directly
kates Oct 25, 2023
dbc2455
Merge branch 'master' into filename-timezone
kates Oct 26, 2023
f25f6b7
vector_common to vector_lib
kates Oct 26, 2023
53b6fff
Merge branch 'master' into filename-timezone
kates Oct 27, 2023
d6d579d
configurable_component is in vector_lib now
kates Oct 27, 2023
6d64b8e
Merge branch 'master' into filename-timezone
kates Oct 30, 2023
67b4f34
Merge branch 'master' into filename-timezone
kates Nov 6, 2023
0707b0c
lookup to vector_lib
kates Nov 6, 2023
dbf4afb
Merge branch 'master' into filename-timezone
kates Nov 9, 2023
0b59e29
fix aws s3 integration test. pass the context to build_processor in t…
kates Nov 9, 2023
6a6df39
Merge branch 'master' into filename-timezone
kates Nov 10, 2023
25ca178
formatting
kates Nov 10, 2023
c62e059
Merge branch 'master' into filename-timezone
kates Nov 12, 2023
0baabb8
add sinkcontext to FileSink in file tests
kates Nov 12, 2023
274c170
Merge branch 'master' into filename-timezone
kates Nov 14, 2023
a36cf74
key_prefix is expected to be a template. no need for into
kates Nov 14, 2023
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Expand Up @@ -242,6 +242,7 @@ bollard = { version = "0.15.0", default-features = false, features = ["ssl", "ch
bytes = { version = "1.5.0", default-features = false, features = ["serde"] }
bytesize = { version = "1.3.0", default-features = false }
chrono = { version = "0.4.31", default-features = false, features = ["serde"] }
chrono-tz = { version = "0.8.3", default-features = false }
cidr-utils = { version = "0.5.11", default-features = false }
clap = { version = "4.4.8", default-features = false, features = ["derive", "error-context", "env", "help", "std", "string", "usage", "wrap_help"] }
colored = { version = "2.0.4", default-features = false }
4 changes: 2 additions & 2 deletions lib/vector-config/src/external/datetime.rs
Expand Up @@ -20,8 +20,8 @@ impl Configurable for TimeZone {

fn metadata() -> Metadata {
let mut metadata = Metadata::default();
metadata.set_title("Timezone reference.");
metadata.set_description(r#"This can refer to any valid timezone as defined in the [TZ database][tzdb], or "local" which refers to the system local timezone.
metadata.set_title("Timezone to use for any date specifiers in template strings.");
metadata.set_description(r#"This can refer to any valid timezone as defined in the [TZ database][tzdb], or "local" which refers to the system local timezone. It will default to the [globally configured timezone](https://vector.dev/docs/reference/configuration/global-options/#timezone).

[tzdb]: https://en.wikipedia.org/wiki/List_of_tz_database_time_zones"#);
metadata.add_custom_attribute(CustomAttribute::kv(
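The updated metadata above documents what the new `timezone` options accept: any TZ-database name, or "local" for the system zone, with a fallback to the globally configured timezone. As a rough, self-contained illustration of the named-zone case (using the `chrono-tz` crate added in this PR; Vector's own `TimeZone` type, not `chrono_tz::Tz`, is what additionally understands "local"):

```rust
use chrono_tz::Tz;

fn main() {
    // Any IANA/TZ-database name parses into a concrete zone...
    assert!("America/New_York".parse::<Tz>().is_ok());
    assert!("Asia/Manila".parse::<Tz>().is_ok());
    // ...while unknown names are rejected at parse time.
    assert!("Not/A_Zone".parse::<Tz>().is_err());
}
```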
31 changes: 24 additions & 7 deletions src/sinks/aws_s3/config.rs
@@ -1,5 +1,3 @@
use std::convert::TryInto;

use aws_sdk_s3::Client as S3Client;
use tower::ServiceBuilder;
use vector_lib::codecs::{
Expand All @@ -8,6 +6,7 @@ use vector_lib::codecs::{
};
use vector_lib::configurable::configurable_component;
use vector_lib::sink::VectorSink;
use vector_lib::TimeZone;

use super::sink::S3RequestOptions;
use crate::{
Expand All @@ -23,8 +22,8 @@ use crate::{
sink::S3Sink,
},
util::{
BatchConfig, BulkSizeBasedDefaultBatchSettings, Compression, ServiceBuilderExt,
TowerRequestConfig,
timezone_to_offset, BatchConfig, BulkSizeBasedDefaultBatchSettings, Compression,
ServiceBuilderExt, TowerRequestConfig,
},
Healthcheck,
},
Expand Down Expand Up @@ -136,6 +135,10 @@ pub struct S3SinkConfig {
skip_serializing_if = "crate::serde::skip_serializing_if_default"
)]
pub acknowledgements: AcknowledgementsConfig,

#[configurable(derived)]
#[serde(default)]
pub timezone: Option<TimeZone>,
}

pub(super) fn default_key_prefix() -> String {
Expand Down Expand Up @@ -163,6 +166,7 @@ impl GenerateConfig for S3SinkConfig {
tls: Some(TlsConfig::default()),
auth: AwsAuthentication::default(),
acknowledgements: Default::default(),
timezone: Default::default(),
})
.unwrap()
}
Expand All @@ -174,7 +178,7 @@ impl SinkConfig for S3SinkConfig {
async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
let service = self.create_service(&cx.proxy).await?;
let healthcheck = self.build_healthcheck(service.client())?;
let sink = self.build_processor(service)?;
let sink = self.build_processor(service, cx)?;
Ok((sink, healthcheck))
}

Expand All @@ -188,7 +192,11 @@ impl SinkConfig for S3SinkConfig {
}

impl S3SinkConfig {
pub fn build_processor(&self, service: S3Service) -> crate::Result<VectorSink> {
pub fn build_processor(
&self,
service: S3Service,
cx: SinkContext,
) -> crate::Result<VectorSink> {
// Build our S3 client/service, which is what we'll ultimately feed
// requests into in order to ship files to S3. We build this here in
// order to configure the client/service with retries, concurrency
Expand All @@ -198,16 +206,24 @@ impl S3SinkConfig {
.settings(request_limits, S3RetryLogic)
.service(service);

let offset = self
.timezone
.or(cx.globals.timezone)
.and_then(timezone_to_offset);

// Configure our partitioning/batching.
let batch_settings = self.batch.into_batcher_settings()?;
let key_prefix = self.key_prefix.clone().try_into()?;

let key_prefix = Template::try_from(self.key_prefix.clone())?.with_tz_offset(offset);

let ssekms_key_id = self
.options
.ssekms_key_id
.as_ref()
.cloned()
.map(|ssekms_key_id| Template::try_from(ssekms_key_id.as_str()))
.transpose()?;

let partitioner = S3KeyPartitioner::new(key_prefix, ssekms_key_id);

let transformer = self.encoding.transformer();
Expand All @@ -222,6 +238,7 @@ impl S3SinkConfig {
filename_append_uuid: self.filename_append_uuid,
encoder: (transformer, encoder),
compression: self.compression,
filename_tz_offset: offset,
};

let sink = S3Sink::new(service, request_options, partitioner, batch_settings);
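In `build_processor` above, the sink-level `timezone` takes precedence over `cx.globals.timezone`, and the result is converted to a `chrono::FixedOffset` via the `timezone_to_offset` helper before being attached to the key-prefix template and the request options. That helper's body is not shown in this diff; the sketch below only approximates the conversion with plain `chrono`/`chrono-tz`, and the zone names are examples:

```rust
use chrono::{FixedOffset, Offset, Utc};
use chrono_tz::Tz;

// Resolve a named TZ-database zone to the fixed UTC offset currently in effect.
// The offset is sampled at "now" because it can change with DST.
fn named_tz_to_offset(name: &str) -> Option<FixedOffset> {
    let tz: Tz = name.parse().ok()?;
    Some(Utc::now().with_timezone(&tz).offset().fix())
}

fn main() {
    let sink_tz: Option<&str> = None;                  // no sink-level override...
    let global_tz: Option<&str> = Some("Asia/Manila"); // ...so the global setting is used
    let offset = sink_tz.or(global_tz).and_then(named_tz_to_offset);
    println!("resolved offset: {offset:?}"); // e.g. Some(+08:00)
}
```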
20 changes: 11 additions & 9 deletions src/sinks/aws_s3/integration_tests.rs
Expand Up @@ -61,7 +61,7 @@ async fn s3_insert_message_into_with_flat_key_prefix() {
config.key_prefix = "test-prefix".to_string();
let prefix = config.key_prefix.clone();
let service = config.create_service(&cx.globals.proxy).await.unwrap();
let sink = config.build_processor(service).unwrap();
let sink = config.build_processor(service, cx).unwrap();

let (lines, events, receiver) = make_events_batch(100, 10);
run_and_assert_sink_compliance(sink, events, &AWS_SINK_TAGS).await;
Expand Down Expand Up @@ -95,7 +95,7 @@ async fn s3_insert_message_into_with_folder_key_prefix() {
config.key_prefix = "test-prefix/".to_string();
let prefix = config.key_prefix.clone();
let service = config.create_service(&cx.globals.proxy).await.unwrap();
let sink = config.build_processor(service).unwrap();
let sink = config.build_processor(service, cx).unwrap();

let (lines, events, receiver) = make_events_batch(100, 10);
run_and_assert_sink_compliance(sink, events, &AWS_SINK_TAGS).await;
Expand Down Expand Up @@ -132,7 +132,7 @@ async fn s3_insert_message_into_with_ssekms_key_id() {
config.options.ssekms_key_id = Some("alias/aws/s3".to_string());

let service = config.create_service(&cx.globals.proxy).await.unwrap();
let sink = config.build_processor(service).unwrap();
let sink = config.build_processor(service, cx).unwrap();

let (lines, events, receiver) = make_events_batch(100, 10);
run_and_assert_sink_compliance(sink, events, &AWS_SINK_TAGS).await;
Expand Down Expand Up @@ -170,7 +170,7 @@ async fn s3_rotate_files_after_the_buffer_size_is_reached() {
};
let prefix = config.key_prefix.clone();
let service = config.create_service(&cx.globals.proxy).await.unwrap();
let sink = config.build_processor(service).unwrap();
let sink = config.build_processor(service, cx).unwrap();

let (lines, _events) = random_lines_with_stream(100, 30, None);

Expand Down Expand Up @@ -229,7 +229,7 @@ async fn s3_gzip() {

let prefix = config.key_prefix.clone();
let service = config.create_service(&cx.globals.proxy).await.unwrap();
let sink = config.build_processor(service).unwrap();
let sink = config.build_processor(service, cx).unwrap();

let (lines, events, receiver) = make_events_batch(100, batch_size * batch_multiplier);
run_and_assert_sink_compliance(sink, events, &AWS_SINK_TAGS).await;
Expand Down Expand Up @@ -274,7 +274,7 @@ async fn s3_zstd() {

let prefix = config.key_prefix.clone();
let service = config.create_service(&cx.globals.proxy).await.unwrap();
let sink = config.build_processor(service).unwrap();
let sink = config.build_processor(service, cx).unwrap();

let (lines, events, receiver) = make_events_batch(100, batch_size * batch_multiplier);
run_and_assert_sink_compliance(sink, events, &AWS_SINK_TAGS).await;
Expand Down Expand Up @@ -336,7 +336,7 @@ async fn s3_insert_message_into_object_lock() {
let config = config(&bucket, 1000000);
let prefix = config.key_prefix.clone();
let service = config.create_service(&cx.globals.proxy).await.unwrap();
let sink = config.build_processor(service).unwrap();
let sink = config.build_processor(service, cx).unwrap();

let (lines, events, receiver) = make_events_batch(100, 10);
run_and_assert_sink_compliance(sink, events, &AWS_SINK_TAGS).await;
Expand Down Expand Up @@ -368,7 +368,7 @@ async fn acknowledges_failures() {
config.bucket = format!("BREAK{}IT", config.bucket);
let prefix = config.key_prefix.clone();
let service = config.create_service(&cx.globals.proxy).await.unwrap();
let sink = config.build_processor(service).unwrap();
let sink = config.build_processor(service, cx).unwrap();

let (_lines, events, receiver) = make_events_batch(1, 1);
run_and_assert_sink_error(sink, events, &COMPONENT_ERROR_TAGS).await;
Expand Down Expand Up @@ -434,11 +434,12 @@ async fn s3_flush_on_exhaustion() {
tls: Default::default(),
auth: Default::default(),
acknowledgements: Default::default(),
timezone: Default::default(),
}
};
let prefix = config.key_prefix.clone();
let service = config.create_service(&cx.globals.proxy).await.unwrap();
let sink = config.build_processor(service).unwrap();
let sink = config.build_processor(service, cx).unwrap();

let (lines, _events) = random_lines_with_stream(100, 2, None); // only generate two events (less than batch size)

Expand Down Expand Up @@ -517,6 +518,7 @@ fn config(bucket: &str, batch_size: usize) -> S3SinkConfig {
tls: Default::default(),
auth: Default::default(),
acknowledgements: Default::default(),
timezone: Default::default(),
}
}

2 changes: 1 addition & 1 deletion src/sinks/aws_s3/mod.rs
Expand Up @@ -3,4 +3,4 @@ mod sink;

mod integration_tests;

pub use self::config::S3SinkConfig;
pub use config::S3SinkConfig;
12 changes: 10 additions & 2 deletions src/sinks/aws_s3/sink.rs
@@ -1,7 +1,7 @@
use std::io;

use bytes::Bytes;
use chrono::Utc;
use chrono::{FixedOffset, Utc};
use uuid::Uuid;
use vector_lib::codecs::encoding::Framer;
use vector_lib::event::Finalizable;
Expand Down Expand Up @@ -32,6 +32,7 @@ pub struct S3RequestOptions {
pub api_options: S3Options,
pub encoder: (Transformer, Encoder<Framer>),
pub compression: Compression,
pub filename_tz_offset: Option<FixedOffset>,
}

impl RequestBuilder<(S3PartitionKey, Vec<Event>)> for S3RequestOptions {
Expand Down Expand Up @@ -76,7 +77,14 @@ impl RequestBuilder<(S3PartitionKey, Vec<Event>)> for S3RequestOptions {
payload: EncodeResult<Self::Payload>,
) -> Self::Request {
let filename = {
let formatted_ts = Utc::now().format(self.filename_time_format.as_str());
let formatted_ts = match self.filename_tz_offset {
Some(offset) => Utc::now()
.with_timezone(&offset)
.format(self.filename_time_format.as_str()),
None => Utc::now()
.with_timezone(&chrono::Utc)
.format(self.filename_time_format.as_str()),
};

self.filename_append_uuid
.then(|| format!("{}-{}", formatted_ts, Uuid::new_v4().hyphenated()))
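The match added to the request builder above is what actually applies the offset when the S3 object name is rendered: with an offset configured, `Utc::now()` is shifted into it before `filename_time_format` is applied; without one, formatting stays in UTC. A minimal stand-alone version of that branch (the format string and offset below are examples, not Vector's defaults):

```rust
use chrono::{FixedOffset, Utc};

fn render_filename_ts(fmt: &str, offset: Option<FixedOffset>) -> String {
    match offset {
        // Shift "now" into the configured offset before formatting...
        Some(offset) => Utc::now().with_timezone(&offset).format(fmt).to_string(),
        // ...or keep the previous behaviour and format in UTC.
        None => Utc::now().format(fmt).to_string(),
    }
}

fn main() {
    let plus_eight = FixedOffset::east_opt(8 * 3600); // e.g. UTC+08:00
    println!("{}", render_filename_ts("%Y-%m-%d-%H%M%S", plus_eight));
    println!("{}", render_filename_ts("%Y-%m-%d-%H%M%S", None));
}
```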
32 changes: 24 additions & 8 deletions src/sinks/file/mod.rs
Expand Up @@ -22,7 +22,7 @@ use vector_lib::codecs::{
use vector_lib::configurable::configurable_component;
use vector_lib::{
internal_event::{CountByteSize, EventsSent, InternalEventHandle as _, Output, Registered},
EstimatedJsonEncodedSizeOf,
EstimatedJsonEncodedSizeOf, TimeZone,
};

use crate::{
Expand All @@ -33,9 +33,10 @@ use crate::{
internal_events::{
FileBytesSent, FileInternalMetricsConfig, FileIoError, FileOpen, TemplateRenderingError,
},
sinks::util::StreamSink,
sinks::util::{timezone_to_offset, StreamSink},
template::Template,
};

mod bytes_path;

use bytes_path::BytesPath;
Expand Down Expand Up @@ -84,6 +85,10 @@ pub struct FileSinkConfig {
)]
pub acknowledgements: AcknowledgementsConfig,

#[configurable(derived)]
#[serde(default)]
pub timezone: Option<TimeZone>,

#[configurable(derived)]
#[serde(default)]
internal_metrics: FileInternalMetricsConfig,
Expand All @@ -97,6 +102,7 @@ impl GenerateConfig for FileSinkConfig {
encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
compression: Default::default(),
acknowledgements: Default::default(),
timezone: Default::default(),
internal_metrics: Default::default(),
})
.unwrap()
Expand Down Expand Up @@ -181,9 +187,9 @@ impl OutFile {
impl SinkConfig for FileSinkConfig {
async fn build(
&self,
_cx: SinkContext,
cx: SinkContext,
) -> crate::Result<(super::VectorSink, super::Healthcheck)> {
let sink = FileSink::new(self)?;
let sink = FileSink::new(self, cx)?;
Ok((
super::VectorSink::from_event_streamsink(sink),
future::ok(()).boxed(),
Expand Down Expand Up @@ -211,13 +217,18 @@ pub struct FileSink {
}

impl FileSink {
pub fn new(config: &FileSinkConfig) -> crate::Result<Self> {
pub fn new(config: &FileSinkConfig, cx: SinkContext) -> crate::Result<Self> {
let transformer = config.encoding.transformer();
let (framer, serializer) = config.encoding.build(SinkType::StreamBased)?;
let encoder = Encoder::<Framer>::new(framer, serializer);

let offset = config
.timezone
.or(cx.globals.timezone)
.and_then(timezone_to_offset);

Ok(Self {
path: config.path.clone(),
path: config.path.clone().with_tz_offset(offset),
transformer,
encoder,
idle_timeout: config.idle_timeout,
Expand Down Expand Up @@ -464,6 +475,7 @@ mod tests {
encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
compression: Compression::None,
acknowledgements: Default::default(),
timezone: Default::default(),
internal_metrics: Default::default(),
};

Expand All @@ -487,6 +499,7 @@ mod tests {
encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
compression: Compression::Gzip,
acknowledgements: Default::default(),
timezone: Default::default(),
internal_metrics: Default::default(),
};

Expand All @@ -510,6 +523,7 @@ mod tests {
encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
compression: Compression::Zstd,
acknowledgements: Default::default(),
timezone: Default::default(),
internal_metrics: Default::default(),
};

Expand Down Expand Up @@ -538,6 +552,7 @@ mod tests {
encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
compression: Compression::None,
acknowledgements: Default::default(),
timezone: Default::default(),
internal_metrics: Default::default(),
};

Expand Down Expand Up @@ -617,6 +632,7 @@ mod tests {
encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
compression: Compression::None,
acknowledgements: Default::default(),
timezone: Default::default(),
internal_metrics: Default::default(),
};

Expand All @@ -626,7 +642,7 @@ mod tests {

let sink_handle = tokio::spawn(async move {
assert_sink_compliance(&FILE_SINK_TAGS, async move {
let sink = FileSink::new(&config).unwrap();
let sink = FileSink::new(&config, SinkContext::default()).unwrap();
VectorSink::from_event_streamsink(sink)
.run(Box::pin(rx.map(Into::into)))
.await
Expand Down Expand Up @@ -670,7 +686,7 @@ mod tests {

async fn run_assert_sink(config: FileSinkConfig, events: impl Iterator<Item = Event> + Send) {
assert_sink_compliance(&FILE_SINK_TAGS, async move {
let sink = FileSink::new(&config).unwrap();
let sink = FileSink::new(&config, SinkContext::default()).unwrap();
VectorSink::from_event_streamsink(sink)
.run(Box::pin(stream::iter(events.map(Into::into))))
.await
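Taken together, the `file` sink changes mean that strftime-style date specifiers in `path` (and, for the object-store sinks, in `key_prefix`) are rendered in the configured timezone rather than always in UTC, so daily files roll over at local midnight. A small illustration of the difference, with a made-up path template and timezone:

```rust
use chrono::{TimeZone as _, Utc};
use chrono_tz::Tz;

fn main() {
    // 2023-11-14 23:30 UTC is already 2023-11-15 in Asia/Tokyo (UTC+9).
    let ts = Utc.with_ymd_and_hms(2023, 11, 14, 23, 30, 0).unwrap();
    let tokyo: Tz = "Asia/Tokyo".parse().unwrap();

    let utc_path = ts.format("/var/log/app-%Y-%m-%d.log").to_string();
    let tokyo_path = ts
        .with_timezone(&tokyo)
        .format("/var/log/app-%Y-%m-%d.log")
        .to_string();

    assert_eq!(utc_path, "/var/log/app-2023-11-14.log");
    assert_eq!(tokyo_path, "/var/log/app-2023-11-15.log");
}
```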