Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhancement(http sink): Adhere to instrumentation spec #14361

Merged
merged 7 commits into from
Sep 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
216 changes: 116 additions & 100 deletions src/sinks/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,11 @@ mod tests {
http::HttpSink,
test::{build_test_server, build_test_server_generic, build_test_server_status},
},
test_util::{components, components::HTTP_SINK_TAGS, next_addr, random_lines_with_stream},
test_util::{
components,
components::{COMPONENT_ERROR_TAGS, HTTP_SINK_TAGS},
next_addr, random_lines_with_stream,
},
};

#[test]
Expand Down Expand Up @@ -577,112 +581,122 @@ mod tests {

#[tokio::test]
async fn retries_on_no_connection() {
let num_lines = 10;
components::assert_sink_compliance(&HTTP_SINK_TAGS, async {
let num_lines = 10;

let (in_addr, sink) = build_sink("").await;
let (in_addr, sink) = build_sink("").await;

let (batch, mut receiver) = BatchNotifier::new_with_receiver();
let (input_lines, events) = random_lines_with_stream(100, num_lines, Some(batch));
let pump = tokio::spawn(sink.run(events));
let (batch, mut receiver) = BatchNotifier::new_with_receiver();
let (input_lines, events) = random_lines_with_stream(100, num_lines, Some(batch));
let pump = tokio::spawn(sink.run(events));

// This ordering starts the sender before the server has built
// its accepting socket. The delay below ensures that the sink
// attempts to connect at least once before creating the
// listening socket.
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
let (rx, trigger, server) = build_test_server(in_addr);
tokio::spawn(server);
// This ordering starts the sender before the server has built
// its accepting socket. The delay below ensures that the sink
// attempts to connect at least once before creating the
// listening socket.
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
let (rx, trigger, server) = build_test_server(in_addr);
tokio::spawn(server);

pump.await.unwrap().unwrap();
drop(trigger);
pump.await.unwrap().unwrap();
drop(trigger);

assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));
assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));

let output_lines = get_received(rx, |parts| {
assert_eq!(Method::POST, parts.method);
assert_eq!("/frames", parts.uri.path());
let output_lines = get_received(rx, |parts| {
assert_eq!(Method::POST, parts.method);
assert_eq!("/frames", parts.uri.path());
})
.await;

assert_eq!(num_lines, output_lines.len());
assert_eq!(input_lines, output_lines);
})
.await;

assert_eq!(num_lines, output_lines.len());
assert_eq!(input_lines, output_lines);
}

#[tokio::test]
async fn retries_on_temporary_error() {
const NUM_LINES: usize = 1000;
const NUM_FAILURES: usize = 2;

let (in_addr, sink) = build_sink("").await;

let counter = Arc::new(atomic::AtomicUsize::new(0));
let in_counter = Arc::clone(&counter);
let (rx, trigger, server) = build_test_server_generic(in_addr, move || {
let count = in_counter.fetch_add(1, atomic::Ordering::Relaxed);
if count < NUM_FAILURES {
// Send a temporary error for the first two responses
Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::empty())
.unwrap_or_else(|_| unreachable!())
} else {
Response::new(Body::empty())
}
});
components::assert_sink_compliance(&HTTP_SINK_TAGS, async {
const NUM_LINES: usize = 1000;
const NUM_FAILURES: usize = 2;

let (in_addr, sink) = build_sink("").await;

let counter = Arc::new(atomic::AtomicUsize::new(0));
let in_counter = Arc::clone(&counter);
let (rx, trigger, server) = build_test_server_generic(in_addr, move || {
let count = in_counter.fetch_add(1, atomic::Ordering::Relaxed);
if count < NUM_FAILURES {
// Send a temporary error for the first two responses
Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::empty())
.unwrap_or_else(|_| unreachable!())
} else {
Response::new(Body::empty())
}
});

let (batch, mut receiver) = BatchNotifier::new_with_receiver();
let (input_lines, events) = random_lines_with_stream(100, NUM_LINES, Some(batch));
let pump = sink.run(events);
let (batch, mut receiver) = BatchNotifier::new_with_receiver();
let (input_lines, events) = random_lines_with_stream(100, NUM_LINES, Some(batch));
let pump = sink.run(events);

tokio::spawn(server);
tokio::spawn(server);

pump.await.unwrap();
drop(trigger);
pump.await.unwrap();
drop(trigger);

assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));
assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));

let output_lines = get_received(rx, |parts| {
assert_eq!(Method::POST, parts.method);
assert_eq!("/frames", parts.uri.path());
})
.await;

let output_lines = get_received(rx, |parts| {
assert_eq!(Method::POST, parts.method);
assert_eq!("/frames", parts.uri.path());
let tries = counter.load(atomic::Ordering::Relaxed);
assert!(tries > NUM_FAILURES);
assert_eq!(NUM_LINES, output_lines.len());
assert_eq!(input_lines, output_lines);
})
.await;

