From fc1cee7ab1010ac3d88f327c999b16842aa763b8 Mon Sep 17 00:00:00 2001 From: Lucio Franco Date: Wed, 15 May 2019 13:51:47 -0400 Subject: [PATCH 1/5] Update splunk to use structured data --- src/sinks/splunk.rs | 78 +++++++++++++++++++++++++++++++++++---------- 1 file changed, 62 insertions(+), 16 deletions(-) diff --git a/src/sinks/splunk.rs b/src/sinks/splunk.rs index 76dcdba8be8c1..7e90a45b4eb44 100644 --- a/src/sinks/splunk.rs +++ b/src/sinks/splunk.rs @@ -1,6 +1,6 @@ use crate::{ buffers::Acker, - event::{self, Event}, + event::{self, Event, ValueKind}, sinks::util::{ http::{HttpRetryLogic, HttpService}, retries::FixedRetryPolicy, @@ -106,21 +106,7 @@ pub fn hec(config: HecSinkConfig, acker: Acker) -> Result Result<(), String> { } } +fn encode_event(host_field: &Atom, mut event: Event) -> Result, ()> { + let host = event.as_log().get(&host_field).map(|h| h.clone()); + let timestamp = + if let Some(ValueKind::Timestamp(ts)) = event.as_mut_log().remove(&event::TIMESTAMP) { + ts.timestamp() + } else { + chrono::Utc::now().timestamp() + }; + + let mut body = if event.as_log().is_structured() { + json!({ + "event": event.as_log().all_fields(), + "fields": event.as_log().explicit_fields(), + "time": timestamp, + }) + } else { + json!({ + "event": event.as_log()[&event::MESSAGE].to_string_lossy(), + "fields": event.as_log().explicit_fields(), + "time": timestamp, + }) + }; + + if let Some(host) = host { + let host = host.to_string_lossy(); + body["host"] = json!(host); + } + + serde_json::to_vec(&body).map_err(|e| panic!("Error encoding json body: {}", e)) +} + #[cfg(test)] mod tests { use super::*; + use crate::event::{self, Event}; + use serde::Deserialize; + use std::collections::HashMap; + + #[derive(Deserialize, Debug)] + struct HecEvent { + time: i64, + event: HashMap, + fields: HashMap, + } + + #[test] + fn splunk_encode_event_structured() { + let host = "host".into(); + let mut event = Event::from("hello world"); + event + .as_mut_log() + .insert_explicit("key".into(), "value".into()); + + let bytes = encode_event(&host, event).unwrap(); + + let hec_event = serde_json::from_slice::(&bytes[..]).unwrap(); + + let event = &hec_event.event; + let kv = event.get("key".into()).unwrap(); + + assert_eq!(kv, &"value".to_string()); + assert!(event.get(&event::TIMESTAMP.to_string()).is_none()); + } #[test] fn splunk_validate_host() { From 1c28936eeec7c12f88b4d510c910bed18ba3bb0d Mon Sep 17 00:00:00 2001 From: Lucio Franco Date: Wed, 15 May 2019 15:42:38 -0400 Subject: [PATCH 2/5] Fix splunk integration tests --- src/sinks/splunk.rs | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/src/sinks/splunk.rs b/src/sinks/splunk.rs index 7e90a45b4eb44..4b57020e8652f 100644 --- a/src/sinks/splunk.rs +++ b/src/sinks/splunk.rs @@ -321,7 +321,7 @@ mod integration_tests { .find_map(|_| { recent_entries() .into_iter() - .find(|entry| entry["_raw"].as_str().unwrap() == message) + .find(|entry| entry["_raw"].as_str().unwrap().contains(message.as_str())) .or_else(|| { std::thread::sleep(std::time::Duration::from_millis(100)); None @@ -329,8 +329,9 @@ mod integration_tests { }) .expect("Didn't find event in Splunk"); - assert_eq!(message, entry["_raw"].as_str().unwrap()); - assert_eq!("hello", entry["asdf"].as_str().unwrap()); + assert_eq!(message, entry["message"].as_str().unwrap()); + let asdf = entry["asdf"].as_array().unwrap()[0].as_str().unwrap(); + assert_eq!("hello", asdf); } #[test] @@ -356,7 +357,7 @@ mod integration_tests { .find_map(|_| { recent_entries() .into_iter() - .find(|entry| entry["_raw"].as_str().unwrap() == message) + .find(|entry| entry["_raw"].as_str().unwrap().contains(message.as_str())) .or_else(|| { std::thread::sleep(std::time::Duration::from_millis(100)); None @@ -364,9 +365,11 @@ mod integration_tests { }) .expect("Didn't find event in Splunk"); - assert_eq!(message, entry["_raw"].as_str().unwrap()); - assert_eq!("hello", entry["asdf"].as_str().unwrap()); - assert_eq!("example.com:1234", entry["host"].as_str().unwrap()); + assert_eq!(message, entry["message"].as_str().unwrap()); + let asdf = entry["asdf"].as_array().unwrap()[0].as_str().unwrap(); + assert_eq!("hello", asdf); + let host = entry["host"].as_array().unwrap()[0].as_str().unwrap(); + assert_eq!("example.com:1234", host); } #[test] @@ -400,7 +403,7 @@ mod integration_tests { .find_map(|_| { recent_entries() .into_iter() - .find(|entry| entry["_raw"].as_str().unwrap() == message) + .find(|entry| entry["_raw"].as_str().unwrap().contains(message.as_str())) .or_else(|| { std::thread::sleep(std::time::Duration::from_millis(100)); None @@ -408,9 +411,11 @@ mod integration_tests { }) .expect("Didn't find event in Splunk"); - assert_eq!(message, entry["_raw"].as_str().unwrap()); - assert_eq!("hello", entry["asdf"].as_str().unwrap()); - assert_eq!("beef.example.com:1234", entry["host"].as_str().unwrap()); + assert_eq!(message, entry["message"].as_str().unwrap()); + let asdf = entry["asdf"].as_array().unwrap()[0].as_str().unwrap(); + assert_eq!("hello", asdf); + let host = entry["host"].as_array().unwrap()[0].as_str().unwrap(); + assert_eq!("beef.example.com:1234", host); } #[test] From db4c00c8b44efe12b98df6933dcda7f59f2b028a Mon Sep 17 00:00:00 2001 From: Lucio Franco Date: Thu, 16 May 2019 10:20:38 -0400 Subject: [PATCH 3/5] Address comments --- src/sinks/splunk.rs | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/src/sinks/splunk.rs b/src/sinks/splunk.rs index 4b57020e8652f..8f6fb07c8be7d 100644 --- a/src/sinks/splunk.rs +++ b/src/sinks/splunk.rs @@ -147,25 +147,25 @@ pub fn validate_host(host: &String) -> Result<(), String> { } } -fn encode_event(host_field: &Atom, mut event: Event) -> Result, ()> { - let host = event.as_log().get(&host_field).map(|h| h.clone()); - let timestamp = - if let Some(ValueKind::Timestamp(ts)) = event.as_mut_log().remove(&event::TIMESTAMP) { - ts.timestamp() - } else { - chrono::Utc::now().timestamp() - }; +fn encode_event(host_field: &Atom, event: Event) -> Result, ()> { + let mut event = event.into_log(); + + let host = event.get(&host_field).map(|h| h.clone()); + let timestamp = if let Some(ValueKind::Timestamp(ts)) = event.remove(&event::TIMESTAMP) { + ts.timestamp() + } else { + chrono::Utc::now().timestamp() + }; - let mut body = if event.as_log().is_structured() { + let mut body = if event.is_structured() { json!({ - "event": event.as_log().all_fields(), - "fields": event.as_log().explicit_fields(), + "event": event.all_fields(), + "fields": event.explicit_fields(), "time": timestamp, }) } else { json!({ - "event": event.as_log()[&event::MESSAGE].to_string_lossy(), - "fields": event.as_log().explicit_fields(), + "event": event[&event::MESSAGE].to_string_lossy(), "time": timestamp, }) }; From 9cd92bdce5f8c06373cd2d4c89e9ed9399ee086b Mon Sep 17 00:00:00 2001 From: Lucio Franco Date: Fri, 17 May 2019 12:00:45 -0400 Subject: [PATCH 4/5] Assert implicit field --- src/sinks/splunk_hec.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/sinks/splunk_hec.rs b/src/sinks/splunk_hec.rs index b65f0973f3a3f..5c7b23030f58d 100644 --- a/src/sinks/splunk_hec.rs +++ b/src/sinks/splunk_hec.rs @@ -208,6 +208,10 @@ mod tests { let kv = event.get("key".into()).unwrap(); assert_eq!(kv, &"value".to_string()); + assert_eq!( + event[&event::MESSAGE.to_string()], + "hello world".to_string() + ); assert!(event.get(&event::TIMESTAMP.to_string()).is_none()); } From 992cca22a88b4d7751654fd86280e33d591a2821 Mon Sep 17 00:00:00 2001 From: Lucio Franco Date: Mon, 20 May 2019 11:45:20 -0400 Subject: [PATCH 5/5] Add optional encoding type to splunk --- src/sinks/splunk_hec.rs | 30 +++++++++++++++++++++--------- 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/src/sinks/splunk_hec.rs b/src/sinks/splunk_hec.rs index 5c7b23030f58d..603668f4123ca 100644 --- a/src/sinks/splunk_hec.rs +++ b/src/sinks/splunk_hec.rs @@ -27,6 +27,7 @@ pub struct HecSinkConfig { pub compression: Option, #[serde(default = "default_host_field")] pub host_field: Atom, + pub encoding: Option, // Tower Request based configuration pub request_in_flight_limit: Option, @@ -37,6 +38,13 @@ pub struct HecSinkConfig { pub request_retry_backoff_secs: Option, } +#[derive(Deserialize, Serialize, Debug, Clone)] +#[serde(rename_all = "snake_case")] +pub enum Encoding { + Text, + Json, +} + fn default_host_field() -> Atom { event::HOST.clone() } @@ -69,6 +77,7 @@ pub fn hec(config: HecSinkConfig, acker: Acker) -> Result Result Result<(), String> { } } -fn encode_event(host_field: &Atom, event: Event) -> Result, ()> { +fn encode_event( + host_field: &Atom, + event: Event, + encoding: &Option, +) -> Result, ()> { let mut event = event.into_log(); let host = event.get(&host_field).map(|h| h.clone()); @@ -157,17 +170,16 @@ fn encode_event(host_field: &Atom, event: Event) -> Result, ()> { chrono::Utc::now().timestamp() }; - let mut body = if event.is_structured() { - json!({ + let mut body = match (encoding, event.is_structured()) { + (&Some(Encoding::Json), _) | (_, true) => json!({ "event": event.all_fields(), "fields": event.explicit_fields(), "time": timestamp, - }) - } else { - json!({ + }), + (&Some(Encoding::Text), _) | (_, false) => json!({ "event": event[&event::MESSAGE].to_string_lossy(), "time": timestamp, - }) + }), }; if let Some(host) = host { @@ -200,7 +212,7 @@ mod tests { .as_mut_log() .insert_explicit("key".into(), "value".into()); - let bytes = encode_event(&host, event).unwrap(); + let bytes = encode_event(&host, event, &None).unwrap(); let hec_event = serde_json::from_slice::(&bytes[..]).unwrap();