Skip to content

Commit

Permalink
Support very large headers
Browse files Browse the repository at this point in the history
This completely refactors how headers are hpack-encoded.

Instead of trying to be clever, constructing frames on the go
while hpack-encoding, we just make a blob of all the
hpack-encoded headers first, and then we split that blob
in as many frames as necessary.
  • Loading branch information
nox committed Aug 24, 2021
1 parent 47d107a commit 7a87f24
Show file tree
Hide file tree
Showing 10 changed files with 163 additions and 390 deletions.
4 changes: 0 additions & 4 deletions src/codec/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,6 @@ pub enum UserError {
/// The payload size is too big
PayloadTooBig,

/// A header size is too big
HeaderTooBig,

/// The application attempted to initiate too many streams to remote.
Rejected,

Expand Down Expand Up @@ -130,7 +127,6 @@ impl fmt::Display for UserError {
InactiveStreamId => "inactive stream",
UnexpectedFrameType => "unexpected frame type",
PayloadTooBig => "payload too big",
HeaderTooBig => "header too big",
Rejected => "rejected",
ReleaseCapacityTooBig => "release capacity too big",
OverflowedStreamId => "stream ID overflowed",
Expand Down
22 changes: 1 addition & 21 deletions src/codec/framed_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,12 +148,6 @@ where
match self.encoder.unset_frame() {
ControlFlow::Continue => (),
ControlFlow::Break => break,
ControlFlow::EndlessLoopHeaderTooBig => {
return Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
UserError::HeaderTooBig,
)));
}
}
}

Expand Down Expand Up @@ -199,7 +193,6 @@ where
enum ControlFlow {
Continue,
Break,
EndlessLoopHeaderTooBig,
}