let tries = counter.load(atomic::Ordering::Relaxed);
assert!(tries > NUM_FAILURES);
assert_eq!(NUM_LINES, output_lines.len());
assert_eq!(input_lines, output_lines);
}

#[tokio::test]
async fn fails_on_permanent_error() {
let num_lines = 1000;
components::assert_sink_error(&COMPONENT_ERROR_TAGS, async {
let num_lines = 1000;

let (in_addr, sink) = build_sink("").await;
let (in_addr, sink) = build_sink("").await;

let (rx, trigger, server) = build_test_server_status(in_addr, StatusCode::FORBIDDEN);
let (rx, trigger, server) = build_test_server_status(in_addr, StatusCode::FORBIDDEN);

let (batch, mut receiver) = BatchNotifier::new_with_receiver();
let (_input_lines, events) = random_lines_with_stream(100, num_lines, Some(batch));
let pump = sink.run(events);
let (batch, mut receiver) = BatchNotifier::new_with_receiver();
let (_input_lines, events) = random_lines_with_stream(100, num_lines, Some(batch));
let pump = sink.run(events);

tokio::spawn(server);
tokio::spawn(server);

pump.await.unwrap();
drop(trigger);
pump.await.unwrap();
drop(trigger);

assert_eq!(receiver.try_recv(), Ok(BatchStatus::Rejected));
assert_eq!(receiver.try_recv(), Ok(BatchStatus::Rejected));

let output_lines = get_received(rx, |_| unreachable!("There should be no lines")).await;
assert!(output_lines.is_empty());
let output_lines = get_received(rx, |_| unreachable!("There should be no lines")).await;
assert!(output_lines.is_empty());
})
.await;
}

#[tokio::test]
async fn json_compression() {
let num_lines = 1000;
components::assert_sink_compliance(&HTTP_SINK_TAGS, async {
let num_lines = 1000;

let in_addr = next_addr();
let in_addr = next_addr();

let config = r#"
let config = r#"
uri = "http://$IN_ADDR/frames"
compression = "gzip"
encoding.codec = "json"
Expand All @@ -692,44 +706,46 @@ mod tests {
user = "waldo"
password = "hunter2"
"#
.replace("$IN_ADDR", &in_addr.to_string());
let config: HttpSinkConfig = toml::from_str(&config).unwrap();
.replace("$IN_ADDR", &in_addr.to_string());
let config: HttpSinkConfig = toml::from_str(&config).unwrap();

let cx = SinkContext::new_test();
let cx = SinkContext::new_test();

let (sink, _) = config.build(cx).await.unwrap();
let (rx, trigger, server) = build_test_server(in_addr);
let (sink, _) = config.build(cx).await.unwrap();
let (rx, trigger, server) = build_test_server(in_addr);

let (batch, mut receiver) = BatchNotifier::new_with_receiver();
let (input_lines, events) = random_lines_with_stream(100, num_lines, Some(batch));
let pump = sink.run(events);
let (batch, mut receiver) = BatchNotifier::new_with_receiver();
let (input_lines, events) = random_lines_with_stream(100, num_lines, Some(batch));
let pump = sink.run(events);

tokio::spawn(server);
tokio::spawn(server);

pump.await.unwrap();
drop(trigger);
pump.await.unwrap();
drop(trigger);

assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));
assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));

let output_lines = rx
.flat_map(|(parts, body)| {
assert_eq!(Method::POST, parts.method);
assert_eq!("/frames", parts.uri.path());
assert_eq!(
Some(Authorization::basic("waldo", "hunter2")),
parts.headers.typed_get()
);
let output_lines = rx
.flat_map(|(parts, body)| {
assert_eq!(Method::POST, parts.method);
assert_eq!("/frames", parts.uri.path());
assert_eq!(
Some(Authorization::basic("waldo", "hunter2")),
parts.headers.typed_get()
);

let lines: Vec<serde_json::Value> =
serde_json::from_reader(MultiGzDecoder::new(body.reader())).unwrap();
stream::iter(lines)
})
.map(|line| line.get("message").unwrap().as_str().unwrap().to_owned())
.collect::<Vec<_>>()
.await;
let lines: Vec<serde_json::Value> =
serde_json::from_reader(MultiGzDecoder::new(body.reader())).unwrap();
stream::iter(lines)
})
.map(|line| line.get("message").unwrap().as_str().unwrap().to_owned())
.collect::<Vec<_>>()
.await;

assert_eq!(num_lines, output_lines.len());
assert_eq!(input_lines, output_lines);
assert_eq!(num_lines, output_lines.len());
assert_eq!(input_lines, output_lines);
})
.await;
}

async fn get_received(
Expand Down
1 change: 1 addition & 0 deletions src/sinks/util/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use crate::{
};

pub trait HttpEventEncoder<Output> {
// The encoder handles internal event emission for Error and EventsDropped.
fn encode_event(&mut self, event: Event) -> Option<Output>;
}

Expand Down