From 6928536c7cd59cc31bd4b84ce047d4d700d280c6 Mon Sep 17 00:00:00 2001 From: katelyn martin Date: Tue, 7 Jan 2025 00:00:00 +0000 Subject: [PATCH] refactor(http/retry): prepare `PeekTrailersBody` for hyper upgrade this is a refactor commit, outlined from #3504. this commit is particularly interested in preparing the `PeekTrailersBody` middleware for our upgrade to hyper/http-body 1.0. more specifically: this commit clears up the boundary concerning when it will or will not become inert and delegate to its inner body `B`. currently, a `PeekTrailersBody` is not fully consistent about the conditions in which it will peek the trailers of a response body: the inner body is allowed to yield _either_ (a) **zero** DATA frames, in which case the body will be `.await`'ed and polled until the trailers are obtained, or (b) **one** DATA frame, so long as the inner body immediately yields a trailer. see https://github.com/linkerd/linkerd2/issues/8733. Signed-off-by: katelyn martin --- linkerd/http/retry/src/peek_trailers.rs | 165 +++++++++++++----------- 1 file changed, 89 insertions(+), 76 deletions(-) diff --git a/linkerd/http/retry/src/peek_trailers.rs b/linkerd/http/retry/src/peek_trailers.rs index 7e831a2652..8b4b878091 100644 --- a/linkerd/http/retry/src/peek_trailers.rs +++ b/linkerd/http/retry/src/peek_trailers.rs @@ -11,32 +11,31 @@ use std::{ task::{Context, Poll}, }; -/// An HTTP body that allows inspecting the body's trailers, if a `TRAILERS` -/// frame was the first frame after the initial headers frame. +/// An HTTP body that allows inspecting the body's trailers. /// -/// If the first frame of the body stream was *not* a `TRAILERS` frame, this -/// behaves identically to a normal body. -pub struct PeekTrailersBody { - inner: B, - - /// The first DATA frame received from the inner body, or an error that - /// occurred while polling for data. - /// - /// If this is `None`, then the body has completed without any DATA frames. - first_data: Option>, - - /// The inner body's trailers, if it was terminated by a `TRAILERS` frame - /// after 0-1 DATA frames, or an error if polling for trailers failed. +/// The body's trailers may be peeked with [`PeekTrailersBody::peek_trailers()`]. +/// +/// Trailers may only be peeked if the inner body immediately yields a TRAILERS frame. If the first +/// frame of the body stream was *not* a `TRAILERS` frame, this behaves identically to a normal +/// body. +pub enum PeekTrailersBody { + /// The trailers are not available to be inspected. + Passthru { + /// The inner body. + inner: B, + /// The first DATA frame received from the inner body, or an error that + /// occurred while polling for data. + /// + /// If this is `None`, then the body has completed without any DATA frames. + first_data: Option>, + }, + /// The trailers have been peeked. /// - /// Yes, this is a bit of a complex type, so let's break it down: - /// - the outer `Option` indicates whether any trailers were received by - /// `WithTrailers`; if it's `None`, then we don't *know* if the response - /// had trailers, as it is not yet complete. - /// - the inner `Result` and `Option` are the `Result` and `Option` returned - /// by `HttpBody::trailers` on the inner body. If this is `Ok(None)`, then - /// the body has terminated without trailers --- it is *known* to not have - /// trailers. - trailers: Option, B::Error>>, + /// This variant applies if the inner body's first frame was a `TRAILERS` frame. + Peek { + /// The inner body's trailers. + trailers: Option, B::Error>>, + }, } pub type WithPeekTrailersBody = Either< @@ -50,10 +49,16 @@ pub struct ResponseWithPeekTrailers(pub(crate) S); // === impl WithTrailers === impl PeekTrailersBody { + /// Returns a reference to the trailers, if applicable. + /// + /// See [`PeekTrailersBody`] for more information on when this returns `None`. pub fn peek_trailers(&self) -> Option<&http::HeaderMap> { - self.trailers - .as_ref() - .and_then(|trls| trls.as_ref().ok()?.as_ref()) + match self { + Self::Peek { + trailers: Some(Ok(Some(ref t))), + } => Some(t), + Self::Passthru { .. } | Self::Peek { .. } => None, + } } pub fn map_response(rsp: http::Response) -> WithPeekTrailersBody @@ -87,44 +92,32 @@ impl PeekTrailersBody { B::Data: Send + Unpin, B::Error: Send, { - let (parts, body) = rsp.into_parts(); - let mut body = Self { - inner: body, - first_data: None, - trailers: None, - }; + let (parts, mut body) = rsp.into_parts(); - tracing::debug!("Buffering first data frame"); - if let Some(data) = body.inner.data().await { + tracing::debug!("Buffering first body frame"); + if let first_data @ Some(_) = body.data().await { // The body has data; stop waiting for trailers. - body.first_data = Some(data); - - // Peek to see if there's immediately a trailers frame, and grab - // it if so. Otherwise, bail. - // XXX(eliza): the documentation for the `http::Body` trait says - // that `poll_trailers` should only be called after `poll_data` - // returns `None`...but, in practice, I'm fairly sure that this just - // means that it *will not return `Ready`* until there are no data - // frames left, which is fine for us here, because we `now_or_never` - // it. - body.trailers = body.inner.trailers().now_or_never(); - } else { - // Okay, `poll_data` has returned `None`, so there are no data - // frames left. Let's see if there's trailers... - body.trailers = Some(body.inner.trailers().await); - } - if body.trailers.is_some() { - tracing::debug!("Buffered trailers frame"); + let body = Self::Passthru { + inner: body, + first_data, + }; + return http::Response::from_parts(parts, body); } + // We have confirmed that there are no data frames. Now, await the trailers. + let trailers = body.trailers().await; + tracing::debug!("Buffered trailers frame"); + let body = Self::Peek { + trailers: Some(trailers), + }; http::Response::from_parts(parts, body) } + /// Returns a response with an inert [`PeekTrailersBody`]. fn no_trailers(rsp: http::Response) -> http::Response { - rsp.map(|inner| Self { + rsp.map(|inner| Self::Passthru { inner, first_data: None, - trailers: None, }) } } @@ -142,47 +135,67 @@ where self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll>> { - let this = self.get_mut(); - if let Some(first_data) = this.first_data.take() { - return Poll::Ready(Some(first_data)); + match self.get_mut() { + Self::Passthru { + first_data, + ref mut inner, + } => { + // Return the first chunk that was buffered originally. + if let data @ Some(_) = first_data.take() { + return Poll::Ready(data); + } + // ...and then, poll the inner body. + Pin::new(inner).poll_data(cx) + } + // If we have peeked the trailers, we've already polled an empty body. + Self::Peek { .. } => Poll::Ready(None), } - - Pin::new(&mut this.inner).poll_data(cx) } fn poll_trailers( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll, Self::Error>> { - let this = self.get_mut(); - if let Some(trailers) = this.trailers.take() { - return Poll::Ready(trailers); + match self.get_mut() { + Self::Passthru { ref mut inner, .. } => Pin::new(inner).poll_trailers(cx), + Self::Peek { trailers } => { + let trailers = trailers + .take() + .expect("poll_trailers should not be called more than once"); + Poll::Ready(trailers) + } } - - Pin::new(&mut this.inner).poll_trailers(cx) } #[inline] fn is_end_stream(&self) -> bool { - self.first_data.is_none() && self.trailers.is_none() && self.inner.is_end_stream() + match self { + Self::Passthru { inner, first_data } => first_data.is_none() && inner.is_end_stream(), + Self::Peek { trailers: Some(_) } => false, + Self::Peek { trailers: None } => true, + } } #[inline] fn size_hint(&self) -> http_body::SizeHint { use bytes::Buf; - let mut hint = self.inner.size_hint(); - // If we're holding onto a chunk of data, add its length to the inner - // `Body`'s size hint. - if let Some(Ok(chunk)) = self.first_data.as_ref() { - let buffered = chunk.remaining() as u64; - if let Some(upper) = hint.upper() { - hint.set_upper(upper + buffered); + match self { + Self::Passthru { inner, first_data } => { + let mut hint = inner.size_hint(); + // If we're holding onto a chunk of data, add its length to the inner + // `Body`'s size hint. + if let Some(Ok(chunk)) = first_data.as_ref() { + let buffered = chunk.remaining() as u64; + if let Some(upper) = hint.upper() { + hint.set_upper(upper + buffered); + } + hint.set_lower(hint.lower() + buffered); + } + hint } - hint.set_lower(hint.lower() + buffered); + Self::Peek { .. } => http_body::SizeHint::default(), } - - hint } }