impl<B> Encoder<B>
Expand All @@ -221,20 +214,7 @@ where
Some(Next::Continuation(frame)) => {
// Buffer the continuation frame, then try to write again
let mut buf = limited_write_buf!(self);
if let Some(continuation) = frame.encode(&mut self.hpack, &mut buf) {
// We previously had a CONTINUATION, and after encoding
// it, we got *another* one? Let's just double check
// that at least some progress is being made...
if self.buf.get_ref().len() == frame::HEADER_LEN {
// If *only* the CONTINUATION frame header was
// written, and *no* header fields, we're stuck
// in a loop...
tracing::warn!(
"CONTINUATION frame write loop; header value too big to encode"
);
return ControlFlow::EndlessLoopHeaderTooBig;
}

if let Some(continuation) = frame.encode(&mut buf) {
self.next = Some(Next::Continuation(continuation));
}
ControlFlow::Continue
Expand Down
188 changes: 100 additions & 88 deletions src/frame/headers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,12 @@ use crate::hpack::{self, BytesStr};
use http::header::{self, HeaderName, HeaderValue};
use http::{uri, HeaderMap, Method, Request, StatusCode, Uri};

use bytes::BytesMut;
use bytes::{BufMut, Bytes, BytesMut};

use std::fmt;
use std::io::Cursor;

type EncodeBuf<'a> = bytes::buf::Limit<&'a mut BytesMut>;

// Minimum MAX_FRAME_SIZE is 16kb, so save some arbitrary space for frame
// head and other header bits.
const MAX_HEADER_LENGTH: usize = 1024 * 16 - 100;

/// Header frame
///
/// This could be either a request or a response.
Expand Down Expand Up @@ -100,11 +95,7 @@ struct HeaderBlock {

#[derive(Debug)]
struct EncodingHeaderBlock {
/// Argument to pass to the HPACK encoder to resume encoding
hpack: Option<hpack::EncodeState>,

/// remaining headers to encode
headers: Iter,
hpack: Bytes,
}

const END_STREAM: u8 = 0x1;
Expand Down Expand Up @@ -241,10 +232,6 @@ impl Headers {
self.header_block.is_over_size
}

pub(crate) fn has_too_big_field(&self) -> bool {
self.header_block.has_too_big_field()
}

pub fn into_parts(self) -> (Pseudo, HeaderMap) {
(self.header_block.pseudo, self.header_block.fields)
}
Expand Down Expand Up @@ -279,8 +266,8 @@ impl Headers {
let head = self.head();

self.header_block
.into_encoding()
.encode(&head, encoder, dst, |_| {})
.into_encoding(encoder)
.encode(&head, dst, |_| {})
}

fn head(&self) -> Head {
Expand Down Expand Up @@ -480,17 +467,15 @@ impl PushPromise {
encoder: &mut hpack::Encoder,
dst: &mut EncodeBuf<'_>,
) -> Option<Continuation> {
use bytes::BufMut;

// At this point, the `is_end_headers` flag should always be set
debug_assert!(self.flags.is_end_headers());

let head = self.head();
let promised_id = self.promised_id;

self.header_block
.into_encoding()
.encode(&head, encoder, dst, |dst| {
.into_encoding(encoder)
.encode(&head, dst, |dst| {
dst.put_u32(promised_id.into());
})
}
Expand Down Expand Up @@ -529,15 +514,11 @@ impl Continuation {
Head::new(Kind::Continuation, END_HEADERS, self.stream_id)
}

pub fn encode(
self,
encoder: &mut hpack::Encoder,
dst: &mut EncodeBuf<'_>,
) -> Option<Continuation> {
pub fn encode(self, dst: &mut EncodeBuf<'_>) -> Option<Continuation> {
// Get the CONTINUATION frame head
let head = self.head();

self.header_block.encode(&head, encoder, dst, |_| {})
self.header_block.encode(&head, dst, |_| {})
}
}

Expand Down Expand Up @@ -617,13 +598,7 @@ impl Pseudo {
// ===== impl EncodingHeaderBlock =====

impl EncodingHeaderBlock {
fn encode<F>(
mut self,
head: &Head,
encoder: &mut hpack::Encoder,
dst: &mut EncodeBuf<'_>,
f: F,
) -> Option<Continuation>
fn encode<F>(mut self, head: &Head, dst: &mut EncodeBuf<'_>, f: F) -> Option<Continuation>
where
F: FnOnce(&mut EncodeBuf<'_>),
{
Expand All @@ -639,15 +614,17 @@ impl EncodingHeaderBlock {
f(dst);

// Now, encode the header payload
let continuation = match encoder.encode(self.hpack, &mut self.headers, dst) {
hpack::Encode::Full => None,
hpack::Encode::Partial(state) => Some(Continuation {
let continuation = if self.hpack.len() > dst.remaining_mut() {
dst.put_slice(&self.hpack.split_to(dst.remaining_mut()));

Some(Continuation {
stream_id: head.stream_id(),
header_block: EncodingHeaderBlock {
hpack: Some(state),
headers: self.headers,
},
}),
header_block: self,
})
} else {
dst.put_slice(&self.hpack);

None
};

// Compute the header block length
Expand Down Expand Up @@ -910,13 +887,17 @@ impl HeaderBlock {
Ok(())
}

fn into_encoding(self) -> EncodingHeaderBlock {
fn into_encoding(self, encoder: &mut hpack::Encoder) -> EncodingHeaderBlock {
let mut hpack = BytesMut::new();
let headers = Iter {
pseudo: Some(self.pseudo),
fields: self.fields.into_iter(),
};

encoder.encode(headers, &mut hpack);

EncodingHeaderBlock {
hpack: None,
headers: Iter {
pseudo: Some(self.pseudo),
fields: self.fields.into_iter(),
},
hpack: hpack.freeze(),
}
}

Expand Down Expand Up @@ -949,48 +930,79 @@ impl HeaderBlock {
.map(|(name, value)| decoded_header_size(name.as_str().len(), value.len()))
.sum::<usize>()
}

/// Iterate over all pseudos and headers to see if any individual pair
/// would be too large to encode.
pub(crate) fn has_too_big_field(&self) -> bool {
macro_rules! pseudo_size {
($name:ident) => {{
self.pseudo
.$name
.as_ref()
.map(|m| decoded_header_size(stringify!($name).len() + 1, m.as_str().len()))
.unwrap_or(0)
}};
}

if pseudo_size!(method) > MAX_HEADER_LENGTH {
return true;
}

if pseudo_size!(scheme) > MAX_HEADER_LENGTH {
return true;
}

if pseudo_size!(authority) > MAX_HEADER_LENGTH {
return true;
}

if pseudo_size!(path) > MAX_HEADER_LENGTH {
return true;
}

// skip :status, its never going to be too big

for (name, value) in &self.fields {
if decoded_header_size(name.as_str().len(), value.len()) > MAX_HEADER_LENGTH {
return true;
}
}

false
}
}

fn decoded_header_size(name: usize, value: usize) -> usize {
name + value + 32
}

#[cfg(test)]
mod test {
use std::iter::FromIterator;

use http::HeaderValue;

use super::*;
use crate::frame;
use crate::hpack::{huffman, Encoder};

#[test]
fn test_nameless_header_at_resume() {
let mut encoder = Encoder::default();
let mut dst = BytesMut::new();

let headers = Headers::new(
StreamId::ZERO,
Default::default(),
HeaderMap::from_iter(vec![
(
HeaderName::from_static("hello"),
HeaderValue::from_static("world"),
),
(
HeaderName::from_static("hello"),
HeaderValue::from_static("zomg"),
),
(
HeaderName::from_static("hello"),
HeaderValue::from_static("sup"),
),
]),
);

let continuation = headers
.encode(&mut encoder, &mut (&mut dst).limit(frame::HEADER_LEN + 8))
.unwrap();

assert_eq!(17, dst.len());
assert_eq!([0, 0, 8, 1, 0, 0, 0, 0, 0], &dst[0..9]);
assert_eq!(&[0x40, 0x80 | 4], &dst[9..11]);
assert_eq!("hello", huff_decode(&dst[11..15]));
assert_eq!(0x80 | 4, dst[15]);

let mut world = dst[16..17].to_owned();

dst.clear();

assert!(continuation
.encode(&mut (&mut dst).limit(frame::HEADER_LEN + 16))
.is_none());

world.extend_from_slice(&dst[9..12]);
assert_eq!("world", huff_decode(&world));

assert_eq!(24, dst.len());
assert_eq!([0, 0, 15, 9, 4, 0, 0, 0, 0], &dst[0..9]);

// // Next is not indexed
assert_eq!(&[15, 47, 0x80 | 3], &dst[12..15]);
assert_eq!("zomg", huff_decode(&dst[15..18]));
assert_eq!(&[15, 47, 0x80 | 3], &dst[18..21]);
assert_eq!("sup", huff_decode(&dst[21..]));
}

fn huff_decode(src: &[u8]) -> BytesMut {
let mut buf = BytesMut::new();
huffman::decode(src, &mut buf).unwrap()
}
}
2 changes: 1 addition & 1 deletion src/hpack/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -847,7 +847,7 @@ mod test {

fn huff_encode(src: &[u8]) -> BytesMut {
let mut buf = BytesMut::new();
huffman::encode(src, &mut buf).unwrap();
huffman::encode(src, &mut buf);
buf
}
}
Loading

0 comments on commit 7a87f24

Please sign in to comment.