Skip to content

Commit

Permalink
do not use BufList
Browse files Browse the repository at this point in the history
  • Loading branch information
alce committed Jan 12, 2020
1 parent 83bbde0 commit 4728f24
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 86 deletions.
85 changes: 11 additions & 74 deletions tonic/src/codec/buffer.rs
Original file line number Diff line number Diff line change
@@ -1,89 +1,35 @@
use bytes::{Buf, Bytes};
use std::collections::VecDeque;
use bytes::{Buf, Bytes, BytesMut};
use std::fmt;
use tracing::warn;

pub(crate) struct BufList {
bufs: VecDeque<Bytes>,
}

/// A buffer to decode messages from.
pub struct DecodeBuf<'a> {
buf: &'a mut BufList,
len: usize,
buf: &'a mut BytesMut,
}

impl<'a> DecodeBuf<'a> {
pub(crate) fn new(buf: &'a mut BufList, len: usize) -> Self {
DecodeBuf { buf, len }
}
}

impl BufList {
pub(crate) fn new() -> Self {
BufList {
bufs: VecDeque::new(),
}
}

pub(crate) fn push(&mut self, bytes: Bytes) {
debug_assert!(bytes.has_remaining());
self.bufs.push_back(bytes)
}
}

impl Buf for BufList {
#[inline]
fn remaining(&self) -> usize {
self.bufs.iter().fold(0, |a, b| a + b.remaining())
}

#[inline]
fn bytes(&self) -> &[u8] {
self.bufs.front().map(Buf::bytes).unwrap_or_default()
}

#[inline]
fn advance(&mut self, mut cnt: usize) {
while cnt > 0 {
{
let front = &mut self.bufs[0];
let rem = front.remaining();
if rem > cnt {
front.advance(cnt);
return;
} else {
front.advance(rem);
cnt -= rem;
}
}
self.bufs.pop_front();
}
pub(crate) fn new(buf: &'a mut BytesMut) -> Self {
DecodeBuf { buf }
}
}

impl<'a> Buf for DecodeBuf<'a> {
#[inline]
fn remaining(&self) -> usize {
self.len
self.buf.len()
}

#[inline]
fn bytes(&self) -> &[u8] {
let ret = self.buf.bytes();

if ret.len() > self.len {
&ret[..self.len]
} else {
ret
}
self.buf.bytes()
}

#[inline]
fn advance(&mut self, cnt: usize) {
assert!(cnt <= self.len);
self.buf.advance(cnt);
self.len -= cnt;
self.buf.advance(cnt)
}

fn to_bytes(&mut self) -> Bytes {
self.buf.to_bytes()
}
}

Expand All @@ -92,12 +38,3 @@ impl<'a> fmt::Debug for DecodeBuf<'a> {
f.debug_struct("DecodeBuf").finish()
}
}

impl<'a> Drop for DecodeBuf<'a> {
fn drop(&mut self) {
if self.len > 0 {
warn!("DecodeBuf was not advanced to end");
self.buf.advance(self.len);
}
}
}
29 changes: 18 additions & 11 deletions tonic/src/codec/decode.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::Decoder;
use crate::codec::buffer::{BufList, DecodeBuf};
use crate::codec::buffer::DecodeBuf;
use crate::{body::BoxBody, metadata::MetadataMap, Code, Status};
use bytes::Buf;
use bytes::{Buf, BufMut, BytesMut};
use futures_core::Stream;
use futures_util::{future, ready};
use http::StatusCode;
Expand All @@ -13,6 +13,8 @@ use std::{
};
use tracing::{debug, trace};

const BUFFER_SIZE: usize = 8 * 1024;

/// Streaming requests and responses.
///
/// This will wrap some inner [`Body`] and [`Decoder`] and provide an interface
Expand All @@ -22,7 +24,7 @@ pub struct Streaming<T> {
body: BoxBody,
state: State,
direction: Direction,
buf: BufList,
buf: BytesMut,
trailers: Option<MetadataMap>,
}

Expand Down Expand Up @@ -81,7 +83,7 @@ impl<T> Streaming<T> {
body: BoxBody::map_from(body),
state: State::ReadHeader,
direction,
buf: BufList::new(),
buf: BytesMut::with_capacity(BUFFER_SIZE),
trailers: None,
}
}
Expand Down Expand Up @@ -184,14 +186,11 @@ impl<T> Streaming<T> {
if let State::ReadBody { len, .. } = &self.state {
// if we haven't read enough of the message then return and keep
// reading
if self.buf.remaining() < *len {
if self.buf.remaining() < *len || self.buf.len() < *len {
return Ok(None);
}

return match self
.decoder
.decode(&mut DecodeBuf::new(&mut self.buf, *len))
{
return match self.decoder.decode(&mut DecodeBuf::new(&mut self.buf)) {
Ok(Some(msg)) => {
self.state = State::ReadHeader;
Ok(Some(msg))
Expand Down Expand Up @@ -231,9 +230,17 @@ impl<T> Stream for Streaming<T> {
};

if let Some(data) = chunk {
if data.has_remaining() {
self.buf.push(data);
if data.remaining() > self.buf.remaining_mut() {
let amt = if data.remaining() > BUFFER_SIZE {
data.remaining()
} else {
BUFFER_SIZE
};

self.buf.reserve(amt);
}

self.buf.put(data);
} else {
// FIXME: improve buf usage.
if self.buf.has_remaining() {
Expand Down
2 changes: 1 addition & 1 deletion tonic/src/codec/prost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl<U: Message + Default> Decoder for ProstDecoder<U> {
type Item = U;
type Error = Status;

fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
fn decode(&mut self, buf: &mut DecodeBuf<'_>) -> Result<Option<Self::Item>, Self::Error> {
let item = Message::decode(buf)
.map(Option::Some)
.map_err(from_decode_error)?;
Expand Down

0 comments on commit 4728f24

Please sign in to comment.