diff --git a/hyper-balance/src/lib.rs b/hyper-balance/src/lib.rs index 9cb1eb5804..936e1d0b92 100644 --- a/hyper-balance/src/lib.rs +++ b/hyper-balance/src/lib.rs @@ -127,6 +127,11 @@ where this.body.poll_trailers(cx) } + + #[inline] + fn size_hint(&self) -> hyper::body::SizeHint { + self.body.size_hint() + } } // ==== PendingUntilEosBody ==== @@ -147,6 +152,7 @@ impl HttpBody for PendingUntilEosBody { type Data = B::Data; type Error = B::Error; + #[inline] fn is_end_stream(&self) -> bool { self.body.is_end_stream() } @@ -181,6 +187,11 @@ impl HttpBody for PendingUntilEosBody { Poll::Ready(ret) } + + #[inline] + fn size_hint(&self) -> hyper::body::SizeHint { + self.body.size_hint() + } } #[cfg(test)] diff --git a/linkerd/app/core/src/retry.rs b/linkerd/app/core/src/retry.rs index fa699594a9..6059b8c0e0 100644 --- a/linkerd/app/core/src/retry.rs +++ b/linkerd/app/core/src/retry.rs @@ -58,55 +58,33 @@ impl retry::NewPolicy for NewRetryPolicy { // === impl Retry === -impl RetryPolicy { - fn can_retry(&self, req: &http::Request) -> bool { - let content_length = |req: &http::Request<_>| { - req.headers() - .get(http::header::CONTENT_LENGTH) - .and_then(|value| value.to_str().ok()?.parse::().ok()) - }; - - // Requests without bodies can always be retried, as we will not need to - // buffer the body. If the request *does* have a body, retry it if and - // only if the request contains a `content-length` header and the - // content length is >= 64 kb. - let has_body = !req.body().is_end_stream(); - if has_body && content_length(req).unwrap_or(usize::MAX) > MAX_BUFFERED_BYTES { - tracing::trace!( - req.has_body = has_body, - req.content_length = ?content_length(req), - "not retryable", - ); - return false; - } - - tracing::trace!( - req.has_body = has_body, - req.content_length = ?content_length(req), - "retryable", - ); - true - } -} - -impl retry::Policy, http::Response, E> for RetryPolicy +impl retry::Policy>, http::Response, E> for RetryPolicy where - A: http_body::Body + Clone, + A: http_body::Body + Unpin, + A::Error: Into, { type Future = future::Ready; fn retry( &self, - req: &http::Request, + req: &http::Request>, result: Result<&http::Response, &E>, ) -> Option { let retryable = match result { Err(_) => false, - Ok(rsp) => classify::Request::from(self.response_classes.clone()) - .classify(req) - .start(rsp) - .eos(None) - .is_failure(), + Ok(rsp) => { + // is the request a failure? + let is_failure = classify::Request::from(self.response_classes.clone()) + .classify(req) + .start(rsp) + .eos(None) + .is_failure(); + // did the body exceed the maximum length limit? + let exceeded_max_len = req.body().is_capped(); + let retryable = is_failure && !exceeded_max_len; + tracing::trace!(is_failure, exceeded_max_len, retryable); + retryable + } }; if !retryable { @@ -123,16 +101,12 @@ where Some(future::ready(self.clone())) } - fn clone_request(&self, req: &http::Request) -> Option> { - let can_retry = self.can_retry(req); - debug_assert!( - can_retry, - "The retry policy attempted to clone an un-retryable request. This is unexpected." - ); - if !can_retry { - return None; - } - + fn clone_request( + &self, + req: &http::Request>, + ) -> Option>> { + // Since the body is already wrapped in a ReplayBody, it must not be obviously too large to + // buffer/clone. 
let mut clone = http::Request::new(req.body().clone()); *clone.method_mut() = req.method().clone(); *clone.uri_mut() = req.uri().clone(); @@ -160,9 +134,20 @@ where &self, req: http::Request, ) -> Either> { - if self.can_retry(&req) { - return Either::A(req.map(|body| ReplayBody::new(body, MAX_BUFFERED_BYTES))); - } - Either::B(req) + let (head, body) = req.into_parts(); + let replay_body = match ReplayBody::try_new(body, MAX_BUFFERED_BYTES) { + Ok(body) => body, + Err(body) => { + tracing::debug!( + size = body.size_hint().lower(), + "Body is too large to buffer" + ); + return Either::B(http::Request::from_parts(head, body)); + } + }; + + // The body may still be too large to be buffered if the body's length was not known. + // `ReplayBody` handles this gracefully. + Either::A(http::Request::from_parts(head, replay_body)) } } diff --git a/linkerd/app/integration/src/tests/profiles.rs b/linkerd/app/integration/src/tests/profiles.rs index 5038582ea3..fb19ada736 100644 --- a/linkerd/app/integration/src/tests/profiles.rs +++ b/linkerd/app/integration/src/tests/profiles.rs @@ -48,7 +48,8 @@ macro_rules! profile_test { async move { // Read the entire body before responding, so that the // client doesn't fail when writing it out. - let _body = hyper::body::aggregate(req.into_body()).await; + let _body = hyper::body::to_bytes(req.into_body()).await; + tracing::debug!(body = ?_body.as_ref().map(|body| body.len()), "recieved body"); Ok::<_, Error>(if fail { Response::builder() .status(533) @@ -222,6 +223,36 @@ async fn retry_with_small_put_body() { } } +#[tokio::test] +async fn retry_without_content_length() { + profile_test! { + routes: [ + controller::route() + .request_any() + .response_failure(500..600) + .retryable(true) + ], + budget: Some(controller::retry_budget(Duration::from_secs(10), 0.1, 1)), + with_client: |client: client::Client| async move { + let (mut tx, body) = hyper::body::Body::channel(); + let req = client.request_builder("/0.5") + .method("POST") + .body(body) + .unwrap(); + let res = tokio::spawn(async move { client.request_body(req).await }); + tx.send_data(Bytes::from_static(b"hello")) + .await + .expect("the whole body should be read"); + tx.send_data(Bytes::from_static(b"world")) + .await + .expect("the whole body should be read"); + drop(tx); + let res = res.await.unwrap(); + assert_eq!(res.status(), 200); + } + } +} + #[tokio::test] async fn does_not_retry_if_request_does_not_match() { profile_test! { @@ -280,7 +311,9 @@ async fn does_not_retry_if_body_is_too_long() { } #[tokio::test] -async fn does_not_retry_if_body_lacks_known_length() { +async fn does_not_retry_if_streaming_body_exceeds_max_length() { + // TODO(eliza): if we make the max length limit configurable, update this + // test to test the configurable max length limit... profile_test! { routes: [ controller::route() @@ -296,10 +329,21 @@ async fn does_not_retry_if_body_lacks_known_length() { .body(body) .unwrap(); let res = tokio::spawn(async move { client.request_body(req).await }); - let _ = tx.send_data(Bytes::from_static(b"hello")); - let _ = tx.send_data(Bytes::from_static(b"world")); + // send a 32k chunk + tx.send_data(Bytes::from(&[1u8; 32 * 1024][..])) + .await + .expect("the whole body should be read"); + // ...and another one... 
+ tx.send_data(Bytes::from(&[1u8; 32 * 1024][..])) + .await + .expect("the whole body should be read"); + // ...and a third one (exceeding the max length limit) + tx.send_data(Bytes::from(&[1u8; 32 * 1024][..])) + .await + .expect("the whole body should be read"); drop(tx); let res = res.await.unwrap(); + assert_eq!(res.status(), 533); } } diff --git a/linkerd/app/integration/src/tests/transparency.rs b/linkerd/app/integration/src/tests/transparency.rs index 0e7ef8ff5a..8e74e559c0 100644 --- a/linkerd/app/integration/src/tests/transparency.rs +++ b/linkerd/app/integration/src/tests/transparency.rs @@ -843,9 +843,7 @@ macro_rules! http1_tests { let srv = server::http1() .route_fn("/", |req| { - let has_body_header = req.headers().contains_key("transfer-encoding") - || req.headers().contains_key("content-length"); - let status = if has_body_header { + let status = if req.headers().contains_key(http::header::TRANSFER_ENCODING) { StatusCode::BAD_REQUEST } else { StatusCode::OK diff --git a/linkerd/http-box/src/body.rs b/linkerd/http-box/src/body.rs index 4a0d84577a..16f2ff4181 100644 --- a/linkerd/http-box/src/body.rs +++ b/linkerd/http-box/src/body.rs @@ -45,10 +45,12 @@ impl Body for BoxBody { type Data = Data; type Error = Error; + #[inline] fn is_end_stream(&self) -> bool { self.inner.is_end_stream() } + #[inline] fn poll_data( mut self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -56,6 +58,7 @@ impl Body for BoxBody { self.as_mut().inner.as_mut().poll_data(cx) } + #[inline] fn poll_trailers( mut self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -63,6 +66,7 @@ impl Body for BoxBody { self.as_mut().inner.as_mut().poll_trailers(cx) } + #[inline] fn size_hint(&self) -> http_body::SizeHint { self.inner.size_hint() } @@ -95,6 +99,7 @@ where type Data = Data; type Error = Error; + #[inline] fn is_end_stream(&self) -> bool { self.0.is_end_stream() } @@ -111,6 +116,7 @@ where })) } + #[inline] fn poll_trailers( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -118,6 +124,7 @@ where Poll::Ready(futures::ready!(self.project().0.poll_trailers(cx)).map_err(Into::into)) } + #[inline] fn size_hint(&self) -> http_body::SizeHint { self.0.size_hint() } diff --git a/linkerd/http-metrics/src/requests/service.rs b/linkerd/http-metrics/src/requests/service.rs index d300d39490..e2430c405d 100644 --- a/linkerd/http-metrics/src/requests/service.rs +++ b/linkerd/http-metrics/src/requests/service.rs @@ -283,6 +283,7 @@ where self.project().inner.poll_trailers(cx) } + #[inline] fn size_hint(&self) -> http_body::SizeHint { self.inner.size_hint() } @@ -438,6 +439,11 @@ where Poll::Ready(Ok(trls)) } + + #[inline] + fn size_hint(&self) -> http_body::SizeHint { + self.inner.size_hint() + } } #[pinned_drop] diff --git a/linkerd/http-retry/src/lib.rs b/linkerd/http-retry/src/lib.rs index f2d6471d48..90ba4e4563 100644 --- a/linkerd/http-retry/src/lib.rs +++ b/linkerd/http-retry/src/lib.rs @@ -4,7 +4,7 @@ use bytes::{Buf, BufMut, Bytes, BytesMut}; use http::HeaderMap; -use http_body::Body; +use http_body::{Body, SizeHint}; use linkerd_error::Error; use parking_lot::Mutex; use std::{collections::VecDeque, io::IoSlice, pin::Pin, sync::Arc, task::Context, task::Poll}; @@ -70,6 +70,8 @@ struct SharedState { /// always return `true` from `is_end_stream` even when they don't own the /// shared state. was_empty: bool, + + orig_size_hint: SizeHint, } #[derive(Debug)] @@ -79,7 +81,7 @@ struct BodyState { rest: Option, is_completed: bool, - /// Maxiumum number of bytes to buffer. + /// Maximum number of bytes to buffer. 
     max_bytes: usize,
 }
 
@@ -88,15 +90,27 @@ struct BodyState {
 impl<B: Body> ReplayBody<B> {
     /// Wraps an initial `Body` in a `ReplayBody`.
     ///
-    /// In order to prevent unbounded buffering, this takes a maximum number of
-    /// bytes to buffer as a second parameter. If more than than that number of
-    /// bytes would be buffered, the buffered data is discarded and any
-    /// subsequent clones of this body will fail. However, the *currently
-    /// active* clone of the body is allowed to continue without erroring. It
-    /// will simply stop buffering any additional data for retries.
-    pub fn new(body: B, max_bytes: usize) -> Self {
-        let was_empty = body.is_end_stream();
-        Self {
+    /// In order to prevent unbounded buffering, this takes a maximum number of bytes to buffer as a
+    /// second parameter. If more than that number of bytes would be buffered, the buffered
+    /// data is discarded and any subsequent clones of this body will fail. However, the *currently
+    /// active* clone of the body is allowed to continue without erroring. It will simply stop
+    /// buffering any additional data for retries.
+    ///
+    /// If the body has a size hint with a lower bound greater than `max_bytes`, the original body
+    /// is returned in the error variant.
+    pub fn try_new(body: B, max_bytes: usize) -> Result<Self, B> {
+        let orig_size_hint = body.size_hint();
+        tracing::trace!(body.size_hint = %orig_size_hint.lower(), %max_bytes);
+        if orig_size_hint.lower() > max_bytes as u64 {
+            return Err(body);
+        }
+
+        Ok(Self {
+            shared: Arc::new(SharedState {
+                body: Mutex::new(None),
+                orig_size_hint,
+                was_empty: body.is_end_stream(),
+            }),
             state: Some(BodyState {
                 buf: Default::default(),
                 trailers: None,
@@ -104,14 +118,10 @@ impl<B: Body> ReplayBody<B> {
                 is_completed: false,
                 max_bytes: max_bytes + 1,
             }),
-            shared: Arc::new(SharedState {
-                body: Mutex::new(None),
-                was_empty,
-            }),
             // The initial `ReplayBody` has nothing to replay
             replay_body: false,
             replay_trailers: false,
-        }
+        })
     }
 
     /// Mutably borrows the body state if this clone currently owns it,
@@ -128,6 +138,25 @@ impl<B: Body> ReplayBody<B> {
     ) -> &'a mut BodyState<B> {
         state.get_or_insert_with(|| shared.lock().take().expect("missing body state"))
     }
+
+    /// Returns `true` if the body previously exceeded the configured maximum
+    /// length limit.
+    ///
+    /// If this is true, the body is now empty, and the request should *not* be
+    /// retried with this body.
+    pub fn is_capped(&self) -> bool {
+        self.state
+            .as_ref()
+            .map(BodyState::is_capped)
+            .unwrap_or_else(|| {
+                self.shared
+                    .body
+                    .lock()
+                    .as_ref()
+                    .expect("if our `state` was `None`, the shared state must be `Some`")
+                    .is_capped()
+            })
+    }
 }
 
 impl<B> Body for ReplayBody<B>
 where
@@ -282,31 +311,29 @@
         && is_inner_eos
     }
 
-    fn size_hint(&self) -> http_body::SizeHint {
-        let mut hint = http_body::SizeHint::default();
-        if let Some(ref state) = self.state {
-            let rem = state.buf.remaining() as u64;
-
-            // Have we read the entire body? If so, the size is exactly the size
-            // of the buffer.
-            if state.is_completed {
-                return http_body::SizeHint::with_exact(rem);
-            }
-
-            // Otherwise, the size is the size of the current buffer plus the
-            // size hint returned by the inner body.
-            let (rest_lower, rest_upper) = state
-                .rest
-                .as_ref()
-                .map(|rest| {
-                    let hint = rest.size_hint();
-                    (hint.lower(), hint.upper().unwrap_or(0))
-                })
-                .unwrap_or_default();
-            hint.set_lower(rem + rest_lower);
-            hint.set_upper(rem + rest_upper);
+    #[inline]
+    fn size_hint(&self) -> SizeHint {
+        // If this clone isn't holding the body, return the original size hint.
+ let state = match self.state.as_ref() { + Some(state) => state, + None => return self.shared.orig_size_hint.clone(), + }; + + // Otherwise, if we're holding the state but have dropped the inner body, the entire body is + // buffered so we know the exact size hint. + let buffered = state.buf.remaining() as u64; + let rest_hint = match state.rest.as_ref() { + Some(rest) => rest.size_hint(), + None => return SizeHint::with_exact(buffered), + }; + + // Otherwise, add the inner body's size hint to the amount of buffered data. An upper limit + // is only set if the inner body has an upper limit. + let mut hint = SizeHint::default(); + hint.set_lower(buffered + rest_hint.lower()); + if let Some(rest_upper) = rest_hint.upper() { + hint.set_upper(buffered + rest_upper); } - hint } } @@ -326,7 +353,7 @@ impl Clone for ReplayBody { impl Drop for ReplayBody { fn drop(&mut self) { - // If this clone owned the shared state, put it back.`s + // If this clone owned the shared state, put it back. if let Some(state) = self.state.take() { *self.shared.body.lock() = Some(state); } @@ -749,7 +776,8 @@ mod tests { fn empty_body_is_always_eos() { // If the initial body was empty, every clone should always return // `true` from `is_end_stream`. - let initial = ReplayBody::new(hyper::Body::empty(), 64 * 1024); + let initial = ReplayBody::try_new(hyper::Body::empty(), 64 * 1024) + .expect("empty body can't be too large"); assert!(initial.is_end_stream()); let replay = initial.clone(); @@ -763,7 +791,8 @@ mod tests { async fn eos_only_when_fully_replayed() { // Test that each clone of a body is not EOS until the data has been // fully replayed. - let mut initial = ReplayBody::new(hyper::Body::from("hello world"), 64 * 1024); + let mut initial = ReplayBody::try_new(hyper::Body::from("hello world"), 64 * 1024) + .expect("body must not be too large"); let mut replay = initial.clone(); body_to_string(&mut initial).await; @@ -807,7 +836,7 @@ mod tests { let _trace = linkerd_tracing::test::with_default_filter("linkerd_http_retry=trace"); let (mut tx, body) = hyper::Body::channel(); - let mut initial = ReplayBody::new(body, 8); + let mut initial = ReplayBody::try_new(body, 8).expect("channel body must not be too large"); let mut replay = initial.clone(); // Send enough data to reach the cap @@ -837,7 +866,7 @@ mod tests { let _trace = linkerd_tracing::test::with_default_filter("linkerd_http_retry=debug"); let (mut tx, body) = hyper::Body::channel(); - let mut initial = ReplayBody::new(body, 8); + let mut initial = ReplayBody::try_new(body, 8).expect("channel body must not be too large"); let mut replay = initial.clone(); // Send enough data to reach the cap @@ -863,6 +892,34 @@ mod tests { assert!(err.is::()) } + #[test] + fn body_too_big() { + let max_size = 8; + let mk_body = + |sz: usize| -> hyper::Body { (0..sz).map(|_| "x").collect::().into() }; + + assert!( + ReplayBody::try_new(hyper::Body::empty(), max_size).is_ok(), + "empty body is not too big" + ); + + assert!( + ReplayBody::try_new(mk_body(max_size), max_size).is_ok(), + "body at maximum capacity is not too big" + ); + + assert!( + ReplayBody::try_new(mk_body(max_size + 1), max_size).is_err(), + "over-sized body is too big" + ); + + let (_sender, body) = hyper::Body::channel(); + assert!( + ReplayBody::try_new(body, max_size).is_ok(), + "body without size hint is not too big" + ); + } + struct Test { tx: Tx, initial: ReplayBody, @@ -875,7 +932,7 @@ mod tests { impl Test { fn new() -> Self { let (tx, body) = hyper::Body::channel(); - let initial = 
ReplayBody::new(body, 64 * 1024); + let initial = ReplayBody::try_new(body, 64 * 1024).expect("body too large"); let replay = initial.clone(); Self { tx: Tx(tx), diff --git a/linkerd/proxy/http/src/glue.rs b/linkerd/proxy/http/src/glue.rs index f750faf08a..ef64a897f0 100644 --- a/linkerd/proxy/http/src/glue.rs +++ b/linkerd/proxy/http/src/glue.rs @@ -87,6 +87,11 @@ impl HttpBody for UpgradeBody { e }) } + + #[inline] + fn size_hint(&self) -> http_body::SizeHint { + self.body.size_hint() + } } impl Default for UpgradeBody { diff --git a/linkerd/proxy/http/src/orig_proto.rs b/linkerd/proxy/http/src/orig_proto.rs index 57e1ec1db5..4f4a385bed 100644 --- a/linkerd/proxy/http/src/orig_proto.rs +++ b/linkerd/proxy/http/src/orig_proto.rs @@ -218,6 +218,11 @@ impl HttpBody for UpgradeResponseBody { .poll_trailers(cx) .map_err(downgrade_h2_error) } + + #[inline] + fn size_hint(&self) -> http_body::SizeHint { + HttpBody::size_hint(&self.inner) + } } // === impl Downgrade === diff --git a/linkerd/proxy/http/src/retain.rs b/linkerd/proxy/http/src/retain.rs index 29081b2c27..ab9ed7bf6c 100644 --- a/linkerd/proxy/http/src/retain.rs +++ b/linkerd/proxy/http/src/retain.rs @@ -100,10 +100,12 @@ impl http_body::Body for RetainBody { type Data = B::Data; type Error = B::Error; + #[inline] fn is_end_stream(&self) -> bool { self.inner.is_end_stream() } + #[inline] fn poll_data( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -111,6 +113,7 @@ impl http_body::Body for RetainBody { self.project().inner.poll_data(cx) } + #[inline] fn poll_trailers( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -118,6 +121,7 @@ impl http_body::Body for RetainBody { self.project().inner.poll_trailers(cx) } + #[inline] fn size_hint(&self) -> http_body::SizeHint { self.inner.size_hint() } diff --git a/linkerd/proxy/tap/src/service.rs b/linkerd/proxy/tap/src/service.rs index 52c384c784..fc70d5eee1 100644 --- a/linkerd/proxy/tap/src/service.rs +++ b/linkerd/proxy/tap/src/service.rs @@ -164,6 +164,7 @@ where type Data = B::Data; type Error = B::Error; + #[inline] fn is_end_stream(&self) -> bool { self.inner.is_end_stream() } @@ -198,6 +199,11 @@ where self.as_mut().project().eos(trailers.as_ref()); Poll::Ready(Ok(trailers)) } + + #[inline] + fn size_hint(&self) -> hyper::body::SizeHint { + self.inner.size_hint() + } } impl BodyProj<'_, B, T>
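
Usage sketch: the snippet below shows how the `ReplayBody::try_new`/`is_capped` API introduced in `linkerd/http-retry/src/lib.rs` can be driven by a caller. It assumes `hyper::Body` as the inner body type and a stand-in `MAX_BUFFERED_BYTES` constant (64 KiB, matching the integration tests above); the surrounding function and variable names are illustrative only, not part of the proxy.

use linkerd_http_retry::ReplayBody;

// Stand-in for the proxy's buffering limit; assumed to be 64 KiB here.
const MAX_BUFFERED_BYTES: usize = 64 * 1024;

fn wrap_for_retry(body: hyper::Body) {
    match ReplayBody::try_new(body, MAX_BUFFERED_BYTES) {
        // The size hint's lower bound fits under the cap, so the body may be
        // buffered and the request is eligible for retries.
        Ok(replay) => {
            // Each clone replays whatever has been buffered so far. Before a
            // retry is actually dispatched, `is_capped()` is consulted: if the
            // stream later grew past the cap, the buffered copy is incomplete
            // and must not be replayed.
            let retry_copy = replay.clone();
            if !replay.is_capped() {
                // ...dispatch the retry using `retry_copy`...
                drop(retry_copy);
            }
        }
        // The size hint's lower bound already exceeds the cap: the original
        // body is handed back untouched and the request is forwarded without
        // retry support.
        Err(original) => drop(original),
    }
}

This mirrors the flow in `prepare_request` and the retry policy above: eligibility is decided up front from the body's size hint, while `is_capped` guards against streaming bodies whose total length was unknown when the request was first dispatched.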