From 3b562eddb1a89c95054c8703418b93e1e050bd96 Mon Sep 17 00:00:00 2001 From: MOZGIII Date: Wed, 25 Mar 2020 00:52:12 +0300 Subject: [PATCH 01/14] Generalize merge transform around log event normalizer Signed-off-by: MOZGIII --- src/transforms/merge.rs | 149 ++++++++++++++++++++++++++++++++-------- 1 file changed, 122 insertions(+), 27 deletions(-) diff --git a/src/transforms/merge.rs b/src/transforms/merge.rs index af27528938843..0535b2a3c1f20 100644 --- a/src/transforms/merge.rs +++ b/src/transforms/merge.rs @@ -2,7 +2,7 @@ use super::Transform; use crate::{ event::discriminant::Discriminant, event::merge_state::LogEventMergeState, - event::{self, Event}, + event::{self, Event, LogEvent, Value}, topology::config::{DataType, TransformConfig, TransformContext, TransformDescription}, }; use serde::{Deserialize, Serialize}; @@ -64,27 +64,59 @@ impl TransformConfig for MergeConfig { } #[derive(Debug)] -pub struct Merge { - partial_event_marker_field: Atom, +pub struct Merge +where + N: NormalizeLogEvent, +{ + normalizer: N, merge_fields: Vec, stream_discriminant_fields: Vec, log_event_merge_states: HashMap, } -impl From for Merge { +impl From for Merge { fn from(config: MergeConfig) -> Self { + let MergeConfig { + partial_event_marker_field, + merge_fields, + stream_discriminant_fields, + } = config; + + Self::new( + PartialEventMarkerFieldNormalizer { + partial_event_marker_field, + }, + merge_fields, + stream_discriminant_fields, + ) + } +} + +impl Merge +where + N: NormalizeLogEvent, +{ + /// Create a new [`Merge`] transform with the specified parameters. + pub fn new( + normalizer: N, + merge_fields: Vec, + stream_discriminant_fields: Vec, + ) -> Self { Self { - partial_event_marker_field: config.partial_event_marker_field, - merge_fields: config.merge_fields, - stream_discriminant_fields: config.stream_discriminant_fields, + normalizer, + merge_fields, + stream_discriminant_fields, log_event_merge_states: HashMap::new(), } } } -impl Transform for Merge { +impl Transform for Merge +where + N: NormalizeLogEvent + Send, +{ fn transform(&mut self, event: Event) -> Option { - let mut event = event.into_log(); + let event = event.into_log(); // Prepare an event's discriminant. let discriminant = Discriminant::from_log_event(&event, &self.stream_discriminant_fields); @@ -97,26 +129,32 @@ impl Transform for Merge { // easily, as we expect users to rely on `lua` transform to implement // custom partial markers. - // If current event has the partial marker, consider it partial. - // Remove the partial marker from the event and stash it. - if event.remove(&self.partial_event_marker_field).is_some() { - // We got a perial event. Initialize a partial event merging state - // if there's none available yet, or extend the existing one by - // merging the incoming partial event in. - match self.log_event_merge_states.entry(discriminant) { - hash_map::Entry::Vacant(entry) => { - entry.insert(LogEventMergeState::new(event)); - } - hash_map::Entry::Occupied(mut entry) => { - entry - .get_mut() - .merge_in_next_event(event, &self.merge_fields); + // Normalize the event, and perform partiality detection. Normalization + // should clean up the event from the the partiality markers, if + // applicable (and if it is a sane behaviuous for a particular use + // case). + // If the resulting normalized event is partial - stash it. + let event = match self.normalizer.normalize(event) { + MaybePartialLogEvent::Partial(event) => { + // We got a perial event. Initialize a partial event merging state + // if there's none available yet, or extend the existing one by + // merging the incoming partial event in. + match self.log_event_merge_states.entry(discriminant) { + hash_map::Entry::Vacant(entry) => { + entry.insert(LogEventMergeState::new(event)); + } + hash_map::Entry::Occupied(mut entry) => { + entry + .get_mut() + .merge_in_next_event(event, &self.merge_fields); + } } - } - // Do not emit the event yet. - return None; - } + // Do not emit the event yet. + return None; + } + MaybePartialLogEvent::NonPartial(event) => event, + }; // We got non-partial event. Attempt to get a partial event merge // state. If it's empty then we don't have a backlog of partail events @@ -137,6 +175,63 @@ impl Transform for Merge { } } +/// Represents either a partial or non-partial event. +/// In both cases, the actual underlying event is ready for further processing, +/// in a sense that, if the event is partial and the partial event marker has +/// to be removed according to the [`NormalizeLogEvent`] implementation +/// semantics, the event contained at the [`MaybePartialLogEvent`] already has +/// the partial event marker cleaned up. +#[derive(Debug)] +pub enum MaybePartialLogEvent { + Partial(LogEvent), + NonPartial(LogEvent), +} + +/// Performs normalization of the event for merging purposes. It's also +/// responsible for determining whether the event is partial or non-partial. +/// +/// If the event has a partial marker of any kind - that event is considered +/// partial, otherwise - non-partial. +/// The job of NormalizeLogEvent implementation if to determine whether the +/// event is partial, and if it is - clear the partial event marker from it and +/// return the resulting cleared event as [`MaybePartialLogEvent::Partial`]. +/// Events that are detected as non-partial have to be returned as +/// [`MaybePartialLogEvent::MonPartial`]; negative partial event marker, if any, +/// can optionally be removed from them. +/// +/// It's the implementer's job to determine a sane normalization strategy, since +/// correct semantics varies on a case by case basis. +/// +/// If you have troubles with determining the correct semantics, consider +/// checking existing implementations to see some examples. +pub trait NormalizeLogEvent { + fn normalize(&self, event: LogEvent) -> MaybePartialLogEvent; +} + +/// A [`NormalizeLogEvent`] implementation that performs pariality detection +/// based on the presence of the `partial_event_marker_field`. +/// +/// If the event has `partial_event_marker_field` among it's field, then, +/// regardless of the value +#[derive(Debug, Clone, PartialEq)] +pub struct PartialEventMarkerFieldNormalizer { + pub partial_event_marker_field: Atom, +} + +impl NormalizeLogEvent for PartialEventMarkerFieldNormalizer { + fn normalize(&self, mut event: LogEvent) -> MaybePartialLogEvent { + // If the event has a field with `partial_event_marker_field` key - + // it is a partial event. It is expected that we remove the partial + // event marker - so do both the removal and check efficiently in a + // single operation. + if event.remove(&self.partial_event_marker_field).is_some() { + MaybePartialLogEvent::Partial(event) + } else { + MaybePartialLogEvent::NonPartial(event) + } + } +} + #[cfg(test)] mod test { use super::{Merge, MergeConfig}; From 3c97d45af2b4a3e4a3719b8a3637eae013d38dcc Mon Sep 17 00:00:00 2001 From: MOZGIII Date: Wed, 25 Mar 2020 00:53:06 +0300 Subject: [PATCH 02/14] Add TrailingNewlineNormalizer Signed-off-by: MOZGIII --- src/transforms/merge.rs | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/src/transforms/merge.rs b/src/transforms/merge.rs index 0535b2a3c1f20..964fb12ddebde 100644 --- a/src/transforms/merge.rs +++ b/src/transforms/merge.rs @@ -232,6 +232,39 @@ impl NormalizeLogEvent for PartialEventMarkerFieldNormalizer { } } +/// A [`NormalizeLogEvent`] implementation that performs pariality detection +/// based on the presence of the trailing newline at the `probe_field` of the +/// event. +/// +/// If the event has `probe_field` field, and it's a string value that DOES NOT +/// contain a newline (`\n`) at the end, we consider that event partial. +/// To normalize the event, we remove the trailing newline. +/// If the event +/// - has no `probe_field` field, or +/// - has it, but the values is not a string, or +/// - it is a string that DOES contain a trailing newline +/// we consider the event non-partial. +#[derive(Debug, Clone, PartialEq)] +pub struct TrailingNewlineNormalizer { + pub probe_field: Atom, +} + +impl NormalizeLogEvent for TrailingNewlineNormalizer { + fn normalize(&self, mut event: LogEvent) -> MaybePartialLogEvent { + // See `TrailingNewlineNormalizer` documentation for logic description. + + if let Some(Value::Bytes(s)) = event.get_mut(&self.probe_field) { + if s.ends_with(&[b'\n']) { + s.truncate(s.len() - 1); + } else { + return MaybePartialLogEvent::Partial(event); + } + } + + MaybePartialLogEvent::NonPartial(event) + } +} + #[cfg(test)] mod test { use super::{Merge, MergeConfig}; From 5b04af29ef3059bae9223e655667f1b161f34d9d Mon Sep 17 00:00:00 2001 From: MOZGIII Date: Wed, 25 Mar 2020 02:25:44 +0300 Subject: [PATCH 03/14] Move src/transforms/merge.rs to src/transforms/merge/mod.rs Signed-off-by: MOZGIII --- src/transforms/{merge.rs => merge/mod.rs} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename src/transforms/{merge.rs => merge/mod.rs} (100%) diff --git a/src/transforms/merge.rs b/src/transforms/merge/mod.rs similarity index 100% rename from src/transforms/merge.rs rename to src/transforms/merge/mod.rs From 960be0a53ae36e868c6594bd633702e7ba854765 Mon Sep 17 00:00:00 2001 From: MOZGIII Date: Wed, 25 Mar 2020 02:29:12 +0300 Subject: [PATCH 04/14] Split normalizers into a normalizer mod Signed-off-by: MOZGIII --- src/transforms/merge/mod.rs | 60 ++---------------------------- src/transforms/merge/normalizer.rs | 60 ++++++++++++++++++++++++++++++ 2 files changed, 63 insertions(+), 57 deletions(-) create mode 100644 src/transforms/merge/normalizer.rs diff --git a/src/transforms/merge/mod.rs b/src/transforms/merge/mod.rs index 964fb12ddebde..eb291c2158ecd 100644 --- a/src/transforms/merge/mod.rs +++ b/src/transforms/merge/mod.rs @@ -2,7 +2,7 @@ use super::Transform; use crate::{ event::discriminant::Discriminant, event::merge_state::LogEventMergeState, - event::{self, Event, LogEvent, Value}, + event::{self, Event, LogEvent}, topology::config::{DataType, TransformConfig, TransformContext, TransformDescription}, }; use serde::{Deserialize, Serialize}; @@ -208,62 +208,8 @@ pub trait NormalizeLogEvent { fn normalize(&self, event: LogEvent) -> MaybePartialLogEvent; } -/// A [`NormalizeLogEvent`] implementation that performs pariality detection -/// based on the presence of the `partial_event_marker_field`. -/// -/// If the event has `partial_event_marker_field` among it's field, then, -/// regardless of the value -#[derive(Debug, Clone, PartialEq)] -pub struct PartialEventMarkerFieldNormalizer { - pub partial_event_marker_field: Atom, -} - -impl NormalizeLogEvent for PartialEventMarkerFieldNormalizer { - fn normalize(&self, mut event: LogEvent) -> MaybePartialLogEvent { - // If the event has a field with `partial_event_marker_field` key - - // it is a partial event. It is expected that we remove the partial - // event marker - so do both the removal and check efficiently in a - // single operation. - if event.remove(&self.partial_event_marker_field).is_some() { - MaybePartialLogEvent::Partial(event) - } else { - MaybePartialLogEvent::NonPartial(event) - } - } -} - -/// A [`NormalizeLogEvent`] implementation that performs pariality detection -/// based on the presence of the trailing newline at the `probe_field` of the -/// event. -/// -/// If the event has `probe_field` field, and it's a string value that DOES NOT -/// contain a newline (`\n`) at the end, we consider that event partial. -/// To normalize the event, we remove the trailing newline. -/// If the event -/// - has no `probe_field` field, or -/// - has it, but the values is not a string, or -/// - it is a string that DOES contain a trailing newline -/// we consider the event non-partial. -#[derive(Debug, Clone, PartialEq)] -pub struct TrailingNewlineNormalizer { - pub probe_field: Atom, -} - -impl NormalizeLogEvent for TrailingNewlineNormalizer { - fn normalize(&self, mut event: LogEvent) -> MaybePartialLogEvent { - // See `TrailingNewlineNormalizer` documentation for logic description. - - if let Some(Value::Bytes(s)) = event.get_mut(&self.probe_field) { - if s.ends_with(&[b'\n']) { - s.truncate(s.len() - 1); - } else { - return MaybePartialLogEvent::Partial(event); - } - } - - MaybePartialLogEvent::NonPartial(event) - } -} +mod normalizer; +pub use normalizer::*; #[cfg(test)] mod test { diff --git a/src/transforms/merge/normalizer.rs b/src/transforms/merge/normalizer.rs new file mode 100644 index 0000000000000..c26493f16aab8 --- /dev/null +++ b/src/transforms/merge/normalizer.rs @@ -0,0 +1,60 @@ +use super::{MaybePartialLogEvent, NormalizeLogEvent}; +use crate::event::{LogEvent, Value}; +use string_cache::DefaultAtom as Atom; + +/// A [`NormalizeLogEvent`] implementation that performs pariality detection +/// based on the presence of the `partial_event_marker_field`. +/// +/// If the event has `partial_event_marker_field` among it's field, then, +/// regardless of the value +#[derive(Debug, Clone, PartialEq)] +pub struct PartialEventMarkerFieldNormalizer { + pub partial_event_marker_field: Atom, +} + +impl NormalizeLogEvent for PartialEventMarkerFieldNormalizer { + fn normalize(&self, mut event: LogEvent) -> MaybePartialLogEvent { + // If the event has a field with `partial_event_marker_field` key - + // it is a partial event. It is expected that we remove the partial + // event marker - so do both the removal and check efficiently in a + // single operation. + if event.remove(&self.partial_event_marker_field).is_some() { + MaybePartialLogEvent::Partial(event) + } else { + MaybePartialLogEvent::NonPartial(event) + } + } +} + +/// A [`NormalizeLogEvent`] implementation that performs pariality detection +/// based on the presence of the trailing newline at the `probe_field` of the +/// event. +/// +/// If the event has `probe_field` field, and it's a string value that DOES NOT +/// contain a newline (`\n`) at the end, we consider that event partial. +/// To normalize the event, we remove the trailing newline. +/// If the event +/// - has no `probe_field` field, or +/// - has it, but the values is not a string, or +/// - it is a string that DOES contain a trailing newline +/// we consider the event non-partial. +#[derive(Debug, Clone, PartialEq)] +pub struct TrailingNewlineNormalizer { + pub probe_field: Atom, +} + +impl NormalizeLogEvent for TrailingNewlineNormalizer { + fn normalize(&self, mut event: LogEvent) -> MaybePartialLogEvent { + // See `TrailingNewlineNormalizer` documentation for logic description. + + if let Some(Value::Bytes(s)) = event.get_mut(&self.probe_field) { + if s.ends_with(&[b'\n']) { + s.truncate(s.len() - 1); + } else { + return MaybePartialLogEvent::Partial(event); + } + } + + MaybePartialLogEvent::NonPartial(event) + } +} From 1ae30372fe82fb1fc89cc6a4c2445ce8a80fe58c Mon Sep 17 00:00:00 2001 From: MOZGIII Date: Wed, 25 Mar 2020 02:29:48 +0300 Subject: [PATCH 05/14] Add normalizer tests Signed-off-by: MOZGIII --- src/transforms/merge/mod.rs | 2 +- src/transforms/merge/normalizer.rs | 121 +++++++++++++++++++++++++++++ 2 files changed, 122 insertions(+), 1 deletion(-) diff --git a/src/transforms/merge/mod.rs b/src/transforms/merge/mod.rs index eb291c2158ecd..788f8c49a654b 100644 --- a/src/transforms/merge/mod.rs +++ b/src/transforms/merge/mod.rs @@ -181,7 +181,7 @@ where /// to be removed according to the [`NormalizeLogEvent`] implementation /// semantics, the event contained at the [`MaybePartialLogEvent`] already has /// the partial event marker cleaned up. -#[derive(Debug)] +#[derive(Debug, Clone, PartialEq)] pub enum MaybePartialLogEvent { Partial(LogEvent), NonPartial(LogEvent), diff --git a/src/transforms/merge/normalizer.rs b/src/transforms/merge/normalizer.rs index c26493f16aab8..8118c88c39d36 100644 --- a/src/transforms/merge/normalizer.rs +++ b/src/transforms/merge/normalizer.rs @@ -58,3 +58,124 @@ impl NormalizeLogEvent for TrailingNewlineNormalizer { MaybePartialLogEvent::NonPartial(event) } } + +#[cfg(test)] +mod tests { + use super::{ + MaybePartialLogEvent, NormalizeLogEvent, PartialEventMarkerFieldNormalizer, + TrailingNewlineNormalizer, + }; + use crate::event::Event; + + #[test] + fn partial_event_marker_field_normalizer_non_partial() { + let partial_event_marker_field = "_partial"; + + let normalizer = PartialEventMarkerFieldNormalizer { + partial_event_marker_field: partial_event_marker_field.into(), + }; + + let sample_event = { + let mut event = Event::new_empty_log().into_log(); + event.insert("a", "qwe"); + event.insert("b", 1); + // no partial event marker field - non-partial event + event + }; + + let expected_event = { + let mut event = Event::new_empty_log().into_log(); + event.insert("a", "qwe"); + event.insert("b", 1); + // no partial event marker field - non-partial event + event + }; + + assert_eq!( + normalizer.normalize(sample_event), + MaybePartialLogEvent::NonPartial(expected_event) + ); + } + + #[test] + fn partial_event_marker_field_normalizer_partial() { + let partial_event_marker_field = "_partial"; + + let normalizer = PartialEventMarkerFieldNormalizer { + partial_event_marker_field: partial_event_marker_field.into(), + }; + + let sample_event = { + let mut event = Event::new_empty_log().into_log(); + event.insert("a", "qwe"); + event.insert("b", 1); + event.insert(partial_event_marker_field, true); // partial + event + }; + + let expected_event = { + let mut event = Event::new_empty_log().into_log(); + event.insert("a", "qwe"); + event.insert("b", 1); + // doesn't have `partial_event_marker_field` anymore - normailized + event + }; + + assert_eq!( + normalizer.normalize(sample_event), + MaybePartialLogEvent::Partial(expected_event) + ); + } + + #[test] + fn trailing_newline_normalizer_non_partial() { + let message_field = "message"; + + let normalizer = TrailingNewlineNormalizer { + probe_field: message_field.into(), + }; + + let sample_event = { + let mut event = Event::new_empty_log().into_log(); + event.insert(message_field, "hello world!\n"); // has trailing newline - non-partial + event + }; + + let expected_event = { + let mut event = Event::new_empty_log().into_log(); + event.insert(message_field, "hello world!"); // normalized - doesn't have a trailing newline + event + }; + + assert_eq!( + normalizer.normalize(sample_event), + MaybePartialLogEvent::NonPartial(expected_event) + ); + } + + #[test] + fn trailing_newline_normalizer_partial() { + let message_field = "message"; + + let normalizer = TrailingNewlineNormalizer { + probe_field: message_field.into(), + }; + + let sample_event = { + let mut event = Event::new_empty_log().into_log(); + event.insert(message_field, "hello "); // ..." world!\n" - partial message + event + }; + + let expected_event = { + let mut event = Event::new_empty_log().into_log(); + event.insert(message_field, "hello "); // partial! + event + }; + + assert_eq!( + normalizer.normalize(sample_event), + MaybePartialLogEvent::Partial(expected_event) + ); + } +} From f7a6ec35890e77b00a05f678fece495a7044a71e Mon Sep 17 00:00:00 2001 From: MOZGIII Date: Wed, 25 Mar 2020 01:53:31 +0300 Subject: [PATCH 06/14] Add support for partial events merging at kubernetes source Signed-off-by: MOZGIII --- src/sources/kubernetes/mod.rs | 98 ++++++++++++++++++++++++++++++----- 1 file changed, 85 insertions(+), 13 deletions(-) diff --git a/src/sources/kubernetes/mod.rs b/src/sources/kubernetes/mod.rs index 9d3866ea7a1d8..28b5d9973ccd5 100644 --- a/src/sources/kubernetes/mod.rs +++ b/src/sources/kubernetes/mod.rs @@ -12,6 +12,7 @@ use crate::{ sources::Source, topology::config::{DataType, GlobalOptions, SourceConfig}, transforms::{ + merge::{Merge, TrailingNewlineNormalizer}, regex_parser::{RegexParser, RegexParserConfig}, Transform, }, @@ -71,13 +72,14 @@ impl SourceConfig for KubernetesConfig { let mut transform_file = transform_file()?; let mut transform_pod_uid = transform_pod_uid()?; let mut parse_message = message_parser::build_message_parser()?; + let mut transform_merge_partial_events = transform_merge_partial_events(); // Kubernetes source let source = file_recv .filter_map(move |event| transform_file.transform(event)) .filter_map(move |event| parse_message.transform(event)) .filter_map(move |event| now.filter(event)) - .map(remove_ending_newline) + .filter_map(move |event| transform_merge_partial_events.transform(event)) .filter_map(move |event| transform_pod_uid.transform(event)) .forward(out.sink_map_err(drop)) .map(drop) @@ -120,18 +122,6 @@ impl TimeFilter { } } -fn remove_ending_newline(mut event: Event) -> Event { - if let Some(Value::Bytes(msg)) = event - .as_mut_log() - .get_mut(&event::log_schema().message_key()) - { - if msg.ends_with(&['\n' as u8]) { - msg.truncate(msg.len() - 1); - } - } - event -} - fn transform_file() -> crate::Result> { let mut config = RegexParserConfig::default(); @@ -210,6 +200,17 @@ fn transform_pod_uid() -> crate::Result { Ok(ApplicableTransform::Candidates(transforms)) } +fn transform_merge_partial_events() -> Merge { + let message_key = event::log_schema().message_key(); + Merge::new( + TrailingNewlineNormalizer { + probe_field: message_key.clone(), + }, + vec![message_key.clone()], + vec!["pod_uid".into()], + ) +} + #[cfg(test)] mod tests { use super::*; @@ -271,4 +272,75 @@ mod tests { has(&event, "object_uid", "306cd636-0c6d-11ea-9079-1c1b0de4d755"); } + + #[test] + fn partial_events_merge() { + let message_key = event::log_schema().message_key(); + let sample_pod_uid = "qwerty"; + + let part1 = { + let mut event = Event::new_empty_log(); + event + .as_mut_log() + .insert("pod_uid", sample_pod_uid.to_owned()); + event.as_mut_log().insert(message_key, "hello".to_owned()); + event + }; + let part2 = { + let mut event = Event::new_empty_log(); + event + .as_mut_log() + .insert("pod_uid", sample_pod_uid.to_owned()); + event + .as_mut_log() + .insert(message_key, " world!\n".to_owned()); + event + }; + + let mut transform = transform_merge_partial_events(); + + assert!(transform.transform(part1).is_none()); + let event = transform.transform(part2).unwrap(); + + has(&event, "pod_uid", sample_pod_uid); + has(&event, message_key, "hello world!"); + } + + #[test] + fn partial_events_merge_separate_streams() { + let message_key = event::log_schema().message_key(); + + let part1 = { + let mut event = Event::new_empty_log(); + event.as_mut_log().insert("pod_uid", "qwerty1".to_owned()); + event.as_mut_log().insert(message_key, "par".to_owned()); + event + }; + let part2 = { + let mut event = Event::new_empty_log(); + event.as_mut_log().insert("pod_uid", "qwerty2".to_owned()); + event + .as_mut_log() + .insert(message_key, "non-partial!\n".to_owned()); + event + }; + let part3 = { + let mut event = Event::new_empty_log(); + event.as_mut_log().insert("pod_uid", "qwerty1".to_owned()); + event.as_mut_log().insert(message_key, "tial!\n".to_owned()); + event + }; + + let mut transform = transform_merge_partial_events(); + + assert!(transform.transform(part1).is_none()); + + let event = transform.transform(part2).unwrap(); + has(&event, "pod_uid", "qwerty2"); + has(&event, message_key, "non-partial!"); + + let event = transform.transform(part3).unwrap(); + has(&event, "pod_uid", "qwerty1"); + has(&event, message_key, "partial!"); + } } From bed4c6e97a34c57f88ac5c2ab78b1b0dfd6a40b5 Mon Sep 17 00:00:00 2001 From: MOZGIII Date: Wed, 25 Mar 2020 02:09:45 +0300 Subject: [PATCH 07/14] Add an integration test for partial message merging at kubernetes Signed-off-by: MOZGIII --- src/sources/kubernetes/test.rs | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/src/sources/kubernetes/test.rs b/src/sources/kubernetes/test.rs index 35d140ec90ed9..a8b58b6dd265c 100644 --- a/src/sources/kubernetes/test.rs +++ b/src/sources/kubernetes/test.rs @@ -641,3 +641,33 @@ fn kube_diff_pod_uid() { false }); } + +#[test] +fn kube_partial() { + let namespace = format!("partial-{}", Uuid::new_v4()); + let message = random_string(64 * 1024); // 64 kb + let user_namespace = user_namespace(&namespace); + + let kube = Kube::new(&namespace); + let user = Kube::new(&user_namespace); + + // Start vector + let vector = start_vector(&kube, user_namespace.as_str(), None); + + // Start echo + let _echo = echo(&user, "echo", &message); + // Verify logs + wait_for(|| { + // If any daemon logged message, done. + for line in logs(&kube, &vector) { + if line["message"].as_str().unwrap() == message { + // Very long message arrived as a single (merged) message. + // DONE + return true; + } else { + debug!(namespace=%namespace,log=%line); + } + } + false + }); +} From dd009a834e831ae69556e59a23e02d803fa179f6 Mon Sep 17 00:00:00 2001 From: MOZGIII Date: Wed, 25 Mar 2020 02:49:31 +0300 Subject: [PATCH 08/14] Add the ability to opt-out from automatic partial events merging Signed-off-by: MOZGIII --- src/sources/kubernetes/mod.rs | 25 +++++++++++++++++++------ 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/src/sources/kubernetes/mod.rs b/src/sources/kubernetes/mod.rs index 28b5d9973ccd5..317bc883608cd 100644 --- a/src/sources/kubernetes/mod.rs +++ b/src/sources/kubernetes/mod.rs @@ -40,12 +40,15 @@ enum BuildError { IllegalCharacterInUid { uid: String }, } -#[derive(Deserialize, Serialize, Debug, Clone, Default)] +#[derive(Deserialize, Serialize, Debug, Clone, Derivative)] +#[derivative(Default)] #[serde(deny_unknown_fields, default)] pub struct KubernetesConfig { include_container_names: Vec, include_pod_uids: Vec, include_namespaces: Vec, + #[derivative(Default(value = "false"))] + auto_partial_merge: bool, } #[typetag::serde(name = "kubernetes")] @@ -72,15 +75,25 @@ impl SourceConfig for KubernetesConfig { let mut transform_file = transform_file()?; let mut transform_pod_uid = transform_pod_uid()?; let mut parse_message = message_parser::build_message_parser()?; - let mut transform_merge_partial_events = transform_merge_partial_events(); // Kubernetes source - let source = file_recv + let stream = file_recv .filter_map(move |event| transform_file.transform(event)) .filter_map(move |event| parse_message.transform(event)) - .filter_map(move |event| now.filter(event)) - .filter_map(move |event| transform_merge_partial_events.transform(event)) - .filter_map(move |event| transform_pod_uid.transform(event)) + .filter_map(move |event| now.filter(event)); + + let stream: Box + Send> = if self.auto_partial_merge { + let mut transform_merge_partial_events = transform_merge_partial_events(); + Box::new( + stream.filter_map(move |event| transform_merge_partial_events.transform(event)), + ) + } else { + Box::new(stream) + }; + + let stream = stream.filter_map(move |event| transform_pod_uid.transform(event)); + + let source = stream .forward(out.sink_map_err(drop)) .map(drop) .join(file_source) From 24fcf58973c4d37772847ff2b77af8f5d8aaf096 Mon Sep 17 00:00:00 2001 From: MOZGIII Date: Wed, 25 Mar 2020 02:51:46 +0300 Subject: [PATCH 09/14] Use defaults at sources::kubernetes::file_source_builder::tests::all_filters Signed-off-by: MOZGIII --- src/sources/kubernetes/file_source_builder.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/sources/kubernetes/file_source_builder.rs b/src/sources/kubernetes/file_source_builder.rs index 09888f7e79963..ea5c3a3f9ab0b 100644 --- a/src/sources/kubernetes/file_source_builder.rs +++ b/src/sources/kubernetes/file_source_builder.rs @@ -537,6 +537,7 @@ mod tests { include_pod_uids: vec!["a027f09d8f18234519fa930f8fa71234".to_owned()], include_container_names: vec!["busybox".to_owned()], include_namespaces: vec!["telemetry".to_owned(), "app".to_owned()], + ..KubernetesConfig::default() }; assert_eq!( From f22d088230024cce7493f0ac9258baadfd76b4ff Mon Sep 17 00:00:00 2001 From: MOZGIII Date: Thu, 26 Mar 2020 23:17:25 +0300 Subject: [PATCH 10/14] Make the partial test easier for human to check Signed-off-by: MOZGIII --- src/sources/kubernetes/test.rs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/sources/kubernetes/test.rs b/src/sources/kubernetes/test.rs index a8b58b6dd265c..af9f7070869c9 100644 --- a/src/sources/kubernetes/test.rs +++ b/src/sources/kubernetes/test.rs @@ -15,6 +15,7 @@ use kube::{ }; use serde::de::DeserializeOwned; use serde_json::Value; +use std::fmt::Write; use uuid::Uuid; static NAMESPACE_MARKER: &'static str = "$(TEST_NAMESPACE)"; @@ -645,7 +646,14 @@ fn kube_diff_pod_uid() { #[test] fn kube_partial() { let namespace = format!("partial-{}", Uuid::new_v4()); - let message = random_string(64 * 1024); // 64 kb + let message = { + let mut s = String::new(); + for i in 0..8 { + write!(s, "{}", i.to_string().repeat(8 * 1024)).unwrap(); + } + s + }; + assert_eq!(message.len(), 64 * 1024); // 64 kb let user_namespace = user_namespace(&namespace); let kube = Kube::new(&namespace); From 0b79d1c0faf42c3246ea46769b2f602afbfec1a7 Mon Sep 17 00:00:00 2001 From: MOZGIII Date: Thu, 26 Mar 2020 23:39:17 +0300 Subject: [PATCH 11/14] Add special test case for bare newline Signed-off-by: MOZGIII --- src/transforms/merge/normalizer.rs | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/src/transforms/merge/normalizer.rs b/src/transforms/merge/normalizer.rs index 8118c88c39d36..cf3030a445a7a 100644 --- a/src/transforms/merge/normalizer.rs +++ b/src/transforms/merge/normalizer.rs @@ -178,4 +178,30 @@ mod tests { MaybePartialLogEvent::Partial(expected_event) ); } + + #[test] + fn trailing_newline_normalizer_special_case_bare_newline() { + let message_field = "message"; + + let normalizer = TrailingNewlineNormalizer { + probe_field: message_field.into(), + }; + + let sample_event = { + let mut event = Event::new_empty_log().into_log(); + event.insert(message_field, "\n"); + event + }; + + let expected_event = { + let mut event = Event::new_empty_log().into_log(); + event.insert(message_field, ""); + event + }; + + assert_eq!( + normalizer.normalize(sample_event), + MaybePartialLogEvent::NonPartial(expected_event) + ); + } } From 89e279941bf033116679b55a0b103e1ecc4645da Mon Sep 17 00:00:00 2001 From: MOZGIII Date: Thu, 26 Mar 2020 23:49:46 +0300 Subject: [PATCH 12/14] Fix feature flags Signed-off-by: MOZGIII --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 8fd4698d78cc8..31263de86b4f4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -228,7 +228,7 @@ sources-docker = ["shiplift"] sources-file = ["bytesize"] sources-journald = [] sources-kafka = ["owning_ref"] -sources-kubernetes = ["sources-file", "transforms-json_parser", "transforms-regex_parser"] +sources-kubernetes = ["sources-file", "transforms-json_parser", "transforms-regex_parser", "transforms-merge"] sources-logplex = ["warp", "sources-tls"] sources-prometheus = [] sources-http = ["warp", "sources-tls"] From bd206e1a846658218c252b9a83ae14caf6df6496 Mon Sep 17 00:00:00 2001 From: MOZGIII Date: Thu, 26 Mar 2020 23:58:23 +0300 Subject: [PATCH 13/14] Add documentation Signed-off-by: MOZGIII --- .meta/sources/kubernetes.toml.bak | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/.meta/sources/kubernetes.toml.bak b/.meta/sources/kubernetes.toml.bak index 468f075699402..79b5a60237736 100644 --- a/.meta/sources/kubernetes.toml.bak +++ b/.meta/sources/kubernetes.toml.bak @@ -38,6 +38,14 @@ of `kube-system`. Which is by default excluded, \ unless there are any non empty `include` options.\ """ +[sources.kubernetes.options.auto_partial_merge] +type = "bool" +common = true +default = true +description = """\ +Setting this to `false` will disable the automatic merging of partial events.\ +""" + [sources.kubernetes.output.log.fields.container_name] type = "string" examples = ["vector"] From f3e4b2e5f26383450bb80be10b1359e626c48efc Mon Sep 17 00:00:00 2001 From: MOZGIII Date: Tue, 31 Mar 2020 08:07:20 +0300 Subject: [PATCH 14/14] Correct the default value for auto_partial_merge Signed-off-by: MOZGIII --- src/sources/kubernetes/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sources/kubernetes/mod.rs b/src/sources/kubernetes/mod.rs index 317bc883608cd..fb5562494605d 100644 --- a/src/sources/kubernetes/mod.rs +++ b/src/sources/kubernetes/mod.rs @@ -47,7 +47,7 @@ pub struct KubernetesConfig { include_container_names: Vec, include_pod_uids: Vec, include_namespaces: Vec, - #[derivative(Default(value = "false"))] + #[derivative(Default(value = "true"))] auto_partial_merge: bool, }