Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add more format handling to rfc3164, and passthrough output #57

Merged
merged 3 commits into from
Mar 31, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
edition = "2018"
name = "flowgger"
version = "0.2.9"
version = "0.2.10"
authors = ["Frank Denis <[email protected]>", "Matteo Bigoi <[email protected]>", "Vivien Chene <[email protected]>", "Francesco Berni <[email protected]>"]
build = "build.rs"
repository = "https://github.com/awslabs/flowgger"
Expand All @@ -26,9 +26,10 @@ kafka-output = ["kafka"]
tls = ["openssl"]
gelf = ["serde", "serde_json"]
ltsv = []
syslog = ["rfc5424", "rfc3164"]
syslog = ["rfc5424", "rfc3164", "passthrough"]
rfc3164=[]
rfc5424=[]
passthrough=[]
file = ["notify", "glob"]

[build-dependencies.capnpc]
Expand All @@ -38,6 +39,7 @@ optional = true
[dependencies]
capnp = { version = "0.10", optional = true }
chrono = "0.4"
chrono-tz = "0.5"
clap = "2"
flate2 = "1"
glob = { version = "0.3", optional = true }
Expand Down
3 changes: 2 additions & 1 deletion flowgger.toml
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ file_rotation_timeformat = "%Y%m%dT%H%M%SZ"

### Syslog
framing = "line"
# "rfc3164" or "rfc5424" or "passthrough"
format = "rfc3164"
# Format of the optional timestamp to be prepended to each event
rfc3164_prepend_timestamp="[%Y-%m-%dT%H:%M:%S%.6fZ]"
syslog_prepend_timestamp="[%Y-%m-%dT%H:%M:%S%.6fZ]"
2 changes: 1 addition & 1 deletion src/flowgger/decoder/ltsv_decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ impl Decoder for LTSVDecoder {
msgid: None,
sd: if sd.pairs.is_empty() { None } else { Some(sd) },
msg,
full_msg: None,
full_msg: Some(line.to_owned()),
};
Ok(record)
}
Expand Down
289 changes: 250 additions & 39 deletions src/flowgger/decoder/rfc3164_decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ use super::Decoder;
use crate::flowgger::config::Config;
use crate::flowgger::record::Record;
use crate::flowgger::utils;
use chrono::{Datelike, NaiveDateTime, Utc};
use chrono::{Datelike, NaiveDateTime, TimeZone, Utc};
use chrono_tz::Tz;
use std::io::{stderr, Write};

#[derive(Clone)]
pub struct RFC3164Decoder {}
Expand All @@ -14,7 +16,10 @@ impl RFC3164Decoder {
}

impl Decoder for RFC3164Decoder {
/// Implementation of the RF3164 decoder. Decode a string into a record object
/// Implementation of the RFC3164 decoder. Decodes a string into a record object.
/// RFC3164 is quite lenient and allows many different implementations.
/// This decoder first tries the most common format, as shown in the examples provided in the RFC.
/// If that fails, device-specific formats are tried.
///
/// # Arguments
/// * `line` - String to decode
Expand All @@ -26,35 +31,19 @@ impl Decoder for RFC3164Decoder {
// Get the optional pri part and remove it from the string
let (pri, _msg) = parse_strip_pri(line)?;

// The event may have several consecutive spaces as separator
let tokens_vec = _msg.split_whitespace().collect::<Vec<&str>>();

// If we have less than 4 tokens, the input can't be valid
if tokens_vec.len() > 3 {
// Date is made of the first 3 space separated tokens, rebuild it
let _date_str = tokens_vec[0..3].join(" ");
let _hostname = tokens_vec[3];

// All that remains is the message that may contain several spaces, so rebuild it
let _message = tokens_vec[4..].join(" ");

let ts = parse_ts(&_date_str)?;
let record = Record {
ts,
hostname: _hostname.to_owned(),
facility: pri.facility,
severity: pri.severity,
appname: None,
procid: None,
msgid: None,
msg: Some(_message.to_owned()),
full_msg: Some(line.to_owned()),
sd: None,
};
Ok(record)
} else {
Err("Malformed RFC3164 event: Invalid timestamp or hostname")
let mut res = decode_rfc_standard(&pri, _msg, line);
if let Ok (record) = res {
return Ok(record);
}

// Specific implementation
res = decode_rfc_custom(&pri, _msg, line);
if let Ok (record) = res {
return Ok(record);
}

let _ = writeln!(stderr(), "Unable to parse the rfc3164 input: '{}'", line);
res
}
}

Expand All @@ -63,6 +52,74 @@ struct Pri {
severity: Option<u8>,
}

/// Decode the "recommended" RFC3164 layout: `[<pri>]<datetime> <hostname> <message>`.
///
/// # Arguments
/// * `pri` - Facility and severity already extracted from the optional `<pri>` prefix
/// * `msg` - Event text with the `<pri>` prefix removed
/// * `line` - Original event, stored verbatim in the record's `full_msg`
///
/// # Returns
/// The decoded `Record`, or an error when the event has fewer than 4 tokens,
/// the date cannot be parsed, or nothing is left after the date to use as hostname.
fn decode_rfc_standard(pri: &Pri, msg: &str, line: &str) -> Result<Record, &'static str> {
    const MALFORMED: &str = "Malformed RFC3164 standard event: Invalid timestamp or hostname";

