Skip to content

Commit

Permalink
H3: refactor header receiving code into RecvData
Browse files Browse the repository at this point in the history
  • Loading branch information
stammw committed Apr 26, 2020
1 parent 057e988 commit 71e78c0
Show file tree
Hide file tree
Showing 3 changed files with 163 additions and 182 deletions.
113 changes: 33 additions & 80 deletions quinn-h3/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,12 @@
use std::{
any::Any,
future::Future,
mem,
net::SocketAddr,
pin::Pin,
task::{Context, Poll},
};

use futures::{channel::oneshot, ready, FutureExt, Stream};
use futures::{channel::oneshot, ready, FutureExt};
use http::{request, Request, Response};
use http_body::Body;
use quinn::{Certificate, Endpoint, OpenBi, RecvStream};
Expand All @@ -86,10 +85,9 @@ use tracing::trace;
use crate::{
body::RecvBody,
connection::{ConnectionDriver, ConnectionRef},
frame::{FrameDecoder, FrameStream},
headers::DecodeHeaders,
proto::{frame::HttpFrame, headers::Header, settings::Settings, ErrorCode},
streams::Reset,
data::RecvData,
frame::FrameDecoder,
proto::{headers::Header, settings::Settings, ErrorCode},
Error, SendData, ZeroRttAccepted,
};

Expand Down Expand Up @@ -658,13 +656,12 @@ pub struct RecvResponse {
state: RecvResponseState,
conn: ConnectionRef,
stream_id: Option<StreamId>,
recv: Option<FrameStream>,
recv: Option<RecvData>,
}

enum RecvResponseState {
Opening(oneshot::Receiver<(RecvStream, StreamId)>),
Receiving(FrameStream),
Decoding(DecodeHeaders),
Receiving,
Finished,
}

Expand All @@ -685,37 +682,15 @@ impl RecvResponse {
///
/// Server will receive a request error with `REQUEST_CANCELLED` code. Any call on any
/// object related with this request will fail.
pub fn cancel(mut self) {
let mut recv = match mem::replace(&mut self.state, RecvResponseState::Finished) {
RecvResponseState::Receiving(recv) => recv,
RecvResponseState::Decoding(_) => self.recv.take().expect("cancel recv"),
_ => return,
};
self.conn
.h3
.lock()
.unwrap()
.cancel_request(self.stream_id.unwrap());
recv.reset(ErrorCode::REQUEST_CANCELLED);
}

fn build_response(
&self,
header: Header,
recv: FrameStream,
) -> Result<Response<RecvBody>, Error> {
let (status, headers) = header.into_response_parts()?;
let mut response = Response::builder()
.status(status)
.version(http::version::Version::HTTP_3)
.body(RecvBody::new(
self.conn.clone(),
self.stream_id.unwrap(),
recv,
))
.unwrap();
*response.headers_mut() = headers;
Ok(response)
pub fn cancel(&mut self) {
if let Some(recv) = self.recv.as_mut() {
self.conn
.h3
.lock()
.unwrap()
.cancel_request(self.stream_id.unwrap());
recv.reset(ErrorCode::REQUEST_CANCELLED);
}
}
}

Expand All @@ -734,47 +709,25 @@ impl Future for RecvResponse {
let (recv, id) = ready!(open.poll_unpin(cx))
.map_err(|_| Error::internal("RecvResponse channel cancelled"))?;
self.stream_id = Some(id);
self.state = RecvResponseState::Receiving(FrameDecoder::stream(recv));
}
RecvResponseState::Receiving(ref mut recv) => {
let frame = ready!(Pin::new(recv).poll_next(cx));

trace!("client got {:?}", frame);
match frame {
None => return Poll::Ready(Err(Error::peer("received an empty response"))),
Some(Err(e)) => return Poll::Ready(Err(e.into())),
Some(Ok(f)) => match f {
HttpFrame::Reserved => (),
HttpFrame::Headers(h) => {
let decode = DecodeHeaders::new(
h,
self.conn.clone(),
self.stream_id.unwrap(),
);
match mem::replace(
&mut self.state,
RecvResponseState::Decoding(decode),
) {
RecvResponseState::Receiving(r) => self.recv = Some(r),
_ => unreachable!(),
};
}
_ => {
match mem::replace(&mut self.state, RecvResponseState::Finished) {
RecvResponseState::Receiving(mut recv) => {
recv.reset(ErrorCode::FRAME_UNEXPECTED);
}
_ => unreachable!(),
}
return Poll::Ready(Err(Error::peer("first frame is not headers")));
}
},
}
self.recv = Some(RecvData::new(
FrameDecoder::stream(recv),
self.conn.clone(),
self.stream_id.unwrap(),
));
self.state = RecvResponseState::Receiving;
}
RecvResponseState::Decoding(ref mut decode) => {
let headers = ready!(Pin::new(decode).poll(cx))?;
let recv = self.recv.take().unwrap();
let response = self.build_response(headers, recv)?;
RecvResponseState::Receiving => {
let (headers, body) = ready!(self.recv.as_mut().unwrap().poll_unpin(cx))?;

let (status, headers) = headers.into_response_parts()?;
let mut response = Response::builder()
.status(status)
.version(http::version::Version::HTTP_3)
.body(body)
.unwrap();
*response.headers_mut() = headers;

self.state = RecvResponseState::Finished;
return Poll::Ready(Ok(response));
}
}
Expand Down
100 changes: 96 additions & 4 deletions quinn-h3/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,22 @@ use std::{
};

use bytes::Buf;
use futures::ready;
use futures::{ready, Stream as _};
use http_body::Body;
use quinn::SendStream;
use quinn_proto::StreamId;

use crate::{
body::RecvBody,
connection::ConnectionRef,
frame::WriteFrame,
headers::SendHeaders,
frame::{FrameStream, WriteFrame},
headers::{DecodeHeaders, SendHeaders},
proto::{
frame::DataFrame,
frame::{DataFrame, HttpFrame},
headers::Header,
ErrorCode,
},
streams::Reset,
Error,
};

Expand Down Expand Up @@ -143,3 +145,93 @@ where
)?)
}
}

