From d84e6c0b0d1427edb366704b9a3bfd7e48f9d3d4 Mon Sep 17 00:00:00 2001 From: Yusuke Tanaka Date: Sun, 22 Sep 2024 23:24:12 +0900 Subject: [PATCH] (de)compression: reduce memory allocation to improve performance Currently, every time `WrapBody::poll_frame` is called, new instance of `BytesMut` is created with the default capacity, which is effectively 64 bytes. This ends up with a lot of memory allocation in certain situations, making the throughput significantly worse. To optimize memory allocation, `WrapBody` now gets `BytesMut` as its field, with initial capacity of 4096 bytes. This buffer will be reused as much as possible across multiple `poll_frame` calls, and only when its capacity becomes 0, new allocation of another 4096 bytes is performed. Fixes: #520 --- tower-http/src/compression_utils.rs | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/tower-http/src/compression_utils.rs b/tower-http/src/compression_utils.rs index 153ae324..58aa6a08 100644 --- a/tower-http/src/compression_utils.rs +++ b/tower-http/src/compression_utils.rs @@ -141,10 +141,17 @@ pin_project! { // rust-analyer thinks this field is private if its `pub(crate)` but works fine when its // `pub` pub read: M::Output, + // A buffer to temporarily store the data read from the underlying body. + // Reused as much as possible to optimize allocations. + buf: BytesMut, read_all_data: bool, } } +impl WrapBody { + const INTERNAL_BUF_CAPACITY: usize = 4096; +} + impl WrapBody { #[allow(dead_code)] pub(crate) fn new(body: B, quality: CompressionLevel) -> Self @@ -167,6 +174,7 @@ impl WrapBody { Self { read, + buf: BytesMut::with_capacity(Self::INTERNAL_BUF_CAPACITY), read_all_data: false, } } @@ -186,16 +194,21 @@ where cx: &mut Context<'_>, ) -> Poll, Self::Error>>> { let mut this = self.project(); - let mut buf = BytesMut::new(); + if !*this.read_all_data { - let result = tokio_util::io::poll_read_buf(this.read.as_mut(), cx, &mut buf); + if this.buf.capacity() == 0 { + this.buf.reserve(Self::INTERNAL_BUF_CAPACITY); + } + + let result = tokio_util::io::poll_read_buf(this.read.as_mut(), cx, &mut this.buf); match ready!(result) { Ok(0) => { *this.read_all_data = true; } Ok(_) => { - return Poll::Ready(Some(Ok(Frame::data(buf.freeze())))); + let chunk = this.buf.split().freeze(); + return Poll::Ready(Some(Ok(Frame::data(chunk)))); } Err(err) => { let body_error: Option = M::get_pin_mut(this.read)