Skip to content

Commit

Permalink
enhancement(http sink): Adhere to instrumentation spec (#14361)
Browse files Browse the repository at this point in the history
  • Loading branch information
neuronull authored Sep 12, 2022
1 parent f872243 commit 0056ffe
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 100 deletions.
216 changes: 116 additions & 100 deletions src/sinks/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,11 @@ mod tests {
http::HttpSink,
test::{build_test_server, build_test_server_generic, build_test_server_status},
},
test_util::{components, components::HTTP_SINK_TAGS, next_addr, random_lines_with_stream},
test_util::{
components,
components::{COMPONENT_ERROR_TAGS, HTTP_SINK_TAGS},
next_addr, random_lines_with_stream,
},
};

#[test]
Expand Down Expand Up @@ -577,112 +581,122 @@ mod tests {

#[tokio::test]
async fn retries_on_no_connection() {
let num_lines = 10;
components::assert_sink_compliance(&HTTP_SINK_TAGS, async {
let num_lines = 10;

let (in_addr, sink) = build_sink("").await;
let (in_addr, sink) = build_sink("").await;

let (batch, mut receiver) = BatchNotifier::new_with_receiver();
let (input_lines, events) = random_lines_with_stream(100, num_lines, Some(batch));
let pump = tokio::spawn(sink.run(events));
let (batch, mut receiver) = BatchNotifier::new_with_receiver();
let (input_lines, events) = random_lines_with_stream(100, num_lines, Some(batch));
let pump = tokio::spawn(sink.run(events));

// This ordering starts the sender before the server has built
// its accepting socket. The delay below ensures that the sink
// attempts to connect at least once before creating the
// listening socket.
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
let (rx, trigger, server) = build_test_server(in_addr);
tokio::spawn(server);
// This ordering starts the sender before the server has built
// its accepting socket. The delay below ensures that the sink
// attempts to connect at least once before creating the
// listening socket.
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
let (rx, trigger, server) = build_test_server(in_addr);
tokio::spawn(server);

pump.await.unwrap().unwrap();
drop(trigger);
pump.await.unwrap().unwrap();
drop(trigger);

assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));
assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));

let output_lines = get_received(rx, |parts| {
assert_eq!(Method::POST, parts.method);
assert_eq!("/frames", parts.uri.path());
let output_lines = get_received(rx, |parts| {
assert_eq!(Method::POST, parts.method);
assert_eq!("/frames", parts.uri.path());
})
.await;

assert_eq!(num_lines, output_lines.len());
assert_eq!(input_lines, output_lines);
})
.await;

assert_eq!(num_lines, output_lines.len());
assert_eq!(input_lines, output_lines);
}

#[tokio::test]
async fn retries_on_temporary_error() {
const NUM_LINES: usize = 1000;
const NUM_FAILURES: usize = 2;

let (in_addr, sink) = build_sink("").await;

let counter = Arc::new(atomic::AtomicUsize::new(0));
let in_counter = Arc::clone(&counter);
let (rx, trigger, server) = build_test_server_generic(in_addr, move || {
let count = in_counter.fetch_add(1, atomic::Ordering::Relaxed);
if count < NUM_FAILURES {
// Send a temporary error for the first two responses
Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::empty())
.unwrap_or_else(|_| unreachable!())
} else {
Response::new(Body::empty())
}
});
components::assert_sink_compliance(&HTTP_SINK_TAGS, async {
const NUM_LINES: usize = 1000;
const NUM_FAILURES: usize = 2;

let (in_addr, sink) = build_sink("").await;

let counter = Arc::new(atomic::AtomicUsize::new(0));
let in_counter = Arc::clone(&counter);
let (rx, trigger, server) = build_test_server_generic(in_addr, move || {
let count = in_counter.fetch_add(1, atomic::Ordering::Relaxed);
if count < NUM_FAILURES {
// Send a temporary error for the first two responses
Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::empty())
.unwrap_or_else(|_| unreachable!())
} else {
Response::new(Body::empty())
}
});

let (batch, mut receiver) = BatchNotifier::new_with_receiver();
let (input_lines, events) = random_lines_with_stream(100, NUM_LINES, Some(batch));
let pump = sink.run(events);
let (batch, mut receiver) = BatchNotifier::new_with_receiver();
let (input_lines, events) = random_lines_with_stream(100, NUM_LINES, Some(batch));
let pump = sink.run(events);

tokio::spawn(server);
tokio::spawn(server);

pump.await.unwrap();
drop(trigger);
pump.await.unwrap();
drop(trigger);

assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));
assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));

let output_lines = get_received(rx, |parts| {
assert_eq!(Method::POST, parts.method);
assert_eq!("/frames", parts.uri.path());
})
.await;

let output_lines = get_received(rx, |parts| {
assert_eq!(Method::POST, parts.method);
assert_eq!("/frames", parts.uri.path());
let tries = counter.load(atomic::Ordering::Relaxed);
assert!(tries > NUM_FAILURES);
assert_eq!(NUM_LINES, output_lines.len());
assert_eq!(input_lines, output_lines);
})
.await;