    // The event may have several consecutive spaces as separator
    let tokens_vec = msg.split_whitespace().collect::<Vec<&str>>();

    // A date is at least 3 tokens and the hostname one more: fewer than 4 can't be valid
    if tokens_vec.len() < 4 {
        return Err(MALFORMED);
    }

    // Parse the date; whatever follows it starts with the hostname
    let (ts, log_tokens) = parse_date_token(&tokens_vec)?;

    // A date carrying a year and/or a timezone may have consumed every token
    // (e.g. "2020 Aug 6 11:15:24"); report a malformed event instead of
    // panicking on an out-of-bounds index.
    let (hostname, message_tokens) = log_tokens.split_first().ok_or(MALFORMED)?;

    // All that remains is the message that may contain several spaces, so rebuild it
    let message = message_tokens.join(" ");

    Ok(Record {
        ts,
        hostname: (*hostname).to_owned(),
        facility: pri.facility,
        severity: pri.severity,
        appname: None,
        procid: None,
        msgid: None,
        msg: Some(message),
        full_msg: Some(line.to_owned()),
        sd: None,
    })
}

/// Decode a device-specific RFC3164 variant laid out as
/// `[<pri>]<hostname>: <datetime>: <message>`.
///
/// The event is split on `": "`: the first piece is the hostname, the second
/// is the date, and every remaining piece is re-joined with `": "` to form
/// the message.
///
/// # Arguments
/// * `pri` - Facility and severity already extracted from the optional `<pri>` prefix
/// * `msg` - Event text with the `<pri>` prefix removed
/// * `line` - Original event, stored verbatim in the record's `full_msg`
///
/// # Returns
/// The decoded `Record`, or an error when fewer than 3 `": "`-separated pieces
/// are present or the date piece cannot be parsed.
fn decode_rfc_custom(pri: &Pri, msg: &str, line: &str) -> Result<Record, &'static str> {
// Decoding custom rfc input formatted as : [<pri>]<hostname>: <datetime>: <message>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what happned when we find a slightly different one again? Or is this the only real offender in the wild?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is one of device custom log formats, in this case for cisco devices, dates and hostnames makes flowgger compliant with nxos and ios. Most devices seem to comply with the recommanded usage. However there is for sure other formats. I cannot add all existing formats here, but the design update makes it easier to add them on case by case.


// The event separator for hostname/timestamp/message is ": "
let tokens_vec = msg.split(": ").collect::<Vec<&str>>();

// Hostname, date and message are all required: with fewer than 3 pieces the input can't be valid
if tokens_vec.len() > 2 {
let _hostname = tokens_vec[0];

// The date is space separated, but make sure to remove consecutive spaces
let date_tokens_vec = tokens_vec[1].split_whitespace().collect::<Vec<&str>>();
// Tokens left over after the date inside this piece are discarded (second tuple field ignored)
let (ts, _) = parse_date_token(&date_tokens_vec)?;

// All that remains is the message; re-insert the ": " separators removed by the split
let _message = tokens_vec[2..].join(": ");

let record = Record {
ts,
hostname: _hostname.to_owned(),
facility: pri.facility,
severity: pri.severity,
appname: None,
procid: None,
msgid: None,
msg: Some(_message.to_owned()),
full_msg: Some(line.to_owned()),
sd: None,
};
Ok(record)
} else {
Err("Malformed RFC3164 event: Invalid timestamp or hostname")
}
}

