Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(fluent source): fix ack message format #17407

Merged
merged 2 commits into from
May 18, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 16 additions & 10 deletions src/sources/fluent/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::collections::HashMap;
use std::io::{self, Read};
use std::net::SocketAddr;
use std::time::Duration;
Expand All @@ -9,8 +10,8 @@ use codecs::{BytesDeserializerConfig, StreamDecodingError};
use flate2::read::MultiGzDecoder;
use lookup::lookup_v2::parse_value_path;
use lookup::{metadata_path, owned_value_path, path, OwnedValuePath, PathPrefix};
use rmp_serde::{decode, Deserializer};
use serde::Deserialize;
use rmp_serde::{decode, Deserializer, Serializer};
use serde::{Deserialize, Serialize};
use smallvec::{smallvec, SmallVec};
use tokio_util::codec::Decoder;
use vector_config::configurable_component;
Expand Down Expand Up @@ -532,15 +533,18 @@ impl TcpSourceAcker for FluentAcker {
return None;
}

let mut acks = String::new();
let mut buf = Vec::new();
let mut ser = Serializer::new(&mut buf);
let mut ack_map = HashMap::new();

for chunk in self.chunks {
let ack = match ack {
TcpSourceAck::Ack => format!(r#"{{"ack": "{}"}}"#, chunk),
_ => String::from("{}"),
ack_map.clear();
if let TcpSourceAck::Ack = ack {
ack_map.insert("ack", chunk);
};
acks.push_str(&ack);
ack_map.serialize(&mut ser).unwrap();
}
Some(acks.into())
Some(buf.into())
}
}

Expand Down Expand Up @@ -861,7 +865,8 @@ mod tests {
async fn ack_delivered_with_chunk() {
let (result, output) = check_acknowledgements(EventStatus::Delivered, true).await;
assert_eq!(result.unwrap().unwrap(), output.len());
assert!(output.starts_with(b"{\"ack\":"));
let expected: Vec<u8> = vec![0x81, 0xa3, 0x61, 0x63]; // { "ack": ...
assert_eq!(output[..expected.len()], expected);
}

#[tokio::test]
Expand All @@ -875,7 +880,8 @@ mod tests {
async fn ack_failed_with_chunk() {
let (result, output) = check_acknowledgements(EventStatus::Rejected, true).await;
assert_eq!(result.unwrap().unwrap(), output.len());
assert_eq!(output, &b"{}"[..]);
let expected: Vec<u8> = vec![0x80]; // { }
assert_eq!(output, expected);
}

async fn check_acknowledgements(
Expand Down