diff --git a/src/sinks/influxdb/logs.rs b/src/sinks/influxdb/logs.rs index 63b56daa50551..7bb3bd350a7a1 100644 --- a/src/sinks/influxdb/logs.rs +++ b/src/sinks/influxdb/logs.rs @@ -592,24 +592,32 @@ mod tests { event.as_mut_log().insert("value", 100); event.as_mut_log().insert("timestamp", ts()); - let sink = create_sink( + let mut sink = create_sink( "http://localhost:9999", "my-token", ProtocolVersion::V2, "vector", - ["metric_type"].to_vec(), + [].to_vec(), ); + sink.transformer + .set_except_fields(Some(vec!["metric_type".into()])) + .unwrap(); let mut encoder = sink.build_encoder(); let bytes = encoder.encode_event(event).unwrap(); - let string = std::str::from_utf8(&bytes).unwrap(); + let line = std::str::from_utf8(&bytes).unwrap(); + assert!(line.starts_with("vector ")); - let line_protocol = split_line_protocol(string); + let line_protocol = split_line_protocol(line); assert_eq!("vector", line_protocol.0); - assert_eq!("metric_type=logs", line_protocol.1); + assert_eq!("", line_protocol.1); assert_fields( - line_protocol.2.to_string(), - ["value=100i", "message=\"hello\""].to_vec(), + line_protocol.2, + [ + "value=100i", + "message=\"hello\"" + ] + .to_vec(), ); assert_eq!("1542182950000000011\n", line_protocol.3); diff --git a/src/sinks/influxdb/mod.rs b/src/sinks/influxdb/mod.rs index c3e40cc66e7e9..6d00319dd8414 100644 --- a/src/sinks/influxdb/mod.rs +++ b/src/sinks/influxdb/mod.rs @@ -244,12 +244,16 @@ pub(in crate::sinks) fn influx_line_protocol( } encode_string(measurement, line_protocol); - line_protocol.put_u8(b','); // Tags let unwrapped_tags = tags.unwrap_or_default(); - encode_tags(unwrapped_tags, line_protocol); - line_protocol.put_u8(b' '); + if unwrapped_tags.is_empty() { + line_protocol.put_u8(b' '); + } else { + line_protocol.put_u8(b','); + encode_tags(unwrapped_tags, line_protocol); + line_protocol.put_u8(b' '); + } // Fields encode_fields(protocol_version, unwrapped_fields, line_protocol); @@ -442,12 +446,12 @@ pub mod test_util { // 1542182950000000011 // pub(crate) fn split_line_protocol(line_protocol: &str) -> (&str, &str, String, &str) { - let mut split = line_protocol.splitn(2, ',').collect::>(); - let measurement = split[0]; - - split = split[1].splitn(3, ' ').collect::>(); + // tags and ts may not be present + let parts: Vec<&str> = line_protocol.splitn(2, " ").collect(); + let (measurement, tags) = parts[0].split_once(",").unwrap_or((parts[0], "")); + let (fields, ts) = parts[1].split_once(" ").unwrap_or((parts[1], "")); - (measurement, split[0], split[1].to_string(), split[2]) + (measurement, tags, fields.to_string(), ts) } fn client() -> reqwest::Client {