Skip to content

Commit

Permalink
fix(influxdb_logs): encode influx line when no tags present
Browse files Browse the repository at this point in the history
  • Loading branch information
juvenn committed Apr 1, 2023
1 parent 00c0316 commit 0d804dd
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 15 deletions.
22 changes: 15 additions & 7 deletions src/sinks/influxdb/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -592,24 +592,32 @@ mod tests {
event.as_mut_log().insert("value", 100);
event.as_mut_log().insert("timestamp", ts());

let sink = create_sink(
let mut sink = create_sink(
"http://localhost:9999",
"my-token",
ProtocolVersion::V2,
"vector",
["metric_type"].to_vec(),
[].to_vec(),
);
sink.transformer
.set_except_fields(Some(vec!["metric_type".into()]))
.unwrap();
let mut encoder = sink.build_encoder();

let bytes = encoder.encode_event(event).unwrap();
let string = std::str::from_utf8(&bytes).unwrap();
let line = std::str::from_utf8(&bytes).unwrap();
assert!(line.starts_with("vector "));

let line_protocol = split_line_protocol(string);
let line_protocol = split_line_protocol(line);
assert_eq!("vector", line_protocol.0);
assert_eq!("metric_type=logs", line_protocol.1);
assert_eq!("", line_protocol.1);
assert_fields(
line_protocol.2.to_string(),
["value=100i", "message=\"hello\""].to_vec(),
line_protocol.2,
[
"value=100i",
"message=\"hello\""
]
.to_vec(),
);

assert_eq!("1542182950000000011\n", line_protocol.3);
Expand Down
20 changes: 12 additions & 8 deletions src/sinks/influxdb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,12 +244,16 @@ pub(in crate::sinks) fn influx_line_protocol(
}

encode_string(measurement, line_protocol);
line_protocol.put_u8(b',');

// Tags
let unwrapped_tags = tags.unwrap_or_default();
encode_tags(unwrapped_tags, line_protocol);
line_protocol.put_u8(b' ');
if unwrapped_tags.is_empty() {
line_protocol.put_u8(b' ');
} else {
line_protocol.put_u8(b',');
encode_tags(unwrapped_tags, line_protocol);
line_protocol.put_u8(b' ');
}

// Fields
encode_fields(protocol_version, unwrapped_fields, line_protocol);
Expand Down Expand Up @@ -442,12 +446,12 @@ pub mod test_util {
// 1542182950000000011
//
pub(crate) fn split_line_protocol(line_protocol: &str) -> (&str, &str, String, &str) {
let mut split = line_protocol.splitn(2, ',').collect::<Vec<&str>>();
let measurement = split[0];

split = split[1].splitn(3, ' ').collect::<Vec<&str>>();
// tags and ts may not be present
let parts: Vec<&str> = line_protocol.splitn(2, " ").collect();
let (measurement, tags) = parts[0].split_once(",").unwrap_or((parts[0], ""));
let (fields, ts) = parts[1].split_once(" ").unwrap_or((parts[1], ""));

(measurement, split[0], split[1].to_string(), split[2])
(measurement, tags, fields.to_string(), ts)
}

fn client() -> reqwest::Client {
Expand Down

0 comments on commit 0d804dd

Please sign in to comment.