From a031a522a4cb7c0e9637ae1067a01bee71cefa12 Mon Sep 17 00:00:00 2001 From: Jesse Szwedko Date: Tue, 10 Nov 2020 18:47:59 +0000 Subject: [PATCH 1/6] enhancement(sources): Allow line aggregation to never timeout I broke this off from the AWS S3 source PR as it seemed like it could be reviewed separately. In the context of AWS S3, and probably other sources, I think it is useful to have the line aggregation logic never timeout and aggregate what it has; instead just blocking until it has a full line. This change makes `timeout_ms` on the multiline config optional, and defaults to never timing out. Signed-off-by: Jesse Szwedko --- docs/reference/components/sources.cue | 3 ++- src/line_agg.rs | 25 +++++++++++++------------ src/sources/file.rs | 2 +- src/sources/util/multiline_config.rs | 4 ++-- 4 files changed, 18 insertions(+), 16 deletions(-) diff --git a/docs/reference/components/sources.cue b/docs/reference/components/sources.cue index 56605a6fd011f..0c79a9b294311 100644 --- a/docs/reference/components/sources.cue +++ b/docs/reference/components/sources.cue @@ -49,7 +49,8 @@ components: sources: [Name=string]: { } timeout_ms: { description: "The maximum time to wait for the continuation. Once this timeout is reached, the buffered message is guaranteed to be flushed, even if incomplete." - required: true + required: false + common: true sort: 4 type: uint: { examples: [1_000, 600_000] diff --git a/src/line_agg.rs b/src/line_agg.rs index 745182d01cede..a1771f8a4cae5 100644 --- a/src/line_agg.rs +++ b/src/line_agg.rs @@ -60,7 +60,7 @@ pub struct Config { /// The maximum time to wait for the continuation. Once this timeout is /// reached, the buffered message is guaranteed to be flushed, even if /// incomplete. - pub timeout: Duration, + pub timeout: Option, } impl Config { @@ -70,7 +70,7 @@ impl Config { let start_pattern = marker; let condition_pattern = start_pattern.clone(); let mode = Mode::HaltBefore; - let timeout = Duration::from_millis(timeout_ms); + let timeout = Some(Duration::from_millis(timeout_ms)); Self { start_pattern, @@ -351,8 +351,9 @@ where if self.config.start_pattern.is_match(line.as_ref()) { // It was indeed a new line we need to filter. // Set the timeout and buffer this line. - self.timeouts - .insert(entry.key().clone(), self.config.timeout); + if let Some(timeout) = self.config.timeout { + self.timeouts.insert(entry.key().clone(), timeout); + } entry.insert(Aggregate::new(line, context)); None } else { @@ -419,7 +420,7 @@ mod tests { start_pattern: Regex::new("^[^\\s]").unwrap(), condition_pattern: Regex::new("^[\\s]+").unwrap(), mode: Mode::ContinueThrough, - timeout: Duration::from_millis(10), + timeout: Some(Duration::from_millis(10)), }; let expected = vec![ "some usual line", @@ -450,7 +451,7 @@ mod tests { start_pattern: Regex::new("\\\\$").unwrap(), condition_pattern: Regex::new("\\\\$").unwrap(), mode: Mode::ContinuePast, - timeout: Duration::from_millis(10), + timeout: Some(Duration::from_millis(10)), }; let expected = vec![ "some usual line", @@ -481,7 +482,7 @@ mod tests { start_pattern: Regex::new("").unwrap(), condition_pattern: Regex::new("^(INFO|ERROR) ").unwrap(), mode: Mode::HaltBefore, - timeout: Duration::from_millis(10), + timeout: Some(Duration::from_millis(10)), }; let expected = vec![ "INFO some usual line", @@ -512,7 +513,7 @@ mod tests { start_pattern: Regex::new("[^;]$").unwrap(), condition_pattern: Regex::new(";$").unwrap(), mode: Mode::HaltWith, - timeout: Duration::from_millis(10), + timeout: Some(Duration::from_millis(10)), }; let expected = vec![ "some usual line;", @@ -538,7 +539,7 @@ mod tests { start_pattern: Regex::new("^[^\\s]").unwrap(), condition_pattern: Regex::new("^[\\s]+at").unwrap(), mode: Mode::ContinueThrough, - timeout: Duration::from_millis(10), + timeout: Some(Duration::from_millis(10)), }; let expected = vec![concat!( "java.lang.Exception\n", @@ -560,7 +561,7 @@ mod tests { start_pattern: Regex::new("^[^\\s]").unwrap(), condition_pattern: Regex::new("^[\\s]+from").unwrap(), mode: Mode::ContinueThrough, - timeout: Duration::from_millis(10), + timeout: Some(Duration::from_millis(10)), }; let expected = vec![concat!( "foobar.rb:6:in `/': divided by 0 (ZeroDivisionError)\n", @@ -594,7 +595,7 @@ mod tests { start_pattern: Regex::new("^\\s").unwrap(), condition_pattern: Regex::new("^\\s").unwrap(), mode: Mode::ContinueThrough, - timeout: Duration::from_millis(10), + timeout: Some(Duration::from_millis(10)), }; let expected = vec![ "not merged 1", @@ -632,7 +633,7 @@ mod tests { start_pattern: Regex::new("").unwrap(), condition_pattern: Regex::new("^START ").unwrap(), mode: Mode::HaltBefore, - timeout: Duration::from_millis(10), + timeout: Some(Duration::from_millis(10)), }; let expected = vec![ "part 0.1\npart 0.2", diff --git a/src/sources/file.rs b/src/sources/file.rs index 13a4730d2c975..a61e7c53bb8eb 100644 --- a/src/sources/file.rs +++ b/src/sources/file.rs @@ -1186,7 +1186,7 @@ mod tests { start_pattern: "INFO".to_owned(), condition_pattern: "INFO".to_owned(), mode: line_agg::Mode::HaltBefore, - timeout_ms: 25, // less than 50 in sleep() + timeout_ms: Some(25), // less than 50 in sleep() }), ..test_default_file_config(&dir) }; diff --git a/src/sources/util/multiline_config.rs b/src/sources/util/multiline_config.rs index aaa84ec07a55d..e8e57f0948f46 100644 --- a/src/sources/util/multiline_config.rs +++ b/src/sources/util/multiline_config.rs @@ -12,7 +12,7 @@ pub struct MultilineConfig { pub start_pattern: String, pub condition_pattern: String, pub mode: line_agg::Mode, - pub timeout_ms: u64, + pub timeout_ms: Option, } impl TryFrom<&MultilineConfig> for line_agg::Config { @@ -31,7 +31,7 @@ impl TryFrom<&MultilineConfig> for line_agg::Config { let condition_pattern = Regex::new(condition_pattern) .with_context(|| InvalidMultilineConditionPattern { condition_pattern })?; let mode = mode.clone(); - let timeout = Duration::from_millis(*timeout_ms); + let timeout = timeout_ms.map(|t| Duration::from_millis(t)); Ok(Self { start_pattern, From 75c5e21cd8806fa224d61d4eb7ac4f09044b897a Mon Sep 17 00:00:00 2001 From: Jesse Szwedko Date: Tue, 10 Nov 2020 18:57:05 +0000 Subject: [PATCH 2/6] cue fmt Signed-off-by: Jesse Szwedko --- docs/reference/components/sources.cue | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/reference/components/sources.cue b/docs/reference/components/sources.cue index 0c79a9b294311..40545c3fec942 100644 --- a/docs/reference/components/sources.cue +++ b/docs/reference/components/sources.cue @@ -50,7 +50,7 @@ components: sources: [Name=string]: { timeout_ms: { description: "The maximum time to wait for the continuation. Once this timeout is reached, the buffered message is guaranteed to be flushed, even if incomplete." required: false - common: true + common: true sort: 4 type: uint: { examples: [1_000, 600_000] From fc191817c453349604986cf0e3a0a4807f97eddd Mon Sep 17 00:00:00 2001 From: Jesse Szwedko Date: Tue, 10 Nov 2020 19:00:40 +0000 Subject: [PATCH 3/6] clippy Signed-off-by: Jesse Szwedko --- src/sources/docker.rs | 2 +- src/sources/util/multiline_config.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/sources/docker.rs b/src/sources/docker.rs index bc9ffd9b79fc9..98c6c3b6b2e7b 100644 --- a/src/sources/docker.rs +++ b/src/sources/docker.rs @@ -1493,7 +1493,7 @@ mod integration_tests { start_pattern: "^[^\\s]".to_owned(), condition_pattern: "^[\\s]+at".to_owned(), mode: line_agg::Mode::ContinueThrough, - timeout_ms: 10, + timeout_ms: Some(10), }), ..DockerConfig::default() }; diff --git a/src/sources/util/multiline_config.rs b/src/sources/util/multiline_config.rs index e8e57f0948f46..421df547386a2 100644 --- a/src/sources/util/multiline_config.rs +++ b/src/sources/util/multiline_config.rs @@ -31,7 +31,7 @@ impl TryFrom<&MultilineConfig> for line_agg::Config { let condition_pattern = Regex::new(condition_pattern) .with_context(|| InvalidMultilineConditionPattern { condition_pattern })?; let mode = mode.clone(); - let timeout = timeout_ms.map(|t| Duration::from_millis(t)); + let timeout = timeout_ms.map(Duration::from_millis); Ok(Self { start_pattern, From 0bebc74d8341a29206c9e0cabed9091f916317e9 Mon Sep 17 00:00:00 2001 From: Jesse Szwedko Date: Tue, 10 Nov 2020 21:01:53 +0000 Subject: [PATCH 4/6] cue vet Signed-off-by: Jesse Szwedko --- docs/reference/components/sources.cue | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/reference/components/sources.cue b/docs/reference/components/sources.cue index 40545c3fec942..9583e0f19a323 100644 --- a/docs/reference/components/sources.cue +++ b/docs/reference/components/sources.cue @@ -53,6 +53,7 @@ components: sources: [Name=string]: { common: true sort: 4 type: uint: { + default: null examples: [1_000, 600_000] unit: "milliseconds" } From 39b08aa986a1d9a52e53252ec6bfbf725316a7d1 Mon Sep 17 00:00:00 2001 From: Jesse Szwedko Date: Wed, 11 Nov 2020 19:38:02 +0000 Subject: [PATCH 5/6] Default to 1000ms, allow "none" to disable Signed-off-by: Jesse Szwedko --- docs/reference/components/sources.cue | 4 +- src/sources/util/multiline_config.rs | 71 +++++++++++++++++++++++++++ 2 files changed, 73 insertions(+), 2 deletions(-) diff --git a/docs/reference/components/sources.cue b/docs/reference/components/sources.cue index 9583e0f19a323..3f0b140ca9af4 100644 --- a/docs/reference/components/sources.cue +++ b/docs/reference/components/sources.cue @@ -48,12 +48,12 @@ components: sources: [Name=string]: { type: string: examples: ["^[^\\s]", "\\\\$", "^(INFO|ERROR) ", "[^;]$"] } timeout_ms: { - description: "The maximum time to wait for the continuation. Once this timeout is reached, the buffered message is guaranteed to be flushed, even if incomplete." + description: "The maximum time to wait for the continuation. Once this timeout is reached, the buffered message is guaranteed to be flushed, even if incomplete. Set to \"none\" for no timeout." required: false common: true sort: 4 type: uint: { - default: null + default: 1_000 examples: [1_000, 600_000] unit: "milliseconds" } diff --git a/src/sources/util/multiline_config.rs b/src/sources/util/multiline_config.rs index 421df547386a2..9b7312bec4441 100644 --- a/src/sources/util/multiline_config.rs +++ b/src/sources/util/multiline_config.rs @@ -12,9 +12,14 @@ pub struct MultilineConfig { pub start_pattern: String, pub condition_pattern: String, pub mode: line_agg::Mode, + #[serde(default = "default_timeout_ms", with = "optional_u64")] pub timeout_ms: Option, } +const fn default_timeout_ms() -> Option { + Some(1000) +} + impl TryFrom<&MultilineConfig> for line_agg::Config { type Error = Error; @@ -63,3 +68,69 @@ pub enum Error { source: regex::Error, }, } + +mod optional_u64 { + use serde::de::{self, Deserializer, Unexpected, Visitor}; + use serde::ser::Serializer; + + pub(super) fn deserialize<'de, D>(deserializer: D) -> Result, D::Error> + where + D: Deserializer<'de>, + { + struct OptionalU64; + + impl<'de> Visitor<'de> for OptionalU64 { + type Value = Option; + + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + formatter.write_str("number or \"none\"") + } + + fn visit_str(self, value: &str) -> Result + where + E: de::Error, + { + match value { + "none" => Ok(None), + s => Err(de::Error::invalid_value( + Unexpected::Str(s), + &"number or \"none\"", + )), + } + } + + fn visit_i64(self, value: i64) -> Result + where + E: de::Error, + { + if value > 0 { + Ok(Some(value as u64)) + } else { + Err(de::Error::invalid_value( + Unexpected::Signed(value), + &"positive integer", + )) + } + } + + fn visit_u64(self, value: u64) -> Result + where + E: de::Error, + { + Ok(Some(value)) + } + } + + deserializer.deserialize_any(OptionalU64) + } + + pub(super) fn serialize(value: &Option, serializer: S) -> Result + where + S: Serializer, + { + match *value { + Some(v) => serializer.serialize_u64(v), + None => serializer.serialize_str("none"), + } + } +} From f39af9333009d3a1ea5a4fdaffd01a237ad6feef Mon Sep 17 00:00:00 2001 From: Jesse Szwedko Date: Fri, 13 Nov 2020 14:53:04 +0000 Subject: [PATCH 6/6] clippy --- src/sources/aws_s3/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sources/aws_s3/mod.rs b/src/sources/aws_s3/mod.rs index 2464e7e9f74e5..a20f32ba9b506 100644 --- a/src/sources/aws_s3/mod.rs +++ b/src/sources/aws_s3/mod.rs @@ -315,7 +315,7 @@ mod integration_tests { start_pattern: "abc".to_owned(), mode: line_agg::Mode::HaltWith, condition_pattern: "geh".to_owned(), - timeout_ms: 1000, + timeout_ms: Some(1000), }), logs.join("\n").into_bytes(), vec!["abc\ndef\ngeh".to_owned()],