Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

H3 http body #734

Merged
merged 17 commits into from
May 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions interop/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ anyhow = "1.0.22"
bytes = "0.5.2"
futures = "0.3.1"
http = "0.2"
http-body = "0.3"
hyper = "0.13"
hyper-rustls = "0.20"
lazy_static = "1"
Expand Down
14 changes: 7 additions & 7 deletions interop/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ use structopt::StructOpt;
use tracing::{debug, error, info, info_span, warn};
use tracing_futures::Instrument as _;

use quinn_h3::Body;

#[derive(StructOpt, Debug)]
#[structopt(name = "interop")]
struct Opt {
Expand Down Expand Up @@ -902,16 +904,14 @@ fn build_result(
}

async fn h3_get(conn: &quinn_h3::client::Connection, uri: &http::Uri) -> Result<usize> {
let (response, _) = conn.send_request(http::Request::get(uri).body(())?).await?;
let (request, response) = conn.send_request(http::Request::get(uri).body(Body::from(()))?);
request.await?;

let (resp, mut recv_body) = response.await?;
let mut resp = response.await?;
debug!("resp: {:?}", resp);
let mut total_len = 0usize;
while let Some(d) = recv_body.data().await {
total_len += d?.len()
}
let body = resp.body_mut().read_to_end().await?;

Ok(total_len)
Ok(body.len())
}

async fn hq_get(stream: (quinn::SendStream, quinn::RecvStream), path: &str) -> Result<Vec<u8>> {
Expand Down
26 changes: 11 additions & 15 deletions interop/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::{

use anyhow::{anyhow, bail, Context as _, Result};
use bytes::Bytes;
use futures::{ready, AsyncReadExt, Future, StreamExt, TryFutureExt};
use futures::{ready, Future, StreamExt, TryFutureExt};
use http::{Response, StatusCode};
use hyper::service::{make_service_fn, service_fn};
use structopt::{self, StructOpt};
Expand All @@ -21,7 +21,7 @@ use tracing::{error, info, info_span};
use tracing_futures::Instrument as _;

use quinn::SendStream;
use quinn_h3::{self, server::RecvRequest};
use quinn_h3::{self, server::RecvRequest, Body};
use sync::Arc;

#[derive(StructOpt, Debug, Clone)]
Expand Down Expand Up @@ -140,17 +140,13 @@ async fn h3_handle_connection(connecting: quinn::Connecting) -> Result<()> {
}

async fn h3_handle_request(recv_request: RecvRequest) -> Result<()> {
let (request, mut recv_body, sender) = recv_request.await?;
let (mut request, sender) = recv_request.await?;
println!("received request: {:?}", request);

let mut body = Vec::with_capacity(1024);
recv_body
.read_to_end(&mut body)
.await
.map_err(|e| anyhow!("failed to send response headers: {:?}", e))?;

let body = request.body_mut().read_to_end().await?;
println!("received body: {}", String::from_utf8_lossy(&body));
if let Some(trailers) = recv_body.trailers().await {

if let Some(trailers) = request.body_mut().trailers().await? {
println!("received trailers: {:?}", trailers);
}

Expand All @@ -165,11 +161,11 @@ async fn h3_handle_request(recv_request: RecvRequest) -> Result<()> {
Ok(())
}

async fn h3_home(sender: quinn_h3::server::Sender) -> Result<()> {
async fn h3_home(mut sender: quinn_h3::server::Sender) -> Result<()> {
let response = Response::builder()
.status(StatusCode::OK)
.header("server", VERSION)
.body(HOME)
.body(Body::from(HOME))
.expect("failed to build response");
sender
.send_response(response)
Expand All @@ -178,12 +174,12 @@ async fn h3_home(sender: quinn_h3::server::Sender) -> Result<()> {
Ok(())
}

async fn h3_payload(sender: quinn_h3::server::Sender, len: usize) -> Result<()> {
async fn h3_payload(mut sender: quinn_h3::server::Sender, len: usize) -> Result<()> {
if len > 1_000_000_000 {
let response = Response::builder()
.status(StatusCode::BAD_REQUEST)
.header("server", VERSION)
.body(Bytes::from(format!("requested {}: too large", len)))
.body(Body::from(format!("requested {}: too large", len).as_ref()))
.expect("failed to build response");
sender.send_response(response).await?;
return Ok(());
Expand All @@ -195,7 +191,7 @@ async fn h3_payload(sender: quinn_h3::server::Sender, len: usize) -> Result<()>
let response = Response::builder()
.status(StatusCode::OK)
.header("server", VERSION)
.body(Bytes::from(buf))
.body(Body::from(Bytes::from(buf)))
.expect("failed to build response");

sender
Expand Down
2 changes: 2 additions & 0 deletions quinn-h3/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ err-derive = "0.2.3"
futures = "0.3.1"
futures-util = { version = "0.3", default-features = false }
http = "0.2"
http-body = "0.3"
lazy_static = "1"
pin-project = "0.4"
quinn-proto = { path = "../quinn-proto", version = "0.6.0" }
quinn = { path = "../quinn", version = "0.6.0", features = ["tls-rustls"] }
rustls = { version = "0.17", features = ["quic"] }
Expand Down
71 changes: 51 additions & 20 deletions quinn-h3/benches/helpers.rs
Original file line number Diff line number Diff line change
@@ -1,38 +1,41 @@
#![allow(dead_code)]
use std::{
cmp,
net::{IpAddr, Ipv6Addr, SocketAddr, UdpSocket},
pin::Pin,
sync::Arc,
task::{Context, Poll},
thread,
};

use futures::{channel::oneshot, Future};
use tokio::{
io::AsyncWriteExt as _,
runtime::{Builder, Runtime},
};
use tokio::runtime::{Builder, Runtime};
use tracing::{error_span, span, Level};
use tracing_futures::Instrument as _;

use bytes::Bytes;
use http::HeaderMap;
use http_body::Body as HttpBody;
use quinn::{ClientConfigBuilder, ServerConfigBuilder};
use quinn_h3::{
self, client,
server::{self, IncomingConnection},
BodyWriter, Settings,
Error, Settings,
};

pub struct Context {
pub struct Bench {
server_config: server::Builder,
client_config: client::Builder,
stop_server: Option<oneshot::Sender<()>>,
}

impl Default for Context {
impl Default for Bench {
fn default() -> Self {
Self::with_settings(Settings::new())
}
}

impl Context {
impl Bench {
pub fn with_settings(settings: Settings) -> Self {
let cert = rcgen::generate_simple_self_signed(vec!["localhost".into()]).unwrap();
let key = quinn::PrivateKey::from_der(&cert.serialize_private_key_der()).unwrap();
Expand Down Expand Up @@ -113,22 +116,50 @@ impl Context {
}
}

pub async fn send_body(mut body_writer: BodyWriter, frame_size: usize, mut total_size: usize) {
let data = "a".repeat(frame_size);
while total_size > 0 {
let size = std::cmp::min(frame_size, total_size);
body_writer
.write_all(&data.as_bytes()[..size])
.await
.expect("body write");
total_size -= size;
}
}

pub fn rt() -> Runtime {
Builder::new()
.basic_scheduler()
.enable_all()
.build()
.unwrap()
}

pub struct BenchBody {
frame_len: usize,
total_len: usize,
buf: Bytes,
}

impl BenchBody {
pub fn new(frame_len: usize, total_len: usize) -> Self {
Self {
total_len,
frame_len,
buf: "b".repeat(frame_len).into(),
}
}
}

impl HttpBody for BenchBody {
type Data = Bytes;
type Error = Error;
fn poll_data(
mut self: Pin<&mut Self>,
_: &mut Context,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
if self.total_len == 0 {
return Poll::Ready(None);
}

let size = cmp::min(self.total_len, self.frame_len);
self.total_len -= size;

Poll::Ready(Some(Ok(self.buf.slice(..size))))
}
fn poll_trailers(
self: Pin<&mut Self>,
_: &mut Context,
) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
Poll::Ready(Ok(None))
}
}
53 changes: 27 additions & 26 deletions quinn-h3/benches/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use tracing_futures::Instrument as _;
use quinn_h3::{self, client, server::IncomingConnection, Body, Settings};

mod helpers;
use helpers::Context;
use helpers::Bench;

benchmark_group!(
benches_request,
Expand Down Expand Up @@ -54,12 +54,15 @@ fn bench_google_body(bench: &mut Bencher) {

// Empty header values

pub fn empty_request() -> Request<()> {
Request::get("https://localhost").body(()).unwrap()
pub fn empty_request() -> Request<Body> {
Request::get("https://localhost").body(().into()).unwrap()
}

pub fn empty_response() -> Response<()> {
Response::builder().status(StatusCode::OK).body(()).unwrap()
pub fn empty_response() -> Response<Body> {
Response::builder()
.status(StatusCode::OK)
.body(().into())
.unwrap()
}

pub async fn empty_server(incoming: IncomingConnection, stop: oneshot::Receiver<()>) {
Expand All @@ -69,7 +72,7 @@ pub async fn empty_server(incoming: IncomingConnection, stop: oneshot::Receiver<
pub fn empty_response_body() -> Response<Body> {
Response::builder()
.status(StatusCode::OK)
.body("a".repeat(64 * 1024).as_str().into())
.body(Body::from("a".repeat(64 * 1024).as_str()))
.unwrap()
}

Expand All @@ -79,7 +82,7 @@ pub async fn empty_server_body(incoming: IncomingConnection, stop: oneshot::Rece

// Google header values

pub fn google_request() -> Request<()> {
pub fn google_request() -> Request<Body> {
Request::get("https://www.google.com/search?client=ubuntu&channel=fs&q=sfqfd&ie=utf-8&oe=utf-8")
.header("Host", "www.google.com")
.header(
Expand All @@ -97,7 +100,7 @@ pub fn google_request() -> Request<()> {
.header("Upgrade-Insecure-Requests", "1")
.header("Cache-Control", "max-age=0")
.header("TE", "Trailers")
.body(())
.body(().into())
.unwrap()
}

Expand All @@ -117,7 +120,7 @@ pub fn google_response() -> Response<Body> {
.header("set-cookie", "SIDCC=1111111111111111111111111111111111111111111111111111111111111111111111111111; expires=Mon, 19-Apr-2021 09:11:37 GMT; path=/; domain=.google.com; priority=high")
.header("alt-svc", "quic=\":443\"; ma=2592000; v=\"46,43\",h3-Q050=\":443\"; ma=2592000,h3-Q049=\":443\"; ma=2592000,h3-Q048=\":443\"; ma=2592000,h3-Q046=\":443\"; ma=2592000,h3-Q043=\":443\"; ma=2592000,h3-T050=\":443\"; ma=2592000")
.header("X-Firefox-Spdy", "h2")
.body(().into()).unwrap()
.body(Body::from(())).unwrap()
}

pub async fn google_server(incoming: IncomingConnection, stop: oneshot::Receiver<()>) {
Expand All @@ -140,7 +143,7 @@ pub fn google_response_body() -> Response<Body> {
.header("set-cookie", "SIDCC=1111111111111111111111111111111111111111111111111111111111111111111111111111; expires=Mon, 19-Apr-2021 09:11:37 GMT; path=/; domain=.google.com; priority=high")
.header("alt-svc", "quic=\":443\"; ma=2592000; v=\"46,43\",h3-Q050=\":443\"; ma=2592000,h3-Q049=\":443\"; ma=2592000,h3-Q048=\":443\"; ma=2592000,h3-Q046=\":443\"; ma=2592000,h3-Q043=\":443\"; ma=2592000,h3-T050=\":443\"; ma=2592000")
.header("X-Firefox-Spdy", "h2")
.body("a".repeat(64 * 1024).as_str().into()).unwrap()
.body(Body::from("a".repeat(64 * 1024).as_str())).unwrap()
}

pub async fn google_server_body(incoming: IncomingConnection, stop: oneshot::Receiver<()>) {
Expand All @@ -149,18 +152,17 @@ pub async fn google_server_body(incoming: IncomingConnection, stop: oneshot::Rec

// Runner

fn request<T, Fut>(
fn request<Fut>(
bench: &mut Bencher,
make_request: fn() -> Request<T>,
make_request: fn() -> Request<Body>,
service: fn(incoming_conn: IncomingConnection, stop_recv: oneshot::Receiver<()>) -> Fut,
settings: Settings,
) where
T: Into<Body>,
Fut: Future<Output = ()> + 'static,
{
let _ = tracing_subscriber::fmt::try_init();

let mut ctx = Context::with_settings(settings);
let mut ctx = Bench::with_settings(settings);

let (addr, server) = ctx.spawn_server(service);
let (client, mut runtime) = ctx.make_client(addr);
Expand All @@ -177,18 +179,18 @@ fn request<T, Fut>(
server.join().expect("server");
}

async fn request_client<T: Into<Body>>(client: &client::Connection, request: Request<T>) {
let (recv_resp, _) = client.send_request(request).await.expect("request");
let (_, mut body_reader) = recv_resp.await.expect("recv_resp");
while let Some(Ok(_)) = body_reader.data().await {}
async fn request_client(client: &client::Connection, request: Request<Body>) {
let (req, recv_resp) = client.send_request(request);
req.await.expect("request");
let mut resp = recv_resp.await.expect("recv_resp");
resp.body_mut().read_to_end().await.expect("read body");
}
async fn request_server<T>(

async fn request_server(
mut incoming_conn: IncomingConnection,
mut stop_recv: oneshot::Receiver<()>,
make_response: fn() -> Response<T>,
) where
T: Into<Body>,
{
make_response: fn() -> Response<Body>,
) {
let mut incoming_req = incoming_conn
.next()
.await
Expand All @@ -199,9 +201,8 @@ async fn request_server<T>(
select! {
_ = &mut stop_recv => break,
Some(recv_req) = incoming_req.next() => {
let (_, _, sender) = recv_req.await.expect("recv_req");
let _ = sender
.send_response(make_response())
let (_, mut sender) = recv_req.await.expect("recv_req");
sender.send_response(make_response())
.await
.expect("send_response");
},
Expand Down
Loading