diff --git a/src/sinks/influxdb/logs.rs b/src/sinks/influxdb/logs.rs index 3145c432e50b9..2106629544515 100644 --- a/src/sinks/influxdb/logs.rs +++ b/src/sinks/influxdb/logs.rs @@ -599,6 +599,7 @@ mod tests { "vector", [].to_vec(), ); + // exclude default metric_type tag so to emit empty tags sink.transformer .set_except_fields(Some(vec!["metric_type".into()])) .unwrap(); @@ -606,11 +607,14 @@ mod tests { let bytes = encoder.encode_event(event).unwrap(); let line = std::str::from_utf8(&bytes).unwrap(); - assert!(line.starts_with("vector ")); + assert!( + line.starts_with("vector "), + "measurement (without tags) should ends with space ' '" + ); let line_protocol = split_line_protocol(line); assert_eq!("vector", line_protocol.0); - assert_eq!("", line_protocol.1); + assert_eq!("", line_protocol.1, "tags should be empty"); assert_fields( line_protocol.2, ["value=100i", "message=\"hello\""].to_vec(), diff --git a/src/sinks/influxdb/mod.rs b/src/sinks/influxdb/mod.rs index ec205653e9d10..0bcfd8cb2b6ed 100644 --- a/src/sinks/influxdb/mod.rs +++ b/src/sinks/influxdb/mod.rs @@ -245,15 +245,13 @@ pub(in crate::sinks) fn influx_line_protocol( encode_string(measurement, line_protocol); - // Tags + // Tags are optional let unwrapped_tags = tags.unwrap_or_default(); - if unwrapped_tags.is_empty() { - line_protocol.put_u8(b' '); - } else { + if !unwrapped_tags.is_empty() { line_protocol.put_u8(b','); encode_tags(unwrapped_tags, line_protocol); - line_protocol.put_u8(b' '); } + line_protocol.put_u8(b' '); // Fields encode_fields(protocol_version, unwrapped_fields, line_protocol); @@ -446,10 +444,10 @@ pub mod test_util { // 1542182950000000011 // pub(crate) fn split_line_protocol(line_protocol: &str) -> (&str, &str, String, &str) { - // tags and ts may not be present - let parts: Vec<&str> = line_protocol.splitn(2, ' ').collect(); - let (measurement, tags) = parts[0].split_once(',').unwrap_or((parts[0], "")); - let (fields, ts) = parts[1].split_once(' ').unwrap_or((parts[1], "")); + let (name, fields) = line_protocol.split_once(' ').unwrap_or_default(); + // tags and timestamp may not be present + let (measurement, tags) = name.split_once(',').unwrap_or((name, "")); + let (fields, ts) = fields.split_once(' ').unwrap_or((fields, "")); (measurement, tags, fields.to_string(), ts) }