Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support multiple SD in RFC5424 (fix issue #61) #62

Merged
merged 1 commit into from
Nov 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
edition = "2018"
name = "flowgger"
version = "0.2.11"
version = "0.2.12"
authors = ["Frank Denis <[email protected]>", "Matteo Bigoi <[email protected]>", "Vivien Chene <[email protected]>", "Francesco Berni <[email protected]>"]
build = "build.rs"
repository = "https://github.com/awslabs/flowgger"
Expand Down
7 changes: 4 additions & 3 deletions src/flowgger/decoder/gelf_decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ impl Decoder for GelfDecoder {
appname: None,
procid: None,
msgid: None,
sd: if sd.pairs.is_empty() { None } else { Some(sd) },
sd: if sd.pairs.is_empty() { None } else { Some(vec![sd]) },
msg,
full_msg,
};
Expand All @@ -136,8 +136,9 @@ mod test {
assert!(res.full_msg.unwrap() == "Backtrace here\n\nmore stuff");
assert!(res.severity.unwrap() == 1);

let sd = res.sd.unwrap();
let pairs = sd.pairs;
let sd = &res.sd.unwrap();
assert!(sd.len() == 1);
let pairs = &sd[0].pairs;
assert!(pairs
.iter()
.cloned()
Expand Down
25 changes: 14 additions & 11 deletions src/flowgger/decoder/ltsv_decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ impl LTSVDecoder {
"f64" => SDValueType::F64,
"i64" => SDValueType::I64,
"u64" => SDValueType::U64,
_ => panic!(format!(
_ => panic!(
"Unsupported type in input.ltsv_schema for name [{}]",
name
)),
),
};
schema.insert(name.to_owned(), sdtype);
}
Expand Down Expand Up @@ -73,10 +73,10 @@ impl LTSVDecoder {
"f64" => suffixes.s_f64 = Some(suffix),
"i64" => suffixes.s_i64 = Some(suffix),
"u64" => suffixes.s_u64 = Some(suffix),
_ => panic!(format!(
_ => panic!(
"Unsupported type in input.ltsv_suffixes for type [{}]",
sdtype
)),
),
}
}
}
Expand Down Expand Up @@ -211,7 +211,7 @@ impl Decoder for LTSVDecoder {
appname: None,
procid: None,
msgid: None,
sd: if sd.pairs.is_empty() { None } else { Some(sd) },
sd: if sd.pairs.is_empty() { None } else { Some(vec![sd]) },
msg,
full_msg: Some(line.to_owned()),
};
Expand Down Expand Up @@ -259,8 +259,8 @@ fn test_ltsv_suffixes() {
-0700]\tdone:true\tscore:-1\tmean:0.42\tcounter:42\tlevel:3\thost:\
testhostname\tname1:value1\tname 2: value 2\tn3:v3\tmessage:this is a test";
let res = ltsv_decoder.decode(msg).unwrap();
let sd = res.sd.unwrap();
let pairs = sd.pairs;
let sd = &res.sd.unwrap()[0];
let pairs = &sd.pairs;
assert!(pairs
.iter()
.cloned()
Expand Down Expand Up @@ -309,8 +309,8 @@ fn test_ltsv_suffixes_2() {
-0700]\tdone_bool:true\tscore_i64:-1\tmean_f64:0.42\tcounter_u64:42\tlevel:3\thost:\
testhostname\tname1:value1\tname 2: value 2\tn3:v3\tmessage:this is a test";
let res = ltsv_decoder.decode(msg).unwrap();
let sd = res.sd.unwrap();
let pairs = sd.pairs;
let sd = &res.sd.unwrap()[0];
let pairs = &sd.pairs;
assert!(pairs
.iter()
.cloned()
Expand Down Expand Up @@ -388,8 +388,11 @@ fn test_ltsv_3() {

assert!(res.hostname == "testhostname");
assert!(res.msg.unwrap() == "this is a test");
let sd = res.sd.unwrap();
let pairs = sd.pairs;

let sd = &res.sd.unwrap();
assert!(sd.len() == 1);
let pairs = &sd[0].pairs;

assert!(pairs
.iter()
.cloned()
Expand Down
9 changes: 9 additions & 0 deletions src/flowgger/decoder/rfc3164_decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ fn test_rfc3164_decode_nopri() {
assert_eq!(res.msgid, None);
assert_eq!(res.msg, Some(r#"appname 69 42 [origin@123 software="te\st sc\"ript" swVersion="0.0.1"] test message"#.to_string()));
assert_eq!(res.full_msg, Some(msg.to_string()));
assert!(res.sd.is_none());
}

#[test]
Expand All @@ -237,6 +238,7 @@ fn test_rfc3164_decode_with_pri() {
assert_eq!(res.msgid, None);
assert_eq!(res.msg, Some(r#"appname 69 42 [origin@123 software="te\st sc\"ript" swVersion="0.0.1"] test message"#.to_string()));
assert_eq!(res.full_msg, Some(msg.to_string()));
assert!(res.sd.is_none());
}

#[test]
Expand All @@ -256,6 +258,7 @@ fn test_rfc3164_decode_with_pri_year() {
assert_eq!(res.msgid, None);
assert_eq!(res.msg, Some(r#"appname 69 42 [origin@123 software="te\st sc\"ript" swVersion="0.0.1"] test message"#.to_string()));
assert_eq!(res.full_msg, Some(msg.to_string()));
assert!(res.sd.is_none());
}

#[test]
Expand All @@ -275,6 +278,7 @@ fn test_rfc3164_decode_with_pri_year_tz() {
assert_eq!(res.msgid, None);
assert_eq!(res.msg, Some(r#"appname 69 42 [origin@123 software="te\st sc\"ript" swVersion="0.0.1"] test message"#.to_string()));
assert_eq!(res.full_msg, Some(msg.to_string()));
assert!(res.sd.is_none());
}

#[test]
Expand All @@ -294,6 +298,7 @@ fn test_rfc3164_decode_tz_no_year() {
assert_eq!(res.msgid, None);
assert_eq!(res.msg, Some(r#"appname 69 42 [origin@123 software="te\st sc\"ript" swVersion="0.0.1"] test message"#.to_string()));
assert_eq!(res.full_msg, Some(msg.to_string()));
assert!(res.sd.is_none());
}

#[test]
Expand Down Expand Up @@ -335,6 +340,7 @@ fn test_rfc3164_decode_custom_with_year() {
assert_eq!(res.msgid, None);
assert_eq!(res.msg, Some(r#"appname 69 42 some test message"#.to_string()));
assert_eq!(res.full_msg, Some(msg.to_string()));
assert!(res.sd.is_none());
}

#[test]
Expand All @@ -354,6 +360,7 @@ fn test_rfc3164_decode_custom_with_year_notz() {
assert_eq!(res.msgid, None);
assert_eq!(res.msg, Some(r#"appname: a test message"#.to_string()));
assert_eq!(res.full_msg, Some(msg.to_string()));
assert!(res.sd.is_none());
}

#[test]
Expand All @@ -373,6 +380,7 @@ fn test_rfc3164_decode_custom_with_pri() {
assert_eq!(res.msgid, None);
assert_eq!(res.msg, Some(r#"appname: test message"#.to_string()));
assert_eq!(res.full_msg, Some(msg.to_string()));
assert!(res.sd.is_none());
}

#[test]
Expand All @@ -391,4 +399,5 @@ fn test_rfc3164_decode_custom_trimed() {
assert_eq!(res.procid, None);
assert_eq!(res.msgid, None);
assert_eq!(res.full_msg, Some("<13>testhostname: 2019 Mar 27 12:09:39 UTC: appname: test message".to_string()));
assert!(res.sd.is_none());
}
92 changes: 78 additions & 14 deletions src/flowgger/decoder/rfc5424_decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ impl Decoder for RFC5424Decoder {
let appname = parts.next().ok_or("Missing application name")?;
let procid = parts.next().ok_or("Missing process id")?;
let msgid = parts.next().ok_or("Missing message id")?;
let (sd, msg) = parse_data(parts.next().ok_or("Missing message data")?)?;
let (sd_vec, msg) = parse_data(parts.next().ok_or("Missing message data")?)?;

let record = Record {
ts,
hostname: hostname.to_owned(),
Expand All @@ -35,7 +36,7 @@ impl Decoder for RFC5424Decoder {
appname: Some(appname.to_owned()),
procid: Some(procid.to_owned()),
msgid: Some(msgid.to_owned()),
sd,
sd: if sd_vec.is_empty() { None } else { Some(sd_vec) },
msg,
full_msg: Some(line.trim_end().to_owned()),
};
Expand Down Expand Up @@ -118,6 +119,38 @@ fn unescape_sd_value(value: &str) -> String {
res
}

fn parse_data(line: &str) -> Result<(Vec<StructuredData>, Option<String>), &'static str> {
let mut sd_vec:Vec<StructuredData> = Vec::new();
match line.chars().next().ok_or("Missing log message")? {
'-' => {
// No SD, just a message
return Ok((sd_vec, parse_msg(line, 1)));
}
'[' => {
// At least one SD
let (mut leftover, mut offset) = (line, 0);
let mut next_sd = true;
while next_sd {
let (sd, new_leftover, new_offset) = parse_sd_data(leftover, offset + 1)?;
// Unfortunately we have to reassign, https://github.com/rust-lang/rfcs/pull/2909 not yet implemented
leftover = new_leftover;
offset = new_offset;
sd_vec.push(sd);

match leftover[offset..].chars().next().ok_or("Missing log message")? {
// Another SD
'[' => next_sd = true,
// Separator, the rest is the message
' ' => return Ok((sd_vec, parse_msg(leftover, offset))),
_ => return Err("Malformated RFC5424 message"),
}
}
return Ok((sd_vec, parse_msg(leftover, 1)));
}
_ => return Err("Malformated RFC5424 message"),
};
}

fn parse_msg(line: &str, offset: usize) -> Option<String> {
if offset > line.len() {
None
Expand All @@ -129,15 +162,8 @@ fn parse_msg(line: &str, offset: usize) -> Option<String> {
}
}

fn parse_data(line: &str) -> Result<(Option<StructuredData>, Option<String>), &'static str> {
match line.chars().next().ok_or("Short message")? {
'-' => {
return Ok((None, parse_msg(line, 1)));
}
'[' => {}
_ => return Err("Short message"),
};
let mut parts = line[1..].splitn(2, ' ');
fn parse_sd_data(line: &str, offset: usize) -> Result<(StructuredData, &str, usize), &'static str> {
let mut parts = line[offset..].splitn(2, ' ');
let sd_id = parts.next().ok_or("Missing structured data id")?;
let sd = parts.next().ok_or("Missing structured data")?;
let mut in_name = false;
Expand Down Expand Up @@ -202,7 +228,7 @@ fn parse_data(line: &str) -> Result<(Option<StructuredData>, Option<String>), &'
}
match after_sd {
None => Err("Missing ] after structured data"),
Some(offset) => Ok((Some(sd_res), parse_msg(sd, offset))),
Some(offset) => Ok((sd_res, sd, offset)),
}
}

Expand All @@ -218,9 +244,47 @@ fn test_rfc5424() {
assert!(res.procid == Some("69".to_owned()));
assert!(res.msgid == Some("42".to_owned()));
assert!(res.msg == Some("test message".to_owned()));
let sd = res.sd.unwrap();
let sd_vec = res.sd.unwrap();
assert!(sd_vec.len() == 1);
let sd = &sd_vec[0];
assert!(sd.sd_id == Some("origin@123".to_owned()));
let pairs = &sd.pairs;

assert!(pairs
.iter()
.cloned()
.any(|(k, v)| if let SDValue::String(v) = v {
k == "_software" && v == "te\\st sc\"ript"
} else {
false
}));
assert!(pairs
.iter()
.cloned()
.any(|(k, v)| if let SDValue::String(v) = v {
k == "_swVersion" && v == "0.0.1"
} else {
false
}));
}

#[test]
fn test_rfc5424_multiple_sd() {
let msg = r#"<23>1 2015-08-05T15:53:45.637824Z testhostname appname 69 42 [origin@123 software="te\st sc\"ript" swVersion="0.0.1"][master@456 key="value" key2="value2"] test message"#;
let res = RFC5424Decoder.decode(msg).unwrap();
assert!(res.facility.unwrap() == 2);
assert!(res.severity.unwrap() == 7);
assert!(res.ts == 1438790025.637824);
assert!(res.hostname == "testhostname");
assert!(res.appname == Some("appname".to_owned()));
assert!(res.procid == Some("69".to_owned()));
assert!(res.msgid == Some("42".to_owned()));
assert!(res.msg == Some("test message".to_owned()));
let sd_vec = res.sd.unwrap();
assert!(sd_vec.len() == 2);
let sd = &sd_vec[0];
assert!(sd.sd_id == Some("origin@123".to_owned()));
let pairs = sd.pairs;
let pairs = &sd.pairs;

assert!(pairs
.iter()
Expand Down
51 changes: 44 additions & 7 deletions src/flowgger/encoder/capnp_encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,22 +74,25 @@ fn build_record<T: Allocator>(
if let Some(full_msg) = record.full_msg {
root.set_full_msg(&full_msg);
}
if let Some(sd) = record.sd {
if let Some(sd_vec) = record.sd {
// Warning: the current capnp format only support one structured data. Redefining the
// format would be a breaking change.
let sd = &sd_vec[0];
sd.sd_id.as_ref().and_then(|sd_id| {
root.set_sd_id(sd_id);
Some(())
});
let mut pairs = root.reborrow().init_pairs(sd.pairs.len() as u32);
for (i, (name, value)) in sd.pairs.into_iter().enumerate() {
for (i, (name, value)) in (&sd.pairs).into_iter().enumerate() {
let mut pair = pairs.reborrow().get(i as u32);
pair.set_key(&name);
let mut v = pair.init_value();
match value {
SDValue::String(value) => v.set_string(&value),
SDValue::Bool(value) => v.set_bool(value),
SDValue::F64(value) => v.set_f64(value),
SDValue::I64(value) => v.set_i64(value),
SDValue::U64(value) => v.set_u64(value),
SDValue::Bool(value) => v.set_bool(*value),
SDValue::F64(value) => v.set_f64(*value),
SDValue::I64(value) => v.set_i64(*value),
SDValue::U64(value) => v.set_u64(*value),
SDValue::Null => v.set_null(()),
};
}
Expand Down Expand Up @@ -129,7 +132,7 @@ mod tests {
msgid: None,
msg: Some("A short message that helps you identify what is going on".to_string()),
full_msg: Some("Backtrace here\n\nmore stuff".to_string()),
sd: Some(sd),
sd: Some(vec![sd]),
};

assert_eq!(
Expand Down Expand Up @@ -169,4 +172,38 @@ mod tests {
"\u{0}\u{0}\u{0}\u{0}%\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{2}\u{0}\t\u{0}*������A�\u{1}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}!\u{0}\u{0}\u{0}b\u{0}\u{0}\u{0}%\u{0}\u{0}\u{0}B\u{0}\u{0}\u{0}%\u{0}\u{0}\u{0}\u{1a}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}!\u{0}\u{0}\u{0}\u{1}\u{0}\u{0}=\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}A\u{0}\u{0}\u{0}\'\u{0}\u{0}\u{0}example.org\u{0}\u{0}\u{0}\u{0}\u{0}appname\u{0}44\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}A short message that helps you identify what is going on\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}Backtrace here\n\nmore stuff\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{4}\u{0}\u{0}\u{0}\u{2}\u{0}\u{2}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{5}\u{0}\u{0}\u{0}R\u{0}\u{0}\u{0}\t\u{0}\u{0}\u{0}r\u{0}\u{0}\u{0}x-header1\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}header1 value\u{0}\u{0}\u{0}"
);
}

#[test]
fn test_capnp_encode_multiple_sd() {
let config = Config::from_string("").unwrap();
let encoder = CapnpEncoder::new(&config);

let sd_vec = vec![
StructuredData {
sd_id: Some("someid".to_string()),
pairs: vec![("_some_info".to_string(), SDValue::String("foo".to_string()))],
},
StructuredData {
sd_id: Some("someid2".to_string()),
pairs: vec![("info".to_string(), SDValue::F64(123.456))],
},
];
let record = Record {
ts: 1385053862.3072,
hostname: "example.org".to_string(),
facility: None,
severity: Some(1),
appname: Some("appname".to_string()),
procid: Some("44".to_string()),
msgid: None,
msg: Some("A short message that helps you identify what is going on".to_string()),
full_msg: Some("Backtrace here\n\nmore stuff".to_string()),
sd: Some(sd_vec),
};

assert_eq!(
String::from_utf8_lossy(&encoder.encode(record).unwrap()),
"\u{0}\u{0}\u{0}\u{0}%\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{2}\u{0}\t\u{0}*������A�\u{1}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}!\u{0}\u{0}\u{0}b\u{0}\u{0}\u{0}%\u{0}\u{0}\u{0}B\u{0}\u{0}\u{0}%\u{0}\u{0}\u{0}\u{1a}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}!\u{0}\u{0}\u{0}\u{1}\u{0}\u{0}=\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}I\u{0}\u{0}\u{0}:\u{0}\u{0}\u{0}I\u{0}\u{0}\u{0}\'\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}example.org\u{0}\u{0}\u{0}\u{0}\u{0}appname\u{0}44\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}A short message that helps you identify what is going on\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}Backtrace here\n\nmore stuff\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}someid\u{0}\u{0}\u{4}\u{0}\u{0}\u{0}\u{2}\u{0}\u{2}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}\u{5}\u{0}\u{0}\u{0}Z\u{0}\u{0}\u{0}\t\u{0}\u{0}\u{0}\"\u{0}\u{0}\u{0}_some_info\u{0}\u{0}\u{0}\u{0}\u{0}\u{0}foo\u{0}\u{0}\u{0}\u{0}\u{0}"
);
}
}
Loading