Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make Body know about incoming Content-Length #1554

Merged
merged 1 commit into from
Jun 8, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 19 additions & 6 deletions src/body/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub struct Body {
enum Kind {
Once(Option<Chunk>),
Chan {
content_length: Option<u64>,
abort_rx: oneshot::Receiver<()>,
rx: mpsc::Receiver<Result<Chunk, ::Error>>,
},
Expand Down Expand Up @@ -85,6 +86,11 @@ impl Body {
/// Useful when wanting to stream chunks from another thread.
#[inline]
pub fn channel() -> (Sender, Body) {
Self::new_channel(None)
}

#[inline]
pub(crate) fn new_channel(content_length: Option<u64>) -> (Sender, Body) {
let (tx, rx) = mpsc::channel(0);
let (abort_tx, abort_rx) = oneshot::channel();

Expand All @@ -93,8 +99,9 @@ impl Body {
tx: tx,
};
let rx = Body::new(Kind::Chan {
abort_rx: abort_rx,
rx: rx,
content_length,
abort_rx,
rx,
});

(tx, rx)
Expand Down Expand Up @@ -188,13 +195,19 @@ impl Body {
fn poll_inner(&mut self) -> Poll<Option<Chunk>, ::Error> {
match self.kind {
Kind::Once(ref mut val) => Ok(Async::Ready(val.take())),
Kind::Chan { ref mut rx, ref mut abort_rx } => {
Kind::Chan { content_length: ref mut len, ref mut rx, ref mut abort_rx } => {
if let Ok(Async::Ready(())) = abort_rx.poll() {
return Err(::Error::new_body_write("body write aborted"));
}

match rx.poll().expect("mpsc cannot error") {
Async::Ready(Some(Ok(chunk))) => Ok(Async::Ready(Some(chunk))),
Async::Ready(Some(Ok(chunk))) => {
if let Some(ref mut len) = *len {
debug_assert!(*len >= chunk.len() as u64);
*len = *len - chunk.len() as u64;
}
Ok(Async::Ready(Some(chunk)))
}
Async::Ready(Some(Err(err))) => Err(err),
Async::Ready(None) => Ok(Async::Ready(None)),
Async::NotReady => Ok(Async::NotReady),
Expand Down Expand Up @@ -243,7 +256,7 @@ impl Payload for Body {
fn is_end_stream(&self) -> bool {
match self.kind {
Kind::Once(ref val) => val.is_none(),
Kind::Chan { .. } => false,
Kind::Chan { content_length: len, .. } => len == Some(0),
Kind::H2(ref h2) => h2.is_end_stream(),
Kind::Wrapped(..) => false,
}
Expand All @@ -253,7 +266,7 @@ impl Payload for Body {
match self.kind {
Kind::Once(Some(ref val)) => Some(val.len() as u64),
Kind::Once(None) => Some(0),
Kind::Chan { .. } => None,
Kind::Chan { content_length: len, .. } => len,
Kind::H2(..) => None,
Kind::Wrapped(..) => None,
}
Expand Down
22 changes: 12 additions & 10 deletions src/proto/h1/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ where I: AsyncRead + AsyncWrite,
read_buf.len() >= 24 && read_buf[..24] == *H2_PREFACE
}

pub fn read_head(&mut self) -> Poll<Option<(MessageHead<T::Incoming>, bool)>, ::Error> {
pub fn read_head(&mut self) -> Poll<Option<(MessageHead<T::Incoming>, Option<BodyLength>)>, ::Error> {
debug_assert!(self.can_read_head());
trace!("Conn::read_head");

Expand Down Expand Up @@ -162,7 +162,6 @@ where I: AsyncRead + AsyncWrite,
continue;
}
};

debug!("incoming body is {}", decoder);

self.state.busy();
Expand All @@ -172,20 +171,23 @@ where I: AsyncRead + AsyncWrite,
}
let wants_keep_alive = msg.keep_alive;
self.state.keep_alive &= wants_keep_alive;
let (body, reading) = if decoder.is_eof() {
(false, Reading::KeepAlive)
} else {
(true, Reading::Body(decoder))
};

let content_length = decoder.content_length();

if let Reading::Closed = self.state.reading {
// actually want an `if not let ...`
} else {
self.state.reading = reading;
self.state.reading = if content_length.is_none() {
Reading::KeepAlive
} else {
Reading::Body(decoder)
};
}
if !body {
if content_length.is_none() {
self.try_keep_alive();
}
return Ok(Async::Ready(Some((head, body))));

return Ok(Async::Ready(Some((head, content_length))));
}
}

Expand Down
11 changes: 11 additions & 0 deletions src/proto/h1/decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use futures::{Async, Poll};
use bytes::Bytes;

use super::io::MemRead;
use super::BodyLength;

use self::Kind::{Length, Chunked, Eof};

Expand Down Expand Up @@ -84,6 +85,16 @@ impl Decoder {
}
}

pub fn content_length(&self) -> Option<BodyLength> {
match self.kind {
Length(0) |
Chunked(ChunkedState::End, _) |
Eof(true) => None,
Length(len) => Some(BodyLength::Known(len)),
_ => Some(BodyLength::Unknown),
}
}

pub fn decode<R: MemRead>(&mut self, body: &mut R) -> Poll<Bytes, io::Error> {
trace!("decode; state={:?}", self.kind);
match self.kind {
Expand Down
13 changes: 9 additions & 4 deletions src/proto/h1/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,9 +190,14 @@ where
}
// dispatch is ready for a message, try to read one
match self.conn.read_head() {
Ok(Async::Ready(Some((head, has_body)))) => {
let body = if has_body {
let (mut tx, rx) = Body::channel();
Ok(Async::Ready(Some((head, body_len)))) => {
let body = if let Some(body_len) = body_len {
let (mut tx, rx) =
Body::new_channel(if let BodyLength::Known(len) = body_len {
Some(len)
} else {
None
});
let _ = tx.poll_ready(); // register this task if rx is dropped
self.body_tx = Some(tx);
rx
Expand All @@ -201,7 +206,7 @@ where
};
self.dispatch.recv_msg(Ok((head, body)))?;
Ok(Async::Ready(()))
},
}
Ok(Async::Ready(None)) => {
// read eof, conn will start to shutdown automatically
Ok(Async::Ready(()))
Expand Down
57 changes: 57 additions & 0 deletions tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1424,6 +1424,63 @@ mod conn {
res.join(rx).map(|r| r.0).wait().unwrap();
}

#[test]
fn incoming_content_length() {
use hyper::body::Payload;

let server = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = server.local_addr().unwrap();
let mut runtime = Runtime::new().unwrap();

let (tx1, rx1) = oneshot::channel();

thread::spawn(move || {
let mut sock = server.accept().unwrap().0;
sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap();
sock.set_write_timeout(Some(Duration::from_secs(5))).unwrap();
let mut buf = [0; 4096];
let n = sock.read(&mut buf).expect("read 1");

let expected = "GET / HTTP/1.1\r\n\r\n";
assert_eq!(s(&buf[..n]), expected);

sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 5\r\n\r\nhello").unwrap();
let _ = tx1.send(());
});

let tcp = tcp_connect(&addr).wait().unwrap();

let (mut client, conn) = conn::handshake(tcp).wait().unwrap();

runtime.spawn(conn.map(|_| ()).map_err(|e| panic!("conn error: {}", e)));

let req = Request::builder()
.uri("/")
.body(Default::default())
.unwrap();
let res = client.send_request(req).and_then(move |mut res| {
assert_eq!(res.status(), hyper::StatusCode::OK);
assert_eq!(res.body().content_length(), Some(5));
assert!(!res.body().is_end_stream());
loop {
let chunk = res.body_mut().poll_data().unwrap();
match chunk {
Async::Ready(Some(chunk)) => {
assert_eq!(chunk.len(), 5);
break;
}
_ => continue
}
}
res.into_body().concat2()
});
let rx = rx1.expect("thread panicked");

let timeout = Delay::new(Duration::from_millis(200));
let rx = rx.and_then(move |_| timeout.expect("timeout"));
res.join(rx).map(|r| r.0).wait().unwrap();
}

#[test]
fn aborted_body_isnt_completed() {
let _ = ::pretty_env_logger::try_init();
Expand Down