fn parse_strip_pri(event: &str) -> Result<(Pri, &str), &'static str> {
if event.starts_with('<') {
let pri_end_index = event
Expand Down Expand Up @@ -92,22 +149,60 @@ fn parse_strip_pri(event: &str) -> Result<(Pri, &str), &'static str> {
}
}

fn parse_ts(ts_str: &str) -> Result<f64, &'static str> {
// Append the year to parse a full ts
let current_year = Utc::now().year();
let ts = format!("{} {}", current_year, ts_str);
/// Parse the date found at the start of `ts_tokens`, with or without a leading year.
///
/// The year-less form (`<month> <day> <time>`) is tried first since it is the
/// expected one; the form with a leading year is only attempted when that fails.
///
/// # Arguments
/// * `ts_tokens` - Whitespace-separated tokens, starting with the date
///
/// # Returns
/// The timestamp and the tokens remaining after the date, or an error when
/// the tokens cannot be parsed as a date.
fn parse_date_token<'a>(ts_tokens: &'a [&str]) -> Result<(f64, Vec<&'a str>), &'static str> {
    // If we don't have at least 3 tokens, don't even try, parsing will fail
    if ts_tokens.len() < 3 {
        return Err("Invalid time format");
    }

    // Decode the date/time without year (expected), and if it fails, try with the year
    parse_date(ts_tokens, false).or_else(|err| {
        // A date with a year spans 4 tokens; with only 3 available the fallback
        // cannot succeed, so keep the first error instead of attempting it.
        if ts_tokens.len() > 3 {
            parse_date(ts_tokens, true)
        } else {
            Err(err)
        }
    })
}

match NaiveDateTime::parse_from_str(&ts, "%Y %b %d %H:%M:%S") {
Ok(date) => Ok(utils::PreciseTimestamp::from_naive_datetime(date).as_f64()),
Err(_) => Err("Unable to parse the date"),
/// Decode a date/time from the leading tokens, followed by an optional timezone name.
///
/// # Arguments
/// * `ts_tokens` - Whitespace-separated tokens, starting with the date
/// * `has_year` - `true` when the date is `<year> <month> <day> <time>` (4 tokens),
///   `false` when it is `<month> <day> <time>` (3 tokens) and the current UTC
///   year must be prepended
///
/// # Returns
/// The timestamp and the tokens that follow the date (and the timezone, when
/// one was recognised), or an error when there are too few tokens or the date
/// does not match `%Y %b %d %H:%M:%S`.
fn parse_date<'a>(ts_tokens: &'a [&str], has_year: bool) -> Result<(f64, Vec<&'a str>), &'static str> {
    // Number of leading tokens that make up the date itself
    let date_len = if has_year { 4 } else { 3 };

    // Checked slicing: too few tokens is a parse error, not a panic
    let date_tokens = ts_tokens.get(..date_len).ok_or("Unable to parse date")?;

    // If there is no year in the input, prepend the current one
    let ts_str = if has_year {
        date_tokens.join(" ")
    } else {
        format!("{} {}", Utc::now().year(), date_tokens.join(" "))
    };

    let naive_dt = NaiveDateTime::parse_from_str(&ts_str, "%Y %b %d %H:%M:%S")
        .map_err(|_| "Unable to parse date")?;

    // See if the next token is a timezone
    let tz = ts_tokens
        .get(date_len)
        .and_then(|token| token.parse::<Tz>().ok());

    let (ts, rest_idx) = match tz {
        Some(tz) => {
            // An ambiguous local time (clock set back) resolves to its earliest
            // instant; a local time that does not exist in the zone falls back
            // to the timezone-less timestamp rather than yielding 0.0.
            let ts = match tz.from_local_datetime(&naive_dt).earliest() {
                Some(dt) => utils::PreciseTimestamp::from_datetime_tz(dt).as_f64(),
                None => utils::PreciseTimestamp::from_naive_datetime(naive_dt).as_f64(),
            };
            // The timezone token belongs to the date, skip it
            (ts, date_len + 1)
        }
        // No timezone, give a timestamp without tz
        None => (
            utils::PreciseTimestamp::from_naive_datetime(naive_dt).as_f64(),
            date_len,
        ),
    };

    Ok((ts, ts_tokens[rest_idx..].to_vec()))
}

#[cfg(test)]
use crate::flowgger::utils::test_utils::rfc_test_utils::ts_from_partial_date_time;
use crate::flowgger::utils::test_utils::rfc_test_utils::{ts_from_partial_date_time, ts_from_date_time};

#[test]
fn test_rfc3164_decode() {
fn test_rfc3164_decode_nopri() {
let msg = r#"Aug 6 11:15:24 testhostname appname 69 42 [origin@123 software="te\st sc\"ript" swVersion="0.0.1"] test message"#;
let cfg = Config::from_string("[input]\n[input.ltsv_schema]\nformat = \"rfc3164\"\n").unwrap();
let expected_ts = ts_from_partial_date_time(8, 6, 11, 15, 24);
Expand Down Expand Up @@ -144,6 +239,63 @@ fn test_rfc3164_decode_with_pri() {
assert_eq!(res.full_msg, Some(msg.to_string()));
}

/// Standard layout with a priority prefix and an explicit year in the date.
#[test]
fn test_rfc3164_decode_with_pri_year() {
    let input = r#"<13>2020 Aug 6 11:15:24 testhostname appname 69 42 [origin@123 software="te\st sc\"ript" swVersion="0.0.1"] test message"#;
    let expected_msg = r#"appname 69 42 [origin@123 software="te\st sc\"ript" swVersion="0.0.1"] test message"#;
    let config = Config::from_string("[input]\n[input.ltsv_schema]\nformat = \"rfc3164\"\n").unwrap();
    let expected_ts = ts_from_date_time(2020, 8, 6, 11, 15, 24, 0);

    let record = RFC3164Decoder::new(&config).decode(input).unwrap();

    // Priority 13 is facility 1, severity 5
    assert_eq!(record.facility, Some(1));
    assert_eq!(record.severity, Some(5));
    assert_eq!(record.ts, expected_ts);
    assert_eq!(record.hostname, "testhostname");
    // RFC3164 decoding never fills these fields
    assert_eq!(record.appname, None);
    assert_eq!(record.procid, None);
    assert_eq!(record.msgid, None);
    assert_eq!(record.msg, Some(expected_msg.to_string()));
    assert_eq!(record.full_msg, Some(input.to_string()));
}

/// Standard layout with a priority prefix, an explicit year and a timezone name.
#[test]
fn test_rfc3164_decode_with_pri_year_tz() {
    let input = r#"<13>2020 Aug 6 11:15:24 UTC testhostname appname 69 42 [origin@123 software="te\st sc\"ript" swVersion="0.0.1"] test message"#;
    let expected_msg = r#"appname 69 42 [origin@123 software="te\st sc\"ript" swVersion="0.0.1"] test message"#;
    let config = Config::from_string("[input]\n[input.ltsv_schema]\nformat = \"rfc3164\"\n").unwrap();
    let expected_ts = ts_from_date_time(2020, 8, 6, 11, 15, 24, 0);

    let record = RFC3164Decoder::new(&config).decode(input).unwrap();

    // Priority 13 is facility 1, severity 5
    assert_eq!(record.facility, Some(1));
    assert_eq!(record.severity, Some(5));
    assert_eq!(record.ts, expected_ts);
    // The timezone token must not be mistaken for the hostname
    assert_eq!(record.hostname, "testhostname");
    assert_eq!(record.appname, None);
    assert_eq!(record.procid, None);
    assert_eq!(record.msgid, None);
    assert_eq!(record.msg, Some(expected_msg.to_string()));
    assert_eq!(record.full_msg, Some(input.to_string()));
}

/// Standard layout without priority or year, but with a timezone name after the time.
#[test]
fn test_rfc3164_decode_tz_no_year() {
    let input = r#"Aug 6 11:15:24 UTC testhostname appname 69 42 [origin@123 software="te\st sc\"ript" swVersion="0.0.1"] test message"#;
    let expected_msg = r#"appname 69 42 [origin@123 software="te\st sc\"ript" swVersion="0.0.1"] test message"#;
    let config = Config::from_string("[input]\n[input.ltsv_schema]\nformat = \"rfc3164\"\n").unwrap();
    let expected_ts = ts_from_partial_date_time(8, 6, 11, 15, 24);

    let record = RFC3164Decoder::new(&config).decode(input).unwrap();

    // No <pri> prefix, so neither facility nor severity is set
    assert_eq!(record.facility, None);
    assert_eq!(record.severity, None);
    assert_eq!(record.ts, expected_ts);
    assert_eq!(record.hostname, "testhostname");
    assert_eq!(record.appname, None);
    assert_eq!(record.procid, None);
    assert_eq!(record.msgid, None);
    assert_eq!(record.msg, Some(expected_msg.to_string()));
    assert_eq!(record.full_msg, Some(input.to_string()));
}

#[test]
fn test_rfc3164_decode_invalid_event() {
let msg = "test message";
Expand All @@ -163,3 +315,62 @@ fn test_rfc3164_decode_invalid_date() {
let res = decoder.decode(msg);
assert!(res.is_err());
}

/// Custom `<hostname>: <datetime>: <message>` layout with a year and a timezone name.
#[test]
fn test_rfc3164_decode_custom_with_year() {
    let input = r#"testhostname: 2020 Aug 6 11:15:24 UTC: appname 69 42 some test message"#;
    let expected_msg = r#"appname 69 42 some test message"#;
    let config = Config::from_string("[input]\n[input.ltsv_schema]\nformat = \"rfc3164\"\n").unwrap();
    let expected_ts = ts_from_date_time(2020, 8, 6, 11, 15, 24, 0);

    let record = RFC3164Decoder::new(&config).decode(input).unwrap();

    // No <pri> prefix, so neither facility nor severity is set
    assert_eq!(record.facility, None);
    assert_eq!(record.severity, None);
    assert_eq!(record.ts, expected_ts);
    assert_eq!(record.hostname, "testhostname");
    assert_eq!(record.appname, None);
    assert_eq!(record.procid, None);
    assert_eq!(record.msgid, None);
    assert_eq!(record.msg, Some(expected_msg.to_string()));
    assert_eq!(record.full_msg, Some(input.to_string()));
}

/// Custom `<hostname>: <datetime>: <message>` layout with a year but no timezone.
#[test]
fn test_rfc3164_decode_custom_with_year_notz() {
    let input = r#"testhostname: 2019 Mar 27 12:09:39: appname: a test message"#;
    let expected_msg = r#"appname: a test message"#;
    let config = Config::from_string("[input]\n[input.ltsv_schema]\nformat = \"rfc3164\"\n").unwrap();
    let expected_ts = ts_from_date_time(2019, 3, 27, 12, 9, 39, 0);

    let record = RFC3164Decoder::new(&config).decode(input).unwrap();

    // No <pri> prefix, so neither facility nor severity is set
    assert_eq!(record.facility, None);
    assert_eq!(record.severity, None);
    assert_eq!(record.ts, expected_ts);
    assert_eq!(record.hostname, "testhostname");
    assert_eq!(record.appname, None);
    assert_eq!(record.procid, None);
    assert_eq!(record.msgid, None);
    // A ": " inside the message itself must survive the split/re-join
    assert_eq!(record.msg, Some(expected_msg.to_string()));
    assert_eq!(record.full_msg, Some(input.to_string()));
}

/// Custom `<hostname>: <datetime>: <message>` layout preceded by a priority prefix.
#[test]
fn test_rfc3164_decode_custom_with_pri() {
    let input = r#"<13>testhostname: 2019 Mar 27 12:09:39 UTC: appname: test message"#;
    let expected_msg = r#"appname: test message"#;
    let config = Config::from_string("[input]\n[input.ltsv_schema]\nformat = \"rfc3164\"\n").unwrap();
    let expected_ts = ts_from_date_time(2019, 3, 27, 12, 9, 39, 0);

    let record = RFC3164Decoder::new(&config).decode(input).unwrap();

    // Priority 13 is facility 1, severity 5
    assert_eq!(record.facility, Some(1));
    assert_eq!(record.severity, Some(5));
    assert_eq!(record.ts, expected_ts);
    assert_eq!(record.hostname, "testhostname");
    assert_eq!(record.appname, None);
    assert_eq!(record.procid, None);
    assert_eq!(record.msgid, None);
    assert_eq!(record.msg, Some(expected_msg.to_string()));
    assert_eq!(record.full_msg, Some(input.to_string()));
}
Loading