From 214344a8cbdf0150d37f242f538fe1ac6d82a2e9 Mon Sep 17 00:00:00 2001 From: aeryz Date: Wed, 6 Jan 2021 18:53:08 +0300 Subject: [PATCH 1/4] feat(body): add trailers to Body channel (#2260) --- benches/end_to_end.rs | 5 +++- src/body/body.rs | 60 +++++++++++++++++++++++++++++++------------ src/client/conn.rs | 5 +++- 3 files changed, 51 insertions(+), 19 deletions(-) diff --git a/benches/end_to_end.rs b/benches/end_to_end.rs index 6376697afc..fde3c670cc 100644 --- a/benches/end_to_end.rs +++ b/benches/end_to_end.rs @@ -8,7 +8,7 @@ use std::net::SocketAddr; use futures_util::future::join_all; use hyper::client::HttpConnector; -use hyper::{body::HttpBody as _, Body, Method, Request, Response, Server}; +use hyper::{body::HttpBody as _, Body, HeaderMap, Method, Request, Response, Server}; // HTTP1 @@ -313,6 +313,9 @@ impl Opts { for _ in 0..chunk_cnt { tx.send_data(chunk.into()).await.expect("send_data"); } + tx.send_trailers(HeaderMap::new()) + .await + .expect("send_trailers"); }); body } else { diff --git a/src/body/body.rs b/src/body/body.rs index 4a1d6210bc..c853b24ce9 100644 --- a/src/body/body.rs +++ b/src/body/body.rs @@ -5,8 +5,6 @@ use std::fmt; use bytes::Bytes; use futures_channel::mpsc; -#[cfg(any(feature = "http1", feature = "http2"))] -#[cfg(feature = "client")] use futures_channel::oneshot; use futures_core::Stream; // for mpsc::Receiver #[cfg(feature = "stream")] @@ -17,14 +15,16 @@ use http_body::{Body as HttpBody, SizeHint}; use super::DecodedLength; #[cfg(feature = "stream")] use crate::common::sync_wrapper::SyncWrapper; -use crate::common::{task, watch, Pin, Poll}; +use crate::common::Future; #[cfg(any(feature = "http1", feature = "http2"))] #[cfg(feature = "client")] -use crate::common::{Future, Never}; +use crate::common::Never; +use crate::common::{task, watch, Pin, Poll}; #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] use crate::proto::h2::ping; type BodySender = mpsc::Sender>; +type TrailersSender = oneshot::Sender; /// A stream of `Bytes`, used when receiving bodies. /// @@ -43,7 +43,8 @@ enum Kind { Chan { content_length: DecodedLength, want_tx: watch::Sender, - rx: mpsc::Receiver>, + data_rx: mpsc::Receiver>, + trailers_rx: oneshot::Receiver, }, #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] H2 { @@ -104,7 +105,8 @@ enum DelayEof { #[must_use = "Sender does nothing unless sent on"] pub struct Sender { want_rx: watch::Receiver, - tx: BodySender, + data_tx: BodySender, + trailers_tx: Option, } const WANT_PENDING: usize = 1; @@ -135,7 +137,8 @@ impl Body { } pub(crate) fn new_channel(content_length: DecodedLength, wanter: bool) -> (Sender, Body) { - let (tx, rx) = mpsc::channel(0); + let (data_tx, data_rx) = mpsc::channel(0); + let (trailers_tx, trailers_rx) = oneshot::channel(); // If wanter is true, `Sender::poll_ready()` won't becoming ready // until the `Body` has been polled for data once. @@ -143,11 +146,16 @@ impl Body { let (want_tx, want_rx) = watch::channel(want); - let tx = Sender { want_rx, tx }; + let tx = Sender { + want_rx, + data_tx, + trailers_tx: Some(trailers_tx), + }; let rx = Body::new(Kind::Chan { content_length, want_tx, - rx, + data_rx, + trailers_rx, }); (tx, rx) @@ -265,12 +273,13 @@ impl Body { Kind::Once(ref mut val) => Poll::Ready(val.take().map(Ok)), Kind::Chan { content_length: ref mut len, - ref mut rx, + ref mut data_rx, ref mut want_tx, + .. } => { want_tx.send(WANT_READY); - match ready!(Pin::new(rx).poll_next(cx)?) { + match ready!(Pin::new(data_rx).poll_next(cx)?) { Some(chunk) => { len.sub_if(chunk.len() as u64); Poll::Ready(Some(Ok(chunk))) @@ -348,6 +357,13 @@ impl HttpBody for Body { } Err(e) => Poll::Ready(Err(crate::Error::new_h2(e))), }, + Kind::Chan { + ref mut trailers_rx, + .. + } => match ready!(Pin::new(trailers_rx).poll(cx)) { + Ok(t) => Poll::Ready(Ok(Some(t))), + Err(_) => Poll::Ready(Err(crate::Error::new_closed())), + }, _ => Poll::Ready(Ok(None)), } } @@ -499,7 +515,7 @@ impl Sender { pub fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll> { // Check if the receiver end has tried polling for the body yet ready!(self.poll_want(cx)?); - self.tx + self.data_tx .poll_ready(cx) .map_err(|_| crate::Error::new_closed()) } @@ -520,11 +536,21 @@ impl Sender { /// Send data on this channel when it is ready. pub async fn send_data(&mut self, chunk: Bytes) -> crate::Result<()> { self.ready().await?; - self.tx + self.data_tx .try_send(Ok(chunk)) .map_err(|_| crate::Error::new_closed()) } + /// Send trailers on this channel when it is ready. + pub async fn send_trailers(&mut self, trailers: HeaderMap) -> crate::Result<()> { + self.ready().await?; + let tx = match self.trailers_tx.take() { + Some(tx) => tx, + None => return Err(crate::Error::new_closed()), + }; + tx.send(trailers).map_err(|_| crate::Error::new_closed()) + } + /// Try to send data on this channel. /// /// # Errors @@ -538,7 +564,7 @@ impl Sender { /// that doesn't have an async context. If in an async context, prefer /// `send_data()` instead. pub fn try_send_data(&mut self, chunk: Bytes) -> Result<(), Bytes> { - self.tx + self.data_tx .try_send(Ok(chunk)) .map_err(|err| err.into_inner().expect("just sent Ok")) } @@ -546,7 +572,7 @@ impl Sender { /// Aborts the body in an abnormal fashion. pub fn abort(self) { let _ = self - .tx + .data_tx // clone so the send works even if buffer is full .clone() .try_send(Err(crate::Error::new_body_write_aborted())); @@ -554,7 +580,7 @@ impl Sender { #[cfg(feature = "http1")] pub(crate) fn send_error(&mut self, err: crate::Error) { - let _ = self.tx.try_send(Err(err)); + let _ = self.data_tx.try_send(Err(err)); } } @@ -600,7 +626,7 @@ mod tests { assert_eq!( mem::size_of::(), - mem::size_of::() * 4, + mem::size_of::() * 5, "Sender" ); diff --git a/src/client/conn.rs b/src/client/conn.rs index 23b548dacd..64f7f4803a 100644 --- a/src/client/conn.rs +++ b/src/client/conn.rs @@ -63,7 +63,10 @@ use tower_service::Service; use super::dispatch; use crate::body::HttpBody; -use crate::common::{task, exec::{BoxSendFuture, Exec}, Future, Pin, Poll}; +use crate::common::{ + exec::{BoxSendFuture, Exec}, + task, Future, Pin, Poll, +}; use crate::proto; use crate::rt::Executor; #[cfg(feature = "http1")] From 30afdf8e4f44e6800e74ee1e9e6010ba4463208a Mon Sep 17 00:00:00 2001 From: aeryz Date: Sun, 10 Jan 2021 16:23:01 +0300 Subject: [PATCH 2/4] feat(body): make 'send_trailers' sync --- src/body/body.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/body/body.rs b/src/body/body.rs index c9ad52bc51..c8019a5806 100644 --- a/src/body/body.rs +++ b/src/body/body.rs @@ -559,7 +559,7 @@ impl Sender { futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await } - /// Send data on this channel when it is ready. + /// Send data on data channel when it is ready. pub async fn send_data(&mut self, chunk: Bytes) -> crate::Result<()> { self.ready().await?; self.data_tx @@ -567,9 +567,8 @@ impl Sender { .map_err(|_| crate::Error::new_closed()) } - /// Send trailers on this channel when it is ready. - pub async fn send_trailers(&mut self, trailers: HeaderMap) -> crate::Result<()> { - self.ready().await?; + /// Send trailers on trailers channel. + pub fn send_trailers(&mut self, trailers: HeaderMap) -> crate::Result<()> { let tx = match self.trailers_tx.take() { Some(tx) => tx, None => return Err(crate::Error::new_closed()), From 30c71328f933c68de9106795c9201417b1d68014 Mon Sep 17 00:00:00 2001 From: aeryz Date: Sun, 10 Jan 2021 16:36:02 +0300 Subject: [PATCH 3/4] fix(benches): remove await --- benches/end_to_end.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/benches/end_to_end.rs b/benches/end_to_end.rs index fde3c670cc..53c6188dfb 100644 --- a/benches/end_to_end.rs +++ b/benches/end_to_end.rs @@ -313,9 +313,7 @@ impl Opts { for _ in 0..chunk_cnt { tx.send_data(chunk.into()).await.expect("send_data"); } - tx.send_trailers(HeaderMap::new()) - .await - .expect("send_trailers"); + tx.send_trailers(HeaderMap::new()).expect("send_trailers"); }); body } else { From dda8b239e242d0289cf375bd2c51853ab083b834 Mon Sep 17 00:00:00 2001 From: aeryz Date: Fri, 15 Jan 2021 08:58:07 +0300 Subject: [PATCH 4/4] feat(body): return None if poll on trailers' receiver returns 'Canceled' --- benches/end_to_end.rs | 3 +-- src/body/body.rs | 4 ++-- src/client/conn.rs | 5 +---- 3 files changed, 4 insertions(+), 8 deletions(-) diff --git a/benches/end_to_end.rs b/benches/end_to_end.rs index 53c6188dfb..6376697afc 100644 --- a/benches/end_to_end.rs +++ b/benches/end_to_end.rs @@ -8,7 +8,7 @@ use std::net::SocketAddr; use futures_util::future::join_all; use hyper::client::HttpConnector; -use hyper::{body::HttpBody as _, Body, HeaderMap, Method, Request, Response, Server}; +use hyper::{body::HttpBody as _, Body, Method, Request, Response, Server}; // HTTP1 @@ -313,7 +313,6 @@ impl Opts { for _ in 0..chunk_cnt { tx.send_data(chunk.into()).await.expect("send_data"); } - tx.send_trailers(HeaderMap::new()).expect("send_trailers"); }); body } else { diff --git a/src/body/body.rs b/src/body/body.rs index c8019a5806..9c199fd2c8 100644 --- a/src/body/body.rs +++ b/src/body/body.rs @@ -382,7 +382,7 @@ impl HttpBody for Body { .. } => match ready!(Pin::new(trailers_rx).poll(cx)) { Ok(t) => Poll::Ready(Ok(Some(t))), - Err(_) => Poll::Ready(Err(crate::Error::new_closed())), + Err(_) => Poll::Ready(Ok(None)), }, #[cfg(feature = "ffi")] Kind::Ffi(ref mut body) => body.poll_trailers(cx), @@ -568,7 +568,7 @@ impl Sender { } /// Send trailers on trailers channel. - pub fn send_trailers(&mut self, trailers: HeaderMap) -> crate::Result<()> { + pub async fn send_trailers(&mut self, trailers: HeaderMap) -> crate::Result<()> { let tx = match self.trailers_tx.take() { Some(tx) => tx, None => return Err(crate::Error::new_closed()), diff --git a/src/client/conn.rs b/src/client/conn.rs index 64f7f4803a..23b548dacd 100644 --- a/src/client/conn.rs +++ b/src/client/conn.rs @@ -63,10 +63,7 @@ use tower_service::Service; use super::dispatch; use crate::body::HttpBody; -use crate::common::{ - exec::{BoxSendFuture, Exec}, - task, Future, Pin, Poll, -}; +use crate::common::{task, exec::{BoxSendFuture, Exec}, Future, Pin, Poll}; use crate::proto; use crate::rt::Executor; #[cfg(feature = "http1")]