diff --git a/http-body-util/src/combinators/frame.rs b/http-body-util/src/combinators/frame.rs index 211fa08..bf8e77a 100644 --- a/http-body-util/src/combinators/frame.rs +++ b/http-body-util/src/combinators/frame.rs @@ -13,6 +13,7 @@ impl<'a, T: Body + Unpin + ?Sized> Future for Frame<'a, T> { type Output = Option, T::Error>>; fn poll(mut self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> task::Poll { + let _ = Pin::new(&mut self.0).poll_progress(ctx)?; Pin::new(&mut self.0).poll_frame(ctx) } } diff --git a/http-body-util/src/combinators/map_err.rs b/http-body-util/src/combinators/map_err.rs index 384cfc5..6edaf74 100644 --- a/http-body-util/src/combinators/map_err.rs +++ b/http-body-util/src/combinators/map_err.rs @@ -67,6 +67,15 @@ where } } + fn poll_progress(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + match this.inner.poll_progress(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(Ok(())) => Poll::Ready(Ok(())), + Poll::Ready(Err(err)) => Poll::Ready(Err((this.f)(err))), + } + } + fn is_end_stream(&self) -> bool { self.inner.is_end_stream() } diff --git a/http-body-util/src/combinators/map_frame.rs b/http-body-util/src/combinators/map_frame.rs index 44886bd..cc55afd 100644 --- a/http-body-util/src/combinators/map_frame.rs +++ b/http-body-util/src/combinators/map_frame.rs @@ -69,6 +69,10 @@ where } } + fn poll_progress(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().inner.poll_progress(cx) + } + fn is_end_stream(&self) -> bool { self.inner.is_end_stream() } diff --git a/http-body-util/src/combinators/with_trailers.rs b/http-body-util/src/combinators/with_trailers.rs index 383e1ec..dd74623 100644 --- a/http-body-util/src/combinators/with_trailers.rs +++ b/http-body-util/src/combinators/with_trailers.rs @@ -109,6 +109,13 @@ where } } + fn poll_progress(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.project().state.project() { + StateProj::PollBody { body, .. } => body.poll_progress(cx), + _ => Poll::Ready(Ok(())), + } + } + #[inline] fn size_hint(&self) -> http_body::SizeHint { match &self.state { diff --git a/http-body-util/src/either.rs b/http-body-util/src/either.rs index 9e0cc43..ec8bccb 100644 --- a/http-body-util/src/either.rs +++ b/http-body-util/src/either.rs @@ -70,6 +70,15 @@ where } } + fn poll_progress(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.project() { + EitherProj::Left(left) => left.poll_progress(cx).map(|poll| poll.map_err(Into::into)), + EitherProj::Right(right) => { + right.poll_progress(cx).map(|poll| poll.map_err(Into::into)) + } + } + } + fn is_end_stream(&self) -> bool { match self { Either::Left(left) => left.is_end_stream(), diff --git a/http-body-util/src/limited.rs b/http-body-util/src/limited.rs index c4c5c8b..f4b61a9 100644 --- a/http-body-util/src/limited.rs +++ b/http-body-util/src/limited.rs @@ -64,6 +64,13 @@ where Poll::Ready(res) } + fn poll_progress(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project() + .inner + .poll_progress(cx) + .map(|poll| poll.map_err(Into::into)) + } + fn is_end_stream(&self) -> bool { self.inner.is_end_stream() } diff --git a/http-body/src/lib.rs b/http-body/src/lib.rs index 12077a2..0973143 100644 --- a/http-body/src/lib.rs +++ b/http-body/src/lib.rs @@ -50,6 +50,26 @@ pub trait Body { cx: &mut Context<'_>, ) -> Poll, Self::Error>>>; + /// Attempt to progress the body's state without pulling a new frame. + /// + /// `Body` consumers can use this method to allow the `Body` implementation to continue to + /// perform work even when the consumer is not yet ready to read the next frame. For example, + /// a `Body` implementation could maintain a timer counting down between `poll_frame` calls and + /// report an error from `poll_progress` when time expires. + /// + /// Consumers are *not* required to call this method. A `Body` implementation should not depend + /// on calls to `poll_progress` to occur. + /// + /// An error returned from this method is considered to be equivalent to an error returned from + /// `poll_frame`. + /// + /// Implementations must allow additional calls to this method after it returns + /// `Poll::Ready(Ok(())`. + fn poll_progress(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let _ = cx; + Poll::Ready(Ok(())) + } + /// Returns `true` when the end of stream has been reached. /// /// An end of stream means that `poll_frame` will return `None`.