Skip to content

Commit

Permalink
chore: Streaming<T> reduce monomorphization cost
Browse files Browse the repository at this point in the history
This was seen using `cargo llvm-lines`.

The following functions where high in generated lines with a few copies
(~ one copy by send message type).

- <tonic::codec::decode::Streaming<T> as futures_core::stream::Stream>::poll_next
- tonic::codec::decode::Streaming<T>::decode_chunk

Both of them contains a large part of code that is actually not impacted
by the type parameter and so can be reworked a bit to improve code
generation (should help produce small binaries and reduce build time).
  • Loading branch information
Guilhem Vallat committed Nov 26, 2021
1 parent 754403c commit 8d4917a
Showing 1 changed file with 141 additions and 91 deletions.
232 changes: 141 additions & 91 deletions tonic/src/codec/decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ const BUFFER_SIZE: usize = 8 * 1024;
/// to fetch the message stream and trailing metadata
pub struct Streaming<T> {
decoder: Box<dyn Decoder<Item = T, Error = Status> + Send + 'static>,
inner: StreamingInner,
}

// This inner object is not generic and every helper method implemented on it helps reducing monomorphization cost
struct StreamingInner {
body: BoxBody,
state: State,
direction: Direction,
Expand Down Expand Up @@ -117,18 +122,20 @@ impl<T> Streaming<T> {
{
Self {
decoder: Box::new(decoder),
body: body
.map_data(|mut buf| buf.copy_to_bytes(buf.remaining()))
.map_err(|err| Status::map_error(err.into()))
.boxed_unsync(),
state: State::ReadHeader,
direction,
buf: BytesMut::with_capacity(BUFFER_SIZE),
trailers: None,
#[cfg(feature = "compression")]
decompress_buf: BytesMut::new(),
#[cfg(feature = "compression")]
encoding,
inner: StreamingInner {
body: body
.map_data(|mut buf| buf.copy_to_bytes(buf.remaining()))
.map_err(|err| Status::map_error(err.into()))
.boxed_unsync(),
state: State::ReadHeader,
direction,
buf: BytesMut::with_capacity(BUFFER_SIZE),
trailers: None,
#[cfg(feature = "compression")]
decompress_buf: BytesMut::new(),
#[cfg(feature = "compression")]
encoding,
},
}
}
}
Expand Down Expand Up @@ -174,13 +181,62 @@ impl<T> Streaming<T> {
pub async fn trailers(&mut self) -> Result<Option<MetadataMap>, Status> {
// Shortcut to see if we already pulled the trailers in the stream step
// we need to do that so that the stream can error on trailing grpc-status
if let Some(trailers) = self.trailers.take() {
if let Some(trailers) = self.inner.trailers.take() {
return Ok(Some(trailers));
}

// To fetch the trailers we must clear the body and drop it.
while self.message().await?.is_some() {}

self.inner.trailers().await
}

fn decode_chunk(&mut self) -> Result<Option<T>, Status> {
self.inner.decode_header()?;

if let State::ReadBody { len, compression } = &self.inner.state {
// if we haven't read enough of the message then return and keep
// reading
if self.inner.buf.remaining() < *len || self.inner.buf.len() < *len {
return Ok(None);
}

let decoding_result = if *compression {
#[cfg(feature = "compression")]
{
let len = *len;
self.inner.decompress_buf(len)?;
let decompressed_len = self.inner.decompress_buf.len();
self.decoder.decode(&mut DecodeBuf::new(
&mut self.inner.decompress_buf,
decompressed_len,
))
}

#[cfg(not(feature = "compression"))]
unreachable!("should not take this branch if compression is disabled")
} else {
self.decoder
.decode(&mut DecodeBuf::new(&mut self.inner.buf, *len))
};

return match decoding_result {
Ok(Some(msg)) => {
self.inner.state = State::ReadHeader;
Ok(Some(msg))
}
Ok(None) => Ok(None),
Err(e) => Err(e),
};
}

Ok(None)
}
}

impl StreamingInner {
// note: do not inline this in Streaming<T>::trailers() as it helps reducing cost of monomorphization
async fn trailers(&mut self) -> Result<Option<MetadataMap>, Status> {
// Since we call poll_trailers internally on poll_next we need to
// check if it got cached again.
if let Some(trailers) = self.trailers.take() {
Expand All @@ -196,10 +252,11 @@ impl<T> Streaming<T> {
map.map(|x| x.map(MetadataMap::from_headers))
}

fn decode_chunk(&mut self) -> Result<Option<T>, Status> {
// note: do not inline this in decode_buf() as it helps reducing cost of monomorphization
fn decode_header(&mut self) -> Result<(), Status> {
if let State::ReadHeader = self.state {
if self.buf.remaining() < HEADER_SIZE {
return Ok(None);
return Ok(());
}

let is_compressed = match self.buf.get_u8() {
Expand Down Expand Up @@ -246,63 +303,35 @@ impl<T> Streaming<T> {
len,
}
}
Ok(())
}

if let State::ReadBody { len, compression } = &self.state {
// if we haven't read enough of the message then return and keep
// reading
if self.buf.remaining() < *len || self.buf.len() < *len {
return Ok(None);
}

let decoding_result = if *compression {
#[cfg(feature = "compression")]
{
self.decompress_buf.clear();

if let Err(err) = decompress(
self.encoding.unwrap_or_else(|| {
// SAFETY: The check while in State::ReadHeader would already have returned Code::Internal
unreachable!("message was compressed but `Streaming.encoding` was `None`. This is a bug in Tonic. Please file an issue")
}),
&mut self.buf,
&mut self.decompress_buf,
*len,
) {
let message = if let Direction::Response(status) = self.direction {
format!(
"Error decompressing: {}, while receiving response with status: {}",
err, status
)
} else {
format!("Error decompressing: {}, while sending request", err)
};
return Err(Status::new(Code::Internal, message));
}
let decompressed_len = self.decompress_buf.len();
self.decoder.decode(&mut DecodeBuf::new(
&mut self.decompress_buf,
decompressed_len,
))
}

#[cfg(not(feature = "compression"))]
unreachable!("should not take this branch if compression is disabled")
#[cfg(feature = "compression")]
// note: do not inline this in decode_buf() as it helps reducing cost of monomorphization
fn decompress_buf(&mut self, len: usize) -> Result<(), Status> {
self.decompress_buf.clear();

if let Err(err) = decompress(
self.encoding.unwrap_or_else(|| {
// SAFETY: The check while in State::ReadHeader would already have returned Code::Internal
unreachable!("message was compressed but `Streaming.encoding` was `None`. This is a bug in Tonic. Please file an issue")
}),
&mut self.buf,
&mut self.decompress_buf,
len,
) {
let message = if let Direction::Response(status) = self.direction {
format!(
"Error decompressing: {}, while receiving response with status: {}",
err, status
)
} else {
self.decoder
.decode(&mut DecodeBuf::new(&mut self.buf, *len))
};

return match decoding_result {
Ok(Some(msg)) => {
self.state = State::ReadHeader;
Ok(Some(msg))
}
Ok(None) => Ok(None),
Err(e) => Err(e),
format!("Error decompressing: {}, while sending request", err)
};
return Err(Status::new(Code::Internal, message));
}

Ok(None)
Ok(())
}
}

Expand All @@ -318,39 +347,60 @@ impl<T> Stream for Streaming<T> {
return Poll::Ready(Some(Ok(item)));
}

let chunk = match ready!(Pin::new(&mut self.body).poll_data(cx)) {
Some(Ok(d)) => Some(d),
Some(Err(e)) => {
let err: crate::Error = e.into();
debug!("decoder inner stream error: {:?}", err);
let status = Status::from_error(err);
return Poll::Ready(Some(Err(status)));
}
None => None,
};
match ready!(self.inner.poll_data(cx)) {
Some(Ok(_)) => break,
Some(Err(err)) => return Poll::Ready(Some(Err(err))),
_ => {}
}
}

match ready!(self.inner.poll_trailers(cx)) {
Some(err) => Poll::Ready(Some(Err(err))),
_ => Poll::Ready(None),
}
}
}

if let Some(data) = chunk {
self.buf.put(data);
impl StreamingInner {
// note: do not inline this in poll_next() as it helps reducing cost of monomorphization
fn poll_data(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<(), Status>>> {
let chunk = match ready!(Pin::new(&mut self.body).poll_data(cx)) {
Some(Ok(d)) => Some(d),
Some(Err(e)) => {
let err: crate::Error = e.into();
debug!("decoder inner stream error: {:?}", err);
let status = Status::from_error(err);
return Poll::Ready(Some(Err(status)));
}
None => None,
};

if let Some(data) = chunk {
self.buf.put(data);
} else {
// FIXME: improve buf usage.
if self.buf.has_remaining() {
trace!("unexpected EOF decoding stream");
return Poll::Ready(Some(Err(Status::new(
Code::Internal,
"Unexpected EOF decoding stream.".to_string(),
))));
} else {
// FIXME: improve buf usage.
if self.buf.has_remaining() {
trace!("unexpected EOF decoding stream");
return Poll::Ready(Some(Err(Status::new(
Code::Internal,
"Unexpected EOF decoding stream.".to_string(),
))));
} else {
break;
}
return Poll::Ready(Some(Ok(())));
}
}

Poll::Ready(None)
}

// note: do not inline this in poll_next() as it helps reducing cost of monomorphization
fn poll_trailers(&mut self, cx: &mut Context<'_>) -> Poll<Option<Status>> {
if let Direction::Response(status) = self.direction {
match ready!(Pin::new(&mut self.body).poll_trailers(cx)) {
Ok(trailer) => {
if let Err(e) = crate::status::infer_grpc_status(trailer.as_ref(), status) {
if let Some(e) = e {
return Some(Err(e)).into();
return Poll::Ready(Some(e));
} else {
return Poll::Ready(None);
}
Expand All @@ -362,7 +412,7 @@ impl<T> Stream for Streaming<T> {
let err: crate::Error = e.into();
debug!("decoder inner trailers error: {:?}", err);
let status = Status::from_error(err);
return Some(Err(status)).into();
return Poll::Ready(Some(status));
}
}
}
Expand Down

0 comments on commit 8d4917a

Please sign in to comment.