Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(body): add trailers to Body channel (#2260) #2387

Merged
merged 8 commits into from
Jan 15, 2021
63 changes: 43 additions & 20 deletions src/body/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ use std::fmt;

use bytes::Bytes;
use futures_channel::mpsc;
#[cfg(any(feature = "http1", feature = "http2"))]
#[cfg(feature = "client")]
use futures_channel::oneshot;
use futures_core::Stream; // for mpsc::Receiver
#[cfg(feature = "stream")]
Expand All @@ -17,14 +15,16 @@ use http_body::{Body as HttpBody, SizeHint};
use super::DecodedLength;
#[cfg(feature = "stream")]
use crate::common::sync_wrapper::SyncWrapper;
use crate::common::{task, watch, Pin, Poll};
use crate::common::Future;
#[cfg(any(feature = "http1", feature = "http2"))]
#[cfg(feature = "client")]
use crate::common::{Future, Never};
use crate::common::Never;
use crate::common::{task, watch, Pin, Poll};
#[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
use crate::proto::h2::ping;

type BodySender = mpsc::Sender<Result<Bytes, crate::Error>>;
type TrailersSender = oneshot::Sender<HeaderMap>;

/// A stream of `Bytes`, used when receiving bodies.
///
Expand All @@ -43,7 +43,8 @@ enum Kind {
Chan {
content_length: DecodedLength,
want_tx: watch::Sender,
rx: mpsc::Receiver<Result<Bytes, crate::Error>>,
data_rx: mpsc::Receiver<Result<Bytes, crate::Error>>,
trailers_rx: oneshot::Receiver<HeaderMap>,
},
#[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
H2 {
Expand Down Expand Up @@ -106,7 +107,8 @@ enum DelayEof {
#[must_use = "Sender does nothing unless sent on"]
pub struct Sender {
want_rx: watch::Receiver,
tx: BodySender,
data_tx: BodySender,
trailers_tx: Option<TrailersSender>,
}

const WANT_PENDING: usize = 1;
Expand Down Expand Up @@ -137,19 +139,25 @@ impl Body {
}

pub(crate) fn new_channel(content_length: DecodedLength, wanter: bool) -> (Sender, Body) {
let (tx, rx) = mpsc::channel(0);
let (data_tx, data_rx) = mpsc::channel(0);
let (trailers_tx, trailers_rx) = oneshot::channel();

// If wanter is true, `Sender::poll_ready()` won't becoming ready
// until the `Body` has been polled for data once.
let want = if wanter { WANT_PENDING } else { WANT_READY };

let (want_tx, want_rx) = watch::channel(want);

let tx = Sender { want_rx, tx };
let tx = Sender {
want_rx,
data_tx,
trailers_tx: Some(trailers_tx),
};
let rx = Body::new(Kind::Chan {
content_length,
want_tx,
rx,
data_rx,
trailers_rx,
});

(tx, rx)
Expand Down Expand Up @@ -282,12 +290,13 @@ impl Body {
Kind::Once(ref mut val) => Poll::Ready(val.take().map(Ok)),
Kind::Chan {
content_length: ref mut len,
ref mut rx,
ref mut data_rx,
ref mut want_tx,
..
} => {
want_tx.send(WANT_READY);

match ready!(Pin::new(rx).poll_next(cx)?) {
match ready!(Pin::new(data_rx).poll_next(cx)?) {
Some(chunk) => {
len.sub_if(chunk.len() as u64);
Poll::Ready(Some(Ok(chunk)))
Expand Down Expand Up @@ -368,10 +377,15 @@ impl HttpBody for Body {
}
Err(e) => Poll::Ready(Err(crate::Error::new_h2(e))),
},

Kind::Chan {
ref mut trailers_rx,
..
} => match ready!(Pin::new(trailers_rx).poll(cx)) {
Ok(t) => Poll::Ready(Ok(Some(t))),
Err(_) => Poll::Ready(Ok(None)),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@seanmonstar If we don't send any trailers, polling the oneshot receiver returns Err(Canceled) so if we return Err(..) in this case, receive process will result in error. That's why I changed this to Ok(None). This way we don't have to send empty trailers. Is it ok though?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh yes, this change is the correct behavior. Otherwise, when people upgrade hyper then suddenly in H2 when the trailers are polled they'd error the stream since they weren't sending any before.

},
#[cfg(feature = "ffi")]
Kind::Ffi(ref mut body) => body.poll_trailers(cx),

_ => Poll::Ready(Ok(None)),
}
}
Expand Down Expand Up @@ -527,7 +541,7 @@ impl Sender {
pub fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
// Check if the receiver end has tried polling for the body yet
ready!(self.poll_want(cx)?);
self.tx
self.data_tx
.poll_ready(cx)
.map_err(|_| crate::Error::new_closed())
}
Expand All @@ -545,14 +559,23 @@ impl Sender {
futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await
}

/// Send data on this channel when it is ready.
/// Send data on data channel when it is ready.
pub async fn send_data(&mut self, chunk: Bytes) -> crate::Result<()> {
self.ready().await?;
self.tx
self.data_tx
.try_send(Ok(chunk))
.map_err(|_| crate::Error::new_closed())
}

/// Send trailers on trailers channel.
pub async fn send_trailers(&mut self, trailers: HeaderMap) -> crate::Result<()> {
let tx = match self.trailers_tx.take() {
Some(tx) => tx,
None => return Err(crate::Error::new_closed()),
};
tx.send(trailers).map_err(|_| crate::Error::new_closed())
}

/// Try to send data on this channel.
///
/// # Errors
Expand All @@ -566,23 +589,23 @@ impl Sender {
/// that doesn't have an async context. If in an async context, prefer
/// `send_data()` instead.
pub fn try_send_data(&mut self, chunk: Bytes) -> Result<(), Bytes> {
self.tx
self.data_tx
.try_send(Ok(chunk))
.map_err(|err| err.into_inner().expect("just sent Ok"))
}

/// Aborts the body in an abnormal fashion.
pub fn abort(self) {
let _ = self
.tx
.data_tx
// clone so the send works even if buffer is full
.clone()
.try_send(Err(crate::Error::new_body_write_aborted()));
}

#[cfg(feature = "http1")]
pub(crate) fn send_error(&mut self, err: crate::Error) {
let _ = self.tx.try_send(Err(err));
let _ = self.data_tx.try_send(Err(err));
}
}

Expand Down Expand Up @@ -628,7 +651,7 @@ mod tests {

assert_eq!(
mem::size_of::<Sender>(),
mem::size_of::<usize>() * 4,
mem::size_of::<usize>() * 5,
aeryz marked this conversation as resolved.
Show resolved Hide resolved
"Sender"
);

Expand Down