diff --git a/src/sinks/influxdb/logs.rs b/src/sinks/influxdb/logs.rs index 63b56daa50551..2106629544515 100644 --- a/src/sinks/influxdb/logs.rs +++ b/src/sinks/influxdb/logs.rs @@ -592,23 +592,31 @@ mod tests { event.as_mut_log().insert("value", 100); event.as_mut_log().insert("timestamp", ts()); - let sink = create_sink( + let mut sink = create_sink( "http://localhost:9999", "my-token", ProtocolVersion::V2, "vector", - ["metric_type"].to_vec(), + [].to_vec(), ); + // exclude default metric_type tag so to emit empty tags + sink.transformer + .set_except_fields(Some(vec!["metric_type".into()])) + .unwrap(); let mut encoder = sink.build_encoder(); let bytes = encoder.encode_event(event).unwrap(); - let string = std::str::from_utf8(&bytes).unwrap(); + let line = std::str::from_utf8(&bytes).unwrap(); + assert!( + line.starts_with("vector "), + "measurement (without tags) should ends with space ' '" + ); - let line_protocol = split_line_protocol(string); + let line_protocol = split_line_protocol(line); assert_eq!("vector", line_protocol.0); - assert_eq!("metric_type=logs", line_protocol.1); + assert_eq!("", line_protocol.1, "tags should be empty"); assert_fields( - line_protocol.2.to_string(), + line_protocol.2, ["value=100i", "message=\"hello\""].to_vec(), ); diff --git a/src/sinks/influxdb/mod.rs b/src/sinks/influxdb/mod.rs index c3e40cc66e7e9..0bcfd8cb2b6ed 100644 --- a/src/sinks/influxdb/mod.rs +++ b/src/sinks/influxdb/mod.rs @@ -244,11 +244,13 @@ pub(in crate::sinks) fn influx_line_protocol( } encode_string(measurement, line_protocol); - line_protocol.put_u8(b','); - // Tags + // Tags are optional let unwrapped_tags = tags.unwrap_or_default(); - encode_tags(unwrapped_tags, line_protocol); + if !unwrapped_tags.is_empty() { + line_protocol.put_u8(b','); + encode_tags(unwrapped_tags, line_protocol); + } line_protocol.put_u8(b' '); // Fields @@ -442,12 +444,12 @@ pub mod test_util { // 1542182950000000011 // pub(crate) fn split_line_protocol(line_protocol: &str) -> (&str, &str, String, &str) { - let mut split = line_protocol.splitn(2, ',').collect::>(); - let measurement = split[0]; - - split = split[1].splitn(3, ' ').collect::>(); + let (name, fields) = line_protocol.split_once(' ').unwrap_or_default(); + // tags and timestamp may not be present + let (measurement, tags) = name.split_once(',').unwrap_or((name, "")); + let (fields, ts) = fields.split_once(' ').unwrap_or((fields, "")); - (measurement, split[0], split[1].to_string(), split[2]) + (measurement, tags, fields.to_string(), ts) } fn client() -> reqwest::Client {