let tries = counter.load(atomic::Ordering::Relaxed);
assert!(tries > NUM_FAILURES);
assert_eq!(NUM_LINES, output_lines.len());
assert_eq!(input_lines, output_lines);
}

#[tokio::test]
async fn fails_on_permanent_error() {
let num_lines = 1000;
components::assert_sink_error(&COMPONENT_ERROR_TAGS, async {
let num_lines = 1000;

let (in_addr, sink) = build_sink("").await;
let (in_addr, sink) = build_sink("").await;

let (rx, trigger, server) = build_test_server_status(in_addr, StatusCode::FORBIDDEN);
let (rx, trigger, server) = build_test_server_status(in_addr, StatusCode::FORBIDDEN);

let (batch, mut receiver) = BatchNotifier::new_with_receiver();
let (_input_lines, events) = random_lines_with_stream(100, num_lines, Some(batch));
let pump = sink.run(events);
let (batch, mut receiver) = BatchNotifier::new_with_receiver();
let (_input_lines, events) = random_lines_with_stream(100, num_lines, Some(batch));
let pump = sink.run(events);

tokio::spawn(server);
tokio::spawn(server);

pump.await.unwrap();
drop(trigger);
pump.await.unwrap();
drop(trigger);

assert_eq!(receiver.try_recv(), Ok(BatchStatus::Rejected));
assert_eq!(receiver.try_recv(), Ok(BatchStatus::Rejected));

let output_lines = get_received(rx, |_| unreachable!("There should be no lines")).await;
assert!(output_lines.is_empty());
let output_lines = get_received(rx, |_| unreachable!("There should be no lines")).await;
assert!(output_lines.is_empty());
})
.await;
}

#[tokio::test]
async fn json_compression() {
let num_lines = 1000;
components::assert_sink_compliance(&HTTP_SINK_TAGS, async {
let num_lines = 1000;

let in_addr = next_addr();
let in_addr = next_addr();

let config = r#"
let config = r#"
uri = "http://$IN_ADDR/frames"
compression = "gzip"
encoding.codec = "json"
Expand All @@ -692,44 +706,46 @@ mod tests {
user = "waldo"
password = "hunter2"
"#
.replace("$IN_ADDR", &in_addr.to_string());
let config: HttpSinkConfig = toml::from_str(&config).unwrap();
.replace("$IN_ADDR", &in_addr.to_string());
let config: HttpSinkConfig = toml::from_str(&config).unwrap();

let cx = SinkContext::new_test();
let cx = SinkContext::new_test();

let (sink, _) = config.build(cx).await.unwrap();
let (rx, trigger, server) = build_test_server(in_addr);
let (sink, _) = config.build(cx).await.unwrap();
let (rx, trigger, server) = build_test_server(in_addr);

let (batch, mut receiver) = BatchNotifier::new_with_receiver();
let (input_lines, events) = random_lines_with_stream(100, num_lines, Some(batch));
let pump = sink.run(events);
let (batch, mut receiver) = BatchNotifier::new_with_receiver();
let (input_lines, events) = random_lines_with_stream(100, num_lines, Some(batch));
let pump = sink.run(events);

tokio::spawn(server);
tokio::spawn(server);

pump.await.unwrap();
drop(trigger);
pump.await.unwrap();
drop(trigger);

assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));
assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));

let output_lines = rx
.flat_map(|(parts, body)| {
assert_eq!(Method::POST, parts.method);
assert_eq!("/frames", parts.uri.path());
assert_eq!(
Some(Authorization::basic("waldo", "hunter2")),
parts.headers.typed_get()
);
let output_lines = rx
.flat_map(|(parts, body)| {
assert_eq!(Method::POST, parts.method);
assert_eq!("/frames", parts.uri.path());
assert_eq!(
Some(Authorization::basic("waldo", "hunter2")),
parts.headers.typed_get()
);

let lines: Vec<serde_json::Value> =
serde_json::from_reader(MultiGzDecoder::new(body.reader())).unwrap();
stream::iter(lines)
})
.map(|line| line.get("message").unwrap().as_str().unwrap().to_owned())
.collect::<Vec<_>>()
.await;
let lines: Vec<serde_json::Value> =
serde_json::from_reader(MultiGzDecoder::new(body.reader())).unwrap();
stream::iter(lines)
})
.map(|line| line.get("message").unwrap().as_str().unwrap().to_owned())
.collect::<Vec<_>>()
.await;

assert_eq!(num_lines, output_lines.len());
assert_eq!(input_lines, output_lines);
assert_eq!(num_lines, output_lines.len());
assert_eq!(input_lines, output_lines);
})
.await;
}

async fn get_received(
Expand Down
1 change: 1 addition & 0 deletions src/sinks/util/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use crate::{
};

pub trait HttpEventEncoder<Output> {
// The encoder handles internal event emission for Error and EventsDropped.
fn encode_event(&mut self, event: Event) -> Option<Output>;
}

Expand Down

0 comments on commit 0056ffe

Please sign in to comment.