Skip to content

Commit

Permalink
chore(influxdb_logs): review suggestions
Browse files Browse the repository at this point in the history
  • Loading branch information
juvenn committed Apr 5, 2023
1 parent 8e6118b commit 3fab407
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 11 deletions.
8 changes: 6 additions & 2 deletions src/sinks/influxdb/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -599,18 +599,22 @@ mod tests {
"vector",
[].to_vec(),
);
// exclude default metric_type tag so to emit empty tags
sink.transformer
.set_except_fields(Some(vec!["metric_type".into()]))
.unwrap();
let mut encoder = sink.build_encoder();

let bytes = encoder.encode_event(event).unwrap();
let line = std::str::from_utf8(&bytes).unwrap();
assert!(line.starts_with("vector "));
assert!(
line.starts_with("vector "),
"measurement (without tags) should ends with space ' '"
);

let line_protocol = split_line_protocol(line);
assert_eq!("vector", line_protocol.0);
assert_eq!("", line_protocol.1);
assert_eq!("", line_protocol.1, "tags should be empty");
assert_fields(
line_protocol.2,
["value=100i", "message=\"hello\""].to_vec(),
Expand Down
16 changes: 7 additions & 9 deletions src/sinks/influxdb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,15 +245,13 @@ pub(in crate::sinks) fn influx_line_protocol(

encode_string(measurement, line_protocol);

// Tags
// Tags are optional
let unwrapped_tags = tags.unwrap_or_default();
if unwrapped_tags.is_empty() {
line_protocol.put_u8(b' ');
} else {
if !unwrapped_tags.is_empty() {
line_protocol.put_u8(b',');
encode_tags(unwrapped_tags, line_protocol);
line_protocol.put_u8(b' ');
}
line_protocol.put_u8(b' ');

// Fields
encode_fields(protocol_version, unwrapped_fields, line_protocol);
Expand Down Expand Up @@ -446,10 +444,10 @@ pub mod test_util {
// 1542182950000000011
//
pub(crate) fn split_line_protocol(line_protocol: &str) -> (&str, &str, String, &str) {
// tags and ts may not be present
let parts: Vec<&str> = line_protocol.splitn(2, ' ').collect();
let (measurement, tags) = parts[0].split_once(',').unwrap_or((parts[0], ""));
let (fields, ts) = parts[1].split_once(' ').unwrap_or((parts[1], ""));
let (name, fields) = line_protocol.split_once(' ').unwrap_or_default();
// tags and timestamp may not be present
let (measurement, tags) = name.split_once(',').unwrap_or((name, ""));
let (fields, ts) = fields.split_once(' ').unwrap_or((fields, ""));

(measurement, tags, fields.to_string(), ts)
}
Expand Down

0 comments on commit 3fab407

Please sign in to comment.