Skip to content

Commit

Permalink
Merge pull request vectordotdev#151 from answerbook/darinspivey/LOG-1…
Browse files Browse the repository at this point in the history
…5589

fix(transforms): `mezmo_reduce` needs to handle special-case fields
  • Loading branch information
darinspivey authored Jan 3, 2023
2 parents 77e9145 + a440485 commit c27ab24
Showing 1 changed file with 65 additions and 3 deletions.
68 changes: 65 additions & 3 deletions src/transforms/reduce/mezmo_reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use chrono::{TimeZone, Utc};
use futures::{stream, Stream, StreamExt};
use indexmap::IndexMap;
use lookup::lookup_v2::parse_target_path;
use lookup::owned_value_path;
use lookup::PathPrefix;
use serde_with::serde_as;
use vector_config::configurable_component;
Expand Down Expand Up @@ -417,9 +418,10 @@ impl ReduceState {

for (k, v) in self.fields.drain() {
// When the resulting event is created from the mezmo-reduce accumulator,
// we need to inject its results into the `.message` property
let key = format!("message.{}", k);
if let Err(error) = v.insert_into(key, &mut event) {
// we need to inject its results into the `.message` property, but make it an
// actual "path" so that special characters are handled.
let path = owned_value_path!("message", k.as_str());
if let Err(error) = v.insert_into(path.to_string(), &mut event) {
warn!(message = "Failed to merge values for field.", %error);
}
}
Expand Down Expand Up @@ -1342,4 +1344,64 @@ mod test {
assert_eq!(output_1["message.epoch_str"], "1671134262".into());
assert_eq!(output_1["message.epoch_str_end"], "1671134264".into());
}

#[tokio::test]
async fn mezmo_reduce_merge_strategies_with_special_paths() {
let reduce = toml::from_str::<MezmoReduceConfig>(
r#"
[merge_strategies]
"some-retain-field" = "retain"
"some!array-field" = "array"
"concat-me!" = "concat"
"#,
)
.unwrap()
.build(&TransformContext::default())
.await
.unwrap();
let reduce = reduce.into_task();

let mut e_1 = LogEvent::default();
e_1.insert(
"message",
BTreeMap::from([
("some-retain-field".to_owned(), "one".into()),
("some!array-field".to_owned(), "four".into()),
("concat-me!".to_owned(), "seven".into()),
]),
);
let mut e_2 = LogEvent::default();
e_2.insert(
"message",
BTreeMap::from([
("some-retain-field".to_owned(), "two".into()),
("some!array-field".to_owned(), "five".into()),
("concat-me!".to_owned(), "eight".into()),
]),
);
let mut e_3 = LogEvent::default();
e_3.insert(
"message",
BTreeMap::from([
("some-retain-field".to_owned(), "three".into()),
("some!array-field".to_owned(), "six".into()),
("concat-me!".to_owned(), "nine".into()),
]),
);

let inputs = vec![e_1.into(), e_2.into(), e_3.into()];
let in_stream = Box::pin(stream::iter(inputs));
let mut out_stream = reduce.transform_events(in_stream);

let output_1 = out_stream.next().await.unwrap().into_log();
assert_eq!(output_1["message.\"some-retain-field\""], "three".into());
assert_eq!(
output_1["message.\"some!array-field\""],
Value::Array(vec!["four".into(), "five".into(), "six".into()])
);
assert_eq!(
output_1["message.\"concat-me!\""],
"seven eight nine".into()
);
}
}

0 comments on commit c27ab24

Please sign in to comment.