diff --git a/tonic/src/codec/decode.rs b/tonic/src/codec/decode.rs index 742660e31..829b40e34 100644 --- a/tonic/src/codec/decode.rs +++ b/tonic/src/codec/decode.rs @@ -38,12 +38,43 @@ impl Unpin for Streaming {} #[derive(Debug, Clone)] enum State { - ReadHeader, + ReadHeader { + span: Option, + }, ReadBody { + span: tracing::Span, compression: Option, len: usize, }, - Error(Status), + Error(Box), +} + +impl State { + fn read_header() -> Self { + Self::ReadHeader { span: None } + } + + fn read_body(compression: Option, len: usize) -> Self { + let span = tracing::debug_span!( + "read_body", + body.compression = compression.map(|c| c.as_str()).unwrap_or("none"), + body.bytes.compressed = compression.is_some().then_some(len), + body.bytes.uncompressed = compression.is_none().then_some(len), + ); + Self::ReadBody { + span, + compression, + len, + } + } + + fn span(&self) -> Option<&tracing::Span> { + match self { + Self::ReadHeader { span } => span.as_ref(), + Self::ReadBody { span, .. } => Some(span), + Self::Error(_) => None, + } + } } #[derive(Debug, PartialEq, Eq)] @@ -125,7 +156,7 @@ impl Streaming { .map_frame(|frame| frame.map_data(|mut buf| buf.copy_to_bytes(buf.remaining()))) .map_err(|err| Status::map_error(err.into())) .boxed_unsync(), - state: State::ReadHeader, + state: State::read_header(), direction, buf: BytesMut::with_capacity(buffer_size), trailers: None, @@ -142,7 +173,19 @@ impl StreamingInner { &mut self, buffer_settings: BufferSettings, ) -> Result>, Status> { - if let State::ReadHeader = self.state { + if let State::ReadHeader { span } = &mut self.state { + if !self.buf.has_remaining() { + return Ok(None); + } + + let span = span.get_or_insert_with(|| { + tracing::debug_span!( + "read_header", + body.compression = "none", + body.bytes = tracing::field::Empty, + ) + }); + let _guard = span.enter(); if self.buf.remaining() < HEADER_SIZE { return Ok(None); } @@ -151,7 +194,8 @@ impl StreamingInner { 0 => None, 1 => { { - if self.encoding.is_some() { + if let Some(ce) = self.encoding { + span.record("body.compression", ce.as_str()); self.encoding } else { // https://grpc.github.io/grpc/core/md_doc_compression.html @@ -177,6 +221,7 @@ impl StreamingInner { }; let len = self.buf.get_u32() as usize; + span.record("body.bytes", len); let limit = self .max_message_size .unwrap_or(DEFAULT_MAX_RECV_MESSAGE_SIZE); @@ -191,14 +236,19 @@ impl StreamingInner { } self.buf.reserve(len); + drop(_guard); - self.state = State::ReadBody { - compression: compression_encoding, - len, - } + self.state = State::read_body(compression_encoding, len) } - if let State::ReadBody { len, compression } = self.state { + if let State::ReadBody { + len, + span, + compression, + } = &self.state + { + let (len, compression) = (*len, *compression); + let _guard = span.enter(); // if we haven't read enough of the message then return and keep // reading if self.buf.remaining() < len || self.buf.len() < len { @@ -228,6 +278,7 @@ impl StreamingInner { return Err(Status::new(Code::Internal, message)); } let decompressed_len = self.decompress_buf.len(); + span.record("body.bytes.uncompressed", decompressed_len); DecodeBuf::new(&mut self.decompress_buf, decompressed_len) } else { DecodeBuf::new(&mut self.buf, len) @@ -241,6 +292,7 @@ impl StreamingInner { // Returns Some(()) if data was found or None if the loop in `poll_next` should break fn poll_frame(&mut self, cx: &mut Context<'_>) -> Poll, Status>> { + let _guard = self.state.span().map(|s| s.enter()); let chunk = match ready!(Pin::new(&mut self.body).poll_frame(cx)) { Some(Ok(d)) => Some(d), Some(Err(status)) => { @@ -248,7 +300,8 @@ impl StreamingInner { return Poll::Ready(Ok(None)); } - let _ = std::mem::replace(&mut self.state, State::Error(status.clone())); + drop(_guard); + let _ = std::mem::replace(&mut self.state, State::Error(Box::new(status.clone()))); debug!("decoder inner stream error: {:?}", status); return Poll::Ready(Err(status)); } @@ -378,7 +431,7 @@ impl Streaming { match self.inner.decode_chunk(self.decoder.buffer_settings())? { Some(mut decode_buf) => match self.decoder.decode(&mut decode_buf)? { Some(msg) => { - self.inner.state = State::ReadHeader; + self.inner.state = State::read_header(); Ok(Some(msg)) } None => Ok(None), @@ -394,7 +447,7 @@ impl Stream for Streaming { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { if let State::Error(status) = &self.inner.state { - return Poll::Ready(Some(Err(status.clone()))); + return Poll::Ready(Some(Err(*status.clone()))); } if let Some(item) = self.decode_chunk()? {