Skip to content

Commit

Permalink
feat(http/retry): unit test suite for PeekTrailersBody<B>
Browse files Browse the repository at this point in the history
`PeekTrailersBody<B>` contains some subtle edge cases related to the
number of DATA frames yielded by the inner body, and how persistent it
will be about polling for TRAILERS frames.

for example, if it yields a DATA frame, it will not await trailers being
available, but it *will* do so if the inner body does not yield a DATA
frame. if a DATA frame is yielded, it will check for a TRAILERS frame,
but it must be immmediately available.

this is all subtle, and particularly subject to change with the upgrade
to http-body 1.0's frame-oriented `Body` interface.

so, this commit introduces a test suite for `PeekTrailersBody<B>`. it
includes assertions to confirm when the peek middleware can and cannot
observe the trailers.

some `TODO(kate)` comments are left where issues exist.

Signed-off-by: katelyn martin <[email protected]>
  • Loading branch information
cratelyn committed Jan 22, 2025
1 parent 05e71c9 commit 399ab1d
Showing 1 changed file with 182 additions and 0 deletions.
182 changes: 182 additions & 0 deletions linkerd/http/retry/src/peek_trailers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,3 +212,185 @@ where
hint
}
}

#[cfg(test)]
mod tests {
use super::PeekTrailersBody;
use bytes::Bytes;
use http::{HeaderMap, HeaderValue};
use http_body::Body;
use linkerd_error::Error;
use std::{
collections::VecDeque,
ops::Not,
pin::Pin,
task::{Context, Poll},
};

/// A "mock" body.
///
/// This type contains polling results for [`Body`].
#[derive(Default)]
struct MockBody {
data_polls: VecDeque<Poll<Option<Result<Bytes, Error>>>>,
trailer_polls: VecDeque<Poll<Result<Option<http::HeaderMap>, Error>>>,
}

fn data() -> Option<Result<Bytes, Error>> {
let bytes = Bytes::from_static(b"hello");
Some(Ok(bytes))
}

fn trailers() -> Result<Option<http::HeaderMap>, Error> {
let mut trls = HeaderMap::with_capacity(1);
let value = HeaderValue::from_static("shiny");
trls.insert("trailer", value);
Ok(Some(trls))
}

#[tokio::test]
async fn cannot_peek_empty() {
let (_guard, _handle) = linkerd_tracing::test::trace_init();
let empty = MockBody::default();
let peek = PeekTrailersBody::read_body(empty).await;
assert!(peek.peek_trailers().is_none());
// TODO(kate): this will not return `true`.
// assert!(peek.is_end_stream());
}

#[tokio::test]
async fn peeks_only_trailers() {
let (_guard, _handle) = linkerd_tracing::test::trace_init();
let only_trailers = MockBody::default().then_yield_trailer(Poll::Ready(trailers()));
let peek = PeekTrailersBody::read_body(only_trailers).await;
assert!(peek.peek_trailers().is_some());
assert!(peek.is_end_stream().not());
}

#[tokio::test]
async fn peeks_one_frame_with_immediate_trailers() {
let (_guard, _handle) = linkerd_tracing::test::trace_init();
let body = MockBody::default()
.then_yield_data(Poll::Ready(data()))
.then_yield_trailer(Poll::Ready(trailers()));
let peek = PeekTrailersBody::read_body(body).await;
assert!(peek.peek_trailers().is_some());
assert!(peek.is_end_stream().not());
}

#[tokio::test]
async fn cannot_peek_one_frame_with_eventual_trailers() {
let (_guard, _handle) = linkerd_tracing::test::trace_init();
let body = MockBody::default()
.then_yield_data(Poll::Ready(data()))
.then_yield_trailer(Poll::Pending)
.then_yield_trailer(Poll::Ready(trailers()));
let peek = PeekTrailersBody::read_body(body).await;
assert!(peek.peek_trailers().is_none());
assert!(peek.is_end_stream().not());
}

#[tokio::test]
async fn peeks_one_eventual_frame_with_immediate_trailers() {
let (_guard, _handle) = linkerd_tracing::test::trace_init();
let body = MockBody::default()
.then_yield_data(Poll::Pending)
.then_yield_data(Poll::Ready(data()))
.then_yield_trailer(Poll::Ready(trailers()));
let peek = PeekTrailersBody::read_body(body).await;
assert!(peek.peek_trailers().is_some());
assert!(peek.is_end_stream().not());
}

#[tokio::test]
async fn cannot_peek_two_frames_with_immediate_trailers() {
let (_guard, _handle) = linkerd_tracing::test::trace_init();
let body = MockBody::default()
.then_yield_data(Poll::Ready(data()))
.then_yield_data(Poll::Ready(data()))
.then_yield_trailer(Poll::Ready(trailers()));
let peek = PeekTrailersBody::read_body(body).await;
assert!(peek.peek_trailers().is_none());
assert!(peek.is_end_stream().not());
}

// === impl MockBody ===

impl MockBody {
/// Appends a poll outcome for [`Body::poll_data()`].
fn then_yield_data(mut self, poll: Poll<Option<Result<Bytes, Error>>>) -> Self {
self.data_polls.push_back(poll);
self
}

/// Appends a poll outcome for [`Body::poll_trailers()`].
fn then_yield_trailer(
mut self,
poll: Poll<Result<Option<http::HeaderMap>, Error>>,
) -> Self {
self.trailer_polls.push_back(poll);
self
}

/// Schedules a task to be awoken.
fn schedule(cx: &Context<'_>) {
let waker = cx.waker().clone();
tokio::spawn(async move {
waker.wake();
});
}
}

impl Body for MockBody {
type Data = Bytes;
type Error = Error;

fn poll_data(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
let poll = self
.get_mut()
.data_polls
.pop_front()
.unwrap_or(Poll::Ready(None));
// If we return `Poll::Pending`, we must schedule the task to be awoken.
if poll.is_pending() {
Self::schedule(cx);
}
poll
}

fn poll_trailers(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
let Self {
data_polls,
trailer_polls,
} = self.get_mut();

let poll = if data_polls.is_empty() {
trailer_polls.pop_front().unwrap_or(Poll::Ready(Ok(None)))
} else {
// If the data frames have not all been yielded, yield `Pending`.
//
// TODO(kate): this arm should panic. it indicates `PeekTrailersBody<B>` isn't
// respecting the contract outlined in
// <https://docs.rs/http-body/0.4.6/http_body/trait.Body.html#tymethod.poll_trailers>.
Poll::Pending
};

// If we return `Poll::Pending`, we must schedule the task to be awoken.
if poll.is_pending() {
Self::schedule(cx);
}

poll
}

fn is_end_stream(&self) -> bool {
self.data_polls.is_empty() && self.trailer_polls.is_empty()
}
}
}

0 comments on commit 399ab1d

Please sign in to comment.