impl<B> Future for SendData<B, B::Data>
where
B: Body + Unpin,
B::Data: Buf + Unpin,
B::Error: std::fmt::Debug + Any + Send + Sync,
{
type Output = Result<(), Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
while !ready!(self.poll_inner(cx))? {}
Poll::Ready(Ok(()))
}
}

pub struct RecvData {
state: RecvDataState,
conn: ConnectionRef,
recv: Option<FrameStream>,
stream_id: StreamId,
}

enum RecvDataState {
Receiving,
Decoding(DecodeHeaders),
Finished,
}

impl RecvData {
pub(crate) fn new(recv: FrameStream, conn: ConnectionRef, stream_id: StreamId) -> Self {
Self {
conn,
stream_id,
recv: Some(recv),
state: RecvDataState::Receiving,
}
}

pub fn reset(&mut self, err_code: ErrorCode) {
if let Some(ref mut r) = self.recv {
r.reset(err_code.into());
}
}
}

impl Future for RecvData {
type Output = Result<(Header, RecvBody), Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
loop {
match &mut self.state {
RecvDataState::Receiving => {
match ready!(Pin::new(self.recv.as_mut().unwrap()).poll_next(cx)) {
Some(Ok(HttpFrame::Reserved)) => continue,
Some(Ok(HttpFrame::Headers(h))) => {
self.state = RecvDataState::Decoding(DecodeHeaders::new(
h,
self.conn.clone(),
self.stream_id,
));
}
Some(Err(e)) => {
self.recv.as_mut().unwrap().reset(e.code());
return Poll::Ready(Err(e.into()));
}
Some(Ok(f)) => {
self.recv
.as_mut()
.unwrap()
.reset(ErrorCode::FRAME_UNEXPECTED);
return Poll::Ready(Err(Error::Peer(format!(
"First frame is not headers: {:?}",
f
))));
}
None => {
return Poll::Ready(Err(Error::peer("Stream end unexpected")));
}
};
}
RecvDataState::Decoding(ref mut decode) => {
let headers = ready!(Pin::new(decode).poll(cx))?;
let recv =
RecvBody::new(self.conn.clone(), self.stream_id, self.recv.take().unwrap());
self.state = RecvDataState::Finished;
return Poll::Ready(Ok((headers, recv)));
}
RecvDataState::Finished => panic!("polled after finished"),
}
}
}
}
Loading

0 comments on commit 71e78c0

Please sign in to comment.