Skip to content

Commit

Permalink
Wire up Trailers frames
Browse files Browse the repository at this point in the history
  • Loading branch information
olix0r committed Aug 1, 2017
1 parent 27afa19 commit df27e02
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 23 deletions.
4 changes: 4 additions & 0 deletions src/frame/headers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,10 @@ impl Headers {
self.flags.set_end_stream()
}

pub fn into_fields(self) -> HeaderMap<HeaderValue> {
self.fields
}

pub fn into_response(self) -> response::Head {
let mut response = response::Head::default();

Expand Down
4 changes: 3 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ use bytes::Bytes;

pub type FrameSize = u32;

pub type HeaderMap = http::HeaderMap<http::header::HeaderValue>;

/// An H2 connection frame
#[derive(Debug)]
pub enum Frame<T, B = Bytes> {
Expand All @@ -57,7 +59,7 @@ pub enum Frame<T, B = Bytes> {
},
Trailers {
id: StreamId,
headers: (),
headers: HeaderMap,
},
PushPromise {
id: StreamId,
Expand Down
31 changes: 25 additions & 6 deletions src/proto/connection.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use {ConnectionError, Frame, Peer};
use {ConnectionError, HeaderMap, Frame, Peer};
use frame::{self, StreamId};
use client::Client;
use server::Server;
Expand Down Expand Up @@ -110,6 +110,17 @@ impl<T, P, B> Connection<T, P, B>
})
}

pub fn send_trailers(self,
id: StreamId,
headers: HeaderMap)
-> sink::Send<Self>
{
self.send(Frame::Trailers {
id,
headers,
})
}

// ===== Private =====

/// Returns `Ready` when the `Connection` is ready to receive a frame from
Expand Down Expand Up @@ -165,7 +176,7 @@ impl<T, P, B> Connection<T, P, B>
Some(Headers(frame)) => {
trace!("recv HEADERS; frame={:?}", frame);
// Update stream state while ensuring that the headers frame
// can be received
// can be received.
if let Some(frame) = try!(self.streams.recv_headers(frame)) {
let frame = Self::convert_poll_message(frame);
return Ok(Some(frame).into());
Expand Down Expand Up @@ -222,8 +233,10 @@ impl<T, P, B> Connection<T, P, B>

fn convert_poll_message(frame: frame::Headers) -> Frame<P::Poll> {
if frame.is_trailers() {
// TODO: return trailers
unimplemented!();
Frame::Trailers {
id: frame.stream_id(),
headers: frame.into_fields()
}
} else {
Frame::Headers {
id: frame.stream_id(),
Expand Down Expand Up @@ -338,10 +351,16 @@ impl<T, P, B> Sink for Connection<T, P, B>

Frame::Reset { id, error } => frame::Reset::new(id, error).into(),

/*
Frame::Trailers { id, headers } => {
unimplemented!();
let mut f = frame::Headers::new(id, frame::Pseudo::default(), headers);
f.set_end_stream();

self.streams.send_headers(&f)?;

frame::Frame::Headers(f)
}

/*
Frame::PushPromise { id, promise } => {
unimplemented!();
}
Expand Down
36 changes: 20 additions & 16 deletions src/proto/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ impl Streams {
};

if frame.is_trailers() {
try!(self.inner.recv_trailers(id, state, frame.is_end_stream()));
try!(self.inner.recv_eos(id, state));
} else {
try!(self.inner.recv_headers(id, state, frame.is_end_stream()));
}
Expand Down Expand Up @@ -222,7 +222,7 @@ impl Streams {
};

if frame.is_trailers() {
try!(self.inner.send_trailers(id, state, frame.is_end_stream()));
try!(self.inner.send_eos(id, state));
} else {
try!(self.inner.send_headers(id, state, frame.is_end_stream()));
}
Expand Down Expand Up @@ -326,10 +326,16 @@ impl Inner {
Ok(())
}

fn recv_trailers(&mut self, _id: StreamId, _state: &mut state::Stream, _eos: bool)
fn recv_eos(&mut self, id: StreamId, state: &mut state::Stream)
-> Result<(), ConnectionError>
{
unimplemented!();
try!(state.recv_close());

if state.is_closed() {
self.stream_closed(id)
}

Ok(())
}

fn recv_data(&mut self,
Expand All @@ -356,11 +362,7 @@ impl Inner {
}

if eos {
try!(state.recv_close());

if state.is_closed() {
self.stream_closed(id)
}
self.recv_eos(id, state)?
}

Ok(())
Expand Down Expand Up @@ -396,10 +398,16 @@ impl Inner {
Ok(())
}

fn send_trailers(&mut self, _id: StreamId, _state: &mut state::Stream, _eos: bool)
fn send_eos(&mut self, id: StreamId, state: &mut state::Stream)
-> Result<(), ConnectionError>
{
unimplemented!();
try!(state.send_close());

if state.is_closed() {
self.stream_closed(id);
}

Ok(())
}

fn send_data(&mut self,
Expand Down Expand Up @@ -435,11 +443,7 @@ impl Inner {
}

if eos {
try!(state.send_close());

if state.is_closed() {
self.stream_closed(id)
}
self.send_eos(id, state)?;
}

Ok(())
Expand Down

0 comments on commit df27e02

Please sign in to comment.