diff --git a/src/proto/h1/conn.rs b/src/proto/h1/conn.rs index bcf28d4b69..c8b355cd63 100644 --- a/src/proto/h1/conn.rs +++ b/src/proto/h1/conn.rs @@ -655,6 +655,20 @@ where } } + /// If the read side can be cheaply drained, do so. Otherwise, close. + pub(super) fn poll_drain_or_close_read(&mut self, cx: &mut task::Context<'_>) { + let _ = self.poll_read_body(cx); + + // If still in Reading::Body, just give up + match self.state.reading { + Reading::Init | Reading::KeepAlive => { + trace!("body drained"); + return; + } + _ => self.close_read(), + } + } + pub fn close_read(&mut self) { self.state.close_read(); } diff --git a/src/proto/h1/dispatch.rs b/src/proto/h1/dispatch.rs index ff5bf01832..84ee412c3c 100644 --- a/src/proto/h1/dispatch.rs +++ b/src/proto/h1/dispatch.rs @@ -186,9 +186,9 @@ where Poll::Ready(Err(_canceled)) => { // user doesn't care about the body // so we should stop reading - trace!("body receiver dropped before eof, closing"); - self.conn.close_read(); - return Poll::Ready(Ok(())); + trace!("body receiver dropped before eof, draining or closing"); + self.conn.poll_drain_or_close_read(cx); + continue; } } match self.conn.poll_read_body(cx) { diff --git a/tests/client.rs b/tests/client.rs index 0c815ba053..f7db372273 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -967,6 +967,7 @@ mod dispatch_impl { use tokio::net::TcpStream; use tokio::runtime::Runtime; + use hyper::body::HttpBody; use hyper::client::connect::{Connected, Connection, HttpConnector}; use hyper::Client; @@ -1574,6 +1575,95 @@ mod dispatch_impl { assert_eq!(connects.load(Ordering::Relaxed), 1); } + #[tokio::test] + async fn client_keep_alive_eager_when_chunked() { + // If a response body has been read to completion, with completion + // determined by some other factor, like decompression, and thus + // it is in't polled a final time to clear the final 0-len chunk, + // try to eagerly clear it so the connection can still be used. + + let _ = pretty_env_logger::try_init(); + let server = TcpListener::bind("127.0.0.1:0").unwrap(); + let addr = server.local_addr().unwrap(); + let connector = DebugConnector::new(); + let connects = connector.connects.clone(); + + let client = Client::builder().build(connector); + + let (tx1, rx1) = oneshot::channel(); + let (tx2, rx2) = oneshot::channel(); + thread::spawn(move || { + let mut sock = server.accept().unwrap().0; + //drop(server); + sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap(); + sock.set_write_timeout(Some(Duration::from_secs(5))) + .unwrap(); + let mut buf = [0; 4096]; + sock.read(&mut buf).expect("read 1"); + sock.write_all( + b"\ + HTTP/1.1 200 OK\r\n\ + transfer-encoding: chunked\r\n\ + \r\n\ + 5\r\n\ + hello\r\n\ + 0\r\n\r\n\ + ", + ) + .expect("write 1"); + let _ = tx1.send(()); + + let n2 = sock.read(&mut buf).expect("read 2"); + assert_ne!(n2, 0, "bytes of second request"); + let second_get = "GET /b HTTP/1.1\r\n"; + assert_eq!(s(&buf[..second_get.len()]), second_get); + sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n") + .expect("write 2"); + let _ = tx2.send(()); + }); + + assert_eq!(connects.load(Ordering::SeqCst), 0); + + let rx = rx1.expect("thread panicked"); + let req = Request::builder() + .uri(&*format!("http://{}/a", addr)) + .body(Body::empty()) + .unwrap(); + let fut = client.request(req); + + let mut resp = future::join(fut, rx).map(|r| r.0).await.unwrap(); + assert_eq!(connects.load(Ordering::SeqCst), 1); + assert_eq!(resp.status(), 200); + assert_eq!(resp.headers()["transfer-encoding"], "chunked"); + + // Read the "hello" chunk... + let chunk = resp.body_mut().data().await.unwrap().unwrap(); + assert_eq!(chunk, "hello"); + + // With our prior knowledge, we know that's the end of the body. + // So just drop the body, without polling for the `0\r\n\r\n` end. + drop(resp); + + // sleep real quick to let the threadpool put connection in ready + // state and back into client pool + tokio::time::delay_for(Duration::from_millis(50)).await; + + let rx = rx2.expect("thread panicked"); + let req = Request::builder() + .uri(&*format!("http://{}/b", addr)) + .body(Body::empty()) + .unwrap(); + let fut = client.request(req); + future::join(fut, rx).map(|r| r.0).await.unwrap(); + + assert_eq!( + connects.load(Ordering::SeqCst), + 1, + "second request should still only have 1 connect" + ); + drop(client); + } + #[test] fn connect_proxy_sends_absolute_uri() { let _ = pretty_env_logger::try_init();