Skip to content

Commit

Permalink
Support multiple SD in RFC5424 (fix issue #61) (#62)
Browse files Browse the repository at this point in the history
  • Loading branch information
vche authored Nov 23, 2021
1 parent ea0db2f commit a1d4738
Show file tree
Hide file tree
Showing 14 changed files with 407 additions and 82 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
edition = "2018"
name = "flowgger"
version = "0.2.11"
version = "0.2.12"
authors = ["Frank Denis <[email protected]>", "Matteo Bigoi <[email protected]>", "Vivien Chene <[email protected]>", "Francesco Berni <[email protected]>"]
build = "build.rs"
repository = "https://github.com/awslabs/flowgger"
Expand Down
7 changes: 4 additions & 3 deletions src/flowgger/decoder/gelf_decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ impl Decoder for GelfDecoder {
appname: None,
procid: None,
msgid: None,
sd: if sd.pairs.is_empty() { None } else { Some(sd) },
sd: if sd.pairs.is_empty() { None } else { Some(vec![sd]) },
msg,
full_msg,
};
Expand All @@ -136,8 +136,9 @@ mod test {
assert!(res.full_msg.unwrap() == "Backtrace here\n\nmore stuff");
assert!(res.severity.unwrap() == 1);

let sd = res.sd.unwrap();
let pairs = sd.pairs;
let sd = &res.sd.unwrap();
assert!(sd.len() == 1);
let pairs = &sd[0].pairs;
assert!(pairs
.iter()
.cloned()
Expand Down
25 changes: 14 additions & 11 deletions src/flowgger/decoder/ltsv_decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ impl LTSVDecoder {
"f64" => SDValueType::F64,
"i64" => SDValueType::I64,
"u64" => SDValueType::U64,
_ => panic!(format!(
_ => panic!(
"Unsupported type in input.ltsv_schema for name [{}]",
name
)),
),
};
schema.insert(name.to_owned(), sdtype);
}
Expand Down Expand Up @@ -73,10 +73,10 @@ impl LTSVDecoder {
"f64" => suffixes.s_f64 = Some(suffix),
"i64" => suffixes.s_i64 = Some(suffix),
"u64" => suffixes.s_u64 = Some(suffix),
_ => panic!(format!(
_ => panic!(
"Unsupported type in input.ltsv_suffixes for type [{}]",
sdtype
)),
),
}
}
}
Expand Down Expand Up @@ -211,7 +211,7 @@ impl Decoder for LTSVDecoder {
appname: None,
procid: None,
msgid: None,
sd: if sd.pairs.is_empty() { None } else { Some(sd) },
sd: if sd.pairs.is_empty() { None } else { Some(vec![sd]) },
msg,
full_msg: Some(line.to_owned()),
};
Expand Down Expand Up @@ -259,8 +259,8 @@ fn test_ltsv_suffixes() {
-0700]\tdone:true\tscore:-1\tmean:0.42\tcounter:42\tlevel:3\thost:\
testhostname\tname1:value1\tname 2: value 2\tn3:v3\tmessage:this is a test";
let res = ltsv_decoder.decode(msg).unwrap();
let sd = res.sd.unwrap();
let pairs = sd.pairs;
let sd = &res.sd.unwrap()[0];
let pairs = &sd.pairs;
assert!(pairs
.iter()
.cloned()
Expand Down Expand Up @@ -309,8 +309,8 @@ fn test_ltsv_suffixes_2() {
-0700]\tdone_bool:true\tscore_i64:-1\tmean_f64:0.42\tcounter_u64:42\tlevel:3\thost:\
testhostname\tname1:value1\tname 2: value 2\tn3:v3\tmessage:this is a test";
let res = ltsv_decoder.decode(msg).unwrap();
let sd = res.sd.unwrap();
let pairs = sd.pairs;
let sd = &res.sd.unwrap()[0];
let pairs = &sd.pairs;
assert!(pairs
.iter()
.cloned()
Expand Down Expand Up @@ -388,8 +388,11 @@ fn test_ltsv_3() {

assert!(res.hostname == "testhostname");
assert!(res.msg.unwrap() == "this is a test");
let sd = res.sd.unwrap();
let pairs = sd.pairs;

let sd = &res.sd.unwrap();
assert!(sd.len() == 1);
let pairs = &sd[0].pairs;

assert!(pairs
.iter()
.cloned()
Expand Down
9 changes: 9 additions & 0 deletions src/flowgger/decoder/rfc3164_decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ fn test_rfc3164_decode_nopri() {
assert_eq!(res.msgid, None);
assert_eq!(res.msg, Some(r#"appname 69 42 [origin@123 software="te\st sc\"ript" swVersion="0.0.1"] test message"#.to_string()));
assert_eq!(res.full_msg, Some(msg.to_string()));
assert!(res.sd.is_none());
}

#[test]
Expand All @@ -237,6 +238,7 @@ fn test_rfc3164_decode_with_pri() {
assert_eq!(res.msgid, None);
assert_eq!(res.msg, Some(r#"appname 69 42 [origin@123 software="te\st sc\"ript" swVersion="0.0.1"] test message"#.to_string()));
assert_eq!(res.full_msg, Some(msg.to_string()));
assert!(res.sd.is_none());
}

#[test]
Expand All @@ -256,6 +258,7 @@ fn test_rfc3164_decode_with_pri_year() {
assert_eq!(res.msgid, None);
assert_eq!(res.msg, Some(r#"appname 69 42 [origin@123 software="te\st sc\"ript" swVersion="0.0.1"] test message"#.to_string()));
assert_eq!(res.full_msg, Some(msg.to_string()));
assert!(res.sd.is_none());
}

#[test]
Expand All @@ -275,6 +278,7 @@ fn test_rfc3164_decode_with_pri_year_tz() {
assert_eq!(res.msgid, None);
assert_eq!(res.msg, Some(r#"appname 69 42 [origin@123 software="te\st sc\"ript" swVersion="0.0.1"] test message"#.to_string()));
assert_eq!(res.full_msg, Some(msg.to_string()));
assert!(res.sd.is_none());
}

#[test]
Expand All @@ -294,6 +298,7 @@ fn test_rfc3164_decode_tz_no_year() {
assert_eq!(res.msgid, None);
assert_eq!(res.msg, Some(r#"appname 69 42 [origin@123 software="te\st sc\"ript" swVersion="0.0.1"] test message"#.to_string()));
assert_eq!(res.full_msg, Some(msg.to_string()));
assert!(res.sd.is_none());
}

#[test]
Expand Down Expand Up @@ -335,6 +340,7 @@ fn test_rfc3164_decode_custom_with_year() {
assert_eq!(res.msgid, None);
assert_eq!(res.msg, Some(r#"appname 69 42 some test message"#.to_string()));
assert_eq!(res.full_msg, Some(msg.to_string()));
assert!(res.sd.is_none());
}

#[test]
Expand All @@ -354,6 +360,7 @@ fn test_rfc3164_decode_custom_with_year_notz() {
assert_eq!(res.msgid, None);
assert_eq!(res.msg, Some(r#"appname: a test message"#.to_string()));
assert_eq!(res.full_msg, Some(msg.to_string()));
assert!(res.sd.is_none());
}

#[test]
Expand All @@ -373,6 +380,7 @@ fn test_rfc3164_decode_custom_with_pri() {
assert_eq!(res.msgid, None);
assert_eq!(res.msg, Some(r#"appname: test message"#.to_string()));
assert_eq!(res.full_msg, Some(msg.to_string()));
assert!(res.sd.is_none());
}

#[test]
Expand All @@ -391,4 +399,5 @@ fn test_rfc3164_decode_custom_trimed() {
assert_eq!(res.procid, None);
assert_eq!(res.msgid, None);
assert_eq!(res.full_msg, Some("<13>testhostname: 2019 Mar 27 12:09:39 UTC: appname: test message".to_string()));
assert!(res.sd.is_none());
}
92 changes: 78 additions & 14 deletions src/flowgger/decoder/rfc5424_decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ impl Decoder for RFC5424Decoder {
let appname = parts.next().ok_or("Missing application name")?;
let procid = parts.next().ok_or("Missing process id")?;
let msgid = parts.next().ok_or("Missing message id")?;
let (sd, msg) = parse_data(parts.next().ok_or("Missing message data")?)?;
let (sd_vec, msg) = parse_data(parts.next().ok_or("Missing message data")?)?;

let record = Record {
ts,
hostname: hostname.to_owned(),
Expand All @@ -35,7 +36,7 @@ impl Decoder for RFC5424Decoder {
appname: Some(appname.to_owned()),
procid: Some(procid.to_owned()),
msgid: Some(msgid.to_owned()),
sd,
sd: if sd_vec.is_empty() { None } else { Some(sd_vec) },
msg,
full_msg: Some(line.trim_end().to_owned()),
};
Expand Down Expand Up @@ -118,6 +119,38 @@ fn unescape_sd_value(value: &str) -> String {
res
}

fn parse_data(line: &str) -> Result<(Vec<StructuredData>, Option<String>), &'static str> {
let mut sd_vec:Vec<StructuredData> = Vec::new();
match line.chars().next().ok_or("Missing log message")? {
'-' => {
// No SD, just a message
return Ok((sd_vec, parse_msg(line, 1)));
}
'[' => {
// At least one SD
let (mut leftover, mut offset) = (line, 0);
let mut next_sd = true;
while next_sd {
let (sd, new_leftover, new_offset) = parse_sd_data(leftover, offset + 1)?;
// Unfortunately we have to reassign, https://github.com/rust-lang/rfcs/pull/2909 not yet implemented
leftover = new_leftover;
offset = new_offset;
sd_vec.push(sd);

match leftover[offset..].chars().next().ok_or("Missing log message")? {
// Another SD
'[' => next_sd = true,
// Separator, the rest is the message
' ' => return Ok((sd_vec, parse_msg(leftover, offset))),
_ => return Err("Malformated RFC5424 message"),
}
}
return Ok((sd_vec, parse_msg(leftover, 1)));
}
_ => return Err("Malformated RFC5424 message"),
};
}

fn parse_msg(line: &str, offset: usize) -> Option<String> {
if offset > line.len() {
None
Expand All @@ -129,15 +162,8 @@ fn parse_msg(line: &str, offset: usize) -> Option<String> {
}
}

fn parse_data(line: &str) -> Result<(Option<StructuredData>, Option<String>), &'static str> {
match line.chars().next().ok_or("Short message")? {
'-' => {
return Ok((None, parse_msg(line, 1)));
}
'[' => {}
_ => return Err("Short message"),
};
let mut parts = line[1..].splitn(2, ' ');
fn parse_sd_data(line: &str, offset: usize) -> Result<(StructuredData, &str, usize), &'static str> {
let mut parts = line[offset..].splitn(2, ' ');
let sd_id = parts.next().ok_or("Missing structured data id")?;
let sd = parts.next().ok_or("Missing structured data")?;
let mut in_name = false;
Expand Down Expand Up @@ -202,7 +228,7 @@ fn parse_data(line: &str) -> Result<(Option<StructuredData>, Option<String>), &'
}
match after_sd {
None => Err("Missing ] after structured data"),
Some(offset) => Ok((Some(sd_res), parse_msg(sd, offset))),
Some(offset) => Ok((sd_res, sd, offset)),
}
}

Expand All @@ -218,9 +244,47 @@ fn test_rfc5424() {
assert!(res.procid == Some("69".to_owned()));
assert!(res.msgid == Some("42".to_owned()));
assert!(res.msg == Some("test message".to_owned()));
let sd = res.sd.unwrap();
let sd_vec = res.sd.unwrap();
assert!(sd_vec.len() == 1);
let sd = &sd_vec[0];
assert!(sd.sd_id == Some("origin@123".to_owned()));
let pairs = &sd.pairs;

assert!(pairs
.iter()
.cloned()
.any(|(k, v)| if let SDValue::String(v) = v {
k == "_software" && v == "te\\st sc\"ript"
} else {
false
}));
assert!(pairs
.iter()
.cloned()
.any(|(k, v)| if let SDValue::String(v) = v {
k == "_swVersion" && v == "0.0.1"
} else {
false
}));
}

#[test]
fn test_rfc5424_multiple_sd() {
let msg = r#"<23>1 2015-08-05T15:53:45.637824Z testhostname appname 69 42 [origin@123 software="te\st sc\"ript" swVersion="0.0.1"][master@456 key="value" key2="value2"] test message"#;
let res = RFC5424Decoder.decode(msg).unwrap();
assert!(res.facility.unwrap() == 2);
assert!(res.severity.unwrap() == 7);
assert!(res.ts == 1438790025.637824);
assert!(res.hostname == "testhostname");
assert!(res.appname == Some("appname".to_owned()));
assert!(res.procid == Some("69".to_owned()));
assert!(res.msgid == Some("42".to_owned()));
assert!(res.msg == Some("test message".to_owned()));
let sd_vec = res.sd.unwrap();
assert!(sd_vec.len() == 2);
let sd = &sd_vec[0];
assert!(sd.sd_id == Some("origin@123".to_owned()));
let pairs = sd.pairs;
let pairs = &sd.pairs;

assert!(pairs
.iter()
Expand Down
51 changes: 44 additions & 7 deletions src/flowgger/encoder/capnp_encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,22 +74,25 @@ fn build_record<T: Allocator>(
if let Some(full_msg) = record.full_msg {
root.set_full_msg(&full_msg);
}
if let Some(sd) = record.sd {
if let Some(sd_vec) = record.sd {
// Warning: the current capnp format only support one structured data. Redefining the
// format would be a breaking change.
let sd = &sd_vec[0];
sd.sd_id.as_ref().and_then(|sd_id| {
root.set_sd_id(sd_id);
Some(())
});
let mut pairs = root.reborrow().init_pairs(sd.pairs.len() as u32);
for (i, (name, value)) in sd.pairs.into_iter().enumerate() {
for (i, (name, value)) in (&sd.pairs).into_iter().enumerate() {
let mut pair = pairs.reborrow().get(i as u32);
pair.set_key(&name);
let mut v = pair.init_value();
match value {
SDValue::String(value) => v.set_string(&value),
SDValue::Bool(value) => v.set_bool(value),
SDValue::F64(value) => v.set_f64(value),
SDValue::I64(value) => v.set_i64(value),
SDValue::U64(value) => v.set_u64(value),
SDValue::Bool(value) => v.set_bool(*value),
SDValue::F64(value) => v.set_f64(*value),
SDValue::I64(value) => v.set_i64(*value),
SDValue::U64(value) => v.set_u64(*value),
SDValue::Null => v.set_null(()),
};
}
Expand Down Expand Up @@ -129,7 +132,7 @@ mod tests {
msgid: None,
msg: Some("A short message that helps you identify what is going on".to_string()),
full_msg: Some("Backtrace here\n\nmore stuff".to_string()),
sd: Some(sd),
sd: Some(vec![sd]),
};

assert_eq!(
Expand Down Expand Up @@ -169,4 +172,38 @@ mod tests {
"\u{0}\u{0}\u{0}\u{0}%\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{2}\u{0}\t\u{0}*������A�\u{1}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}!\u{0}\u{0}\u{0}b\u{0}\u{0}\u{0}%\u{0}\u{0}\u{0}B\u{0}\u{0}\u{0}%\u{0}\u{0}\u{0}\u{1a}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}!\u{0}\u{0}\u{0}\u{1}\u{0}\u{0}=\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}A\u{0}\u{0}\u{0}\'\u{0}\u{0}\u{0}example.org\u{0}\u{0}\u{0}\u{0}\u{0}appname\u{0}44\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}A short message that helps you identify what is going on\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}Backtrace here\n\nmore stuff\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{4}\u{0}\u{0}\u{0}\u{2}\u{0}\u{2}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{5}\u{0}\u{0}\u{0}R\u{0}\u{0}\u{0}\t\u{0}\u{0}\u{0}r\u{0}\u{0}\u{0}x-header1\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}header1 value\u{0}\u{0}\u{0}"
);
}

#[test]
fn test_capnp_encode_multiple_sd() {
let config = Config::from_string("").unwrap();
let encoder = CapnpEncoder::new(&config);

let sd_vec = vec![
StructuredData {
sd_id: Some("someid".to_string()),
pairs: vec![("_some_info".to_string(), SDValue::String("foo".to_string()))],
},
StructuredData {
sd_id: Some("someid2".to_string()),
pairs: vec![("info".to_string(), SDValue::F64(123.456))],
},
];
let record = Record {
ts: 1385053862.3072,
hostname: "example.org".to_string(),
facility: None,
severity: Some(1),
appname: Some("appname".to_string()),
procid: Some("44".to_string()),
msgid: None,
msg: Some("A short message that helps you identify what is going on".to_string()),
full_msg: Some("Backtrace here\n\nmore stuff".to_string()),
sd: Some(sd_vec),
};

assert_eq!(
String::from_utf8_lossy(&encoder.encode(record).unwrap()),
"\u{0}\u{0}\u{0}\u{0}%\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{2}\u{0}\t\u{0}*������A�\u{1}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}!\u{0}\u{0}\u{0}b\u{0}\u{0}\u{0}%\u{0}\u{0}\u{0}B\u{0}\u{0}\u{0}%\u{0}\u{0}\u{0}\u{1a}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}!\u{0}\u{0}\u{0}\u{1}\u{0}\u{0}=\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}I\u{0}\u{0}\u{0}:\u{0}\u{0}\u{0}I\u{0}\u{0}\u{0}\'\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}example.org\u{0}\u{0}\u{0}\u{0}\u{0}appname\u{0}44\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}A short message that helps you identify what is going on\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}Backtrace here\n\nmore stuff\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}someid\u{0}\u{0}\u{4}\u{0}\u{0}\u{0}\u{2}\u{0}\u{2}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{5}\u{0}\u{0}\u{0}Z\u{0}\u{0}\u{0}\t\u{0}\u{0}\u{0}\"\u{0}\u{0}\u{0}_some_info\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}foo\u{0}\u{0}\u{0}\u{0}\u{0}"
);
}
}
Loading

0 comments on commit a1d4738

Please sign in to comment.