Skip to content

Commit

Permalink
Pass decompressed size to parquet Codec::decompress (apache#2956)
Browse files Browse the repository at this point in the history
Added an optional argument `uncompress_size` to `Codec::decompress` to better
estimate the required uncompressed size.

* snappy: Probably not much improvement, as `decompress_len` is already accurate.
* gzip: No improvement. Ignores the size hint.
* brotli: Probably not much improvement. The buffer size will be equal to the provided uncompressed size.
* lz4: No improvement. As the buffer is allocated on the stack, there are no extra allocations, so it is probably better to keep it working as it is.
* zstd: No improvement. Ignores the size hint.
* lz4_raw: Improvement. The estimation method over-estimates, so knowing the uncompressed size reduces allocations.
  • Loading branch information
Adrián Gallego Castellanos committed Oct 27, 2022
1 parent 4e1247e commit 0ffec74
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 19 deletions.
67 changes: 49 additions & 18 deletions parquet/src/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ let mut compressed = vec![];
codec.compress(&data[..], &mut compressed).unwrap();
let mut output = vec![];
codec.decompress(&compressed[..], &mut output).unwrap();
codec.decompress(&compressed[..], &mut output, None).unwrap();
assert_eq!(output, data);
```
Expand All @@ -57,9 +57,18 @@ pub trait Codec: Send {
fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()>;

/// Decompresses data stored in slice `input_buf` and appends output to `output_buf`.
///
/// If the uncompress_size is provided it will allocate the exact amount of memory.
/// Otherwise, it will estimate the uncompressed size, allocating an amount of memory
/// greater or equal to the real uncompress_size.
///
/// Returns the total number of bytes written.
fn decompress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>)
-> Result<usize>;
fn decompress(
&mut self,
input_buf: &[u8],
output_buf: &mut Vec<u8>,
uncompress_size: Option<usize>,
) -> Result<usize>;
}

/// Given the compression type `codec`, returns a codec used to compress and decompress
Expand Down Expand Up @@ -112,8 +121,12 @@ mod snappy_codec {
&mut self,
input_buf: &[u8],
output_buf: &mut Vec<u8>,
uncompress_size: Option<usize>,
) -> Result<usize> {
let len = decompress_len(input_buf)?;
let len = match uncompress_size {
Some(size) => size,
None => decompress_len(input_buf)?,
};
let offset = output_buf.len();
output_buf.resize(offset + len, 0);
self.decoder
Expand Down Expand Up @@ -161,6 +174,7 @@ mod gzip_codec {
&mut self,
input_buf: &[u8],
output_buf: &mut Vec<u8>,
_uncompress_size: Option<usize>,
) -> Result<usize> {
let mut decoder = read::GzDecoder::new(input_buf);
decoder.read_to_end(output_buf).map_err(|e| e.into())
Expand Down Expand Up @@ -203,8 +217,10 @@ mod brotli_codec {
&mut self,
input_buf: &[u8],
output_buf: &mut Vec<u8>,
uncompress_size: Option<usize>,
) -> Result<usize> {
brotli::Decompressor::new(input_buf, BROTLI_DEFAULT_BUFFER_SIZE)
let buffer_size = uncompress_size.unwrap_or(BROTLI_DEFAULT_BUFFER_SIZE);
brotli::Decompressor::new(input_buf, buffer_size)
.read_to_end(output_buf)
.map_err(|e| e.into())
}
Expand Down Expand Up @@ -248,6 +264,7 @@ mod lz4_codec {
&mut self,
input_buf: &[u8],
output_buf: &mut Vec<u8>,
_uncompress_size: Option<usize>,
) -> Result<usize> {
let mut decoder = lz4::Decoder::new(input_buf)?;
let mut buffer: [u8; LZ4_BUFFER_SIZE] = [0; LZ4_BUFFER_SIZE];
Expand Down Expand Up @@ -306,6 +323,7 @@ mod zstd_codec {
&mut self,
input_buf: &[u8],
output_buf: &mut Vec<u8>,
_uncompress_size: Option<usize>,
) -> Result<usize> {
let mut decoder = zstd::Decoder::new(input_buf)?;
match io::copy(&mut decoder, output_buf) {
Expand Down Expand Up @@ -353,16 +371,23 @@ mod lz4_raw_codec {
&mut self,
input_buf: &[u8],
output_buf: &mut Vec<u8>,
uncompress_size: Option<usize>,
) -> Result<usize> {
let offset = output_buf.len();
let required_len = max_uncompressed_size(input_buf.len());
let required_len =
uncompress_size.unwrap_or_else(|| max_uncompressed_size(input_buf.len()));
output_buf.resize(offset + required_len, 0);
let required_len: i32 = required_len.try_into().unwrap();
match lz4::block::decompress_to_buffer(input_buf, Some(required_len), &mut output_buf[offset..]) {
match lz4::block::decompress_to_buffer(
input_buf,
Some(required_len.try_into().unwrap()),
&mut output_buf[offset..],
) {
Ok(n) => {
output_buf.truncate(offset + n);
Ok(n)
},
if n < required_len {
output_buf.truncate(offset + n);
}
Ok(n)
}
Err(e) => Err(e.into()),
}
}
Expand All @@ -371,11 +396,16 @@ mod lz4_raw_codec {
let offset = output_buf.len();
let required_len = lz4::block::compress_bound(input_buf.len())?;
output_buf.resize(offset + required_len, 0);
match lz4::block::compress_to_buffer(input_buf, None, false, &mut output_buf[offset..]) {
match lz4::block::compress_to_buffer(
input_buf,
None,
false,
&mut output_buf[offset..],
) {
Ok(n) => {
output_buf.truncate(offset + n);
Ok(())
},
}
Err(e) => Err(e.into()),
}
}
Expand All @@ -390,7 +420,7 @@ mod tests {

use crate::util::test_common::rand_gen::random_bytes;

fn test_roundtrip(c: CodecType, data: &[u8]) {
fn test_roundtrip(c: CodecType, data: &[u8], uncompress_size: Option<usize>) {
let mut c1 = create_codec(c).unwrap().unwrap();
let mut c2 = create_codec(c).unwrap().unwrap();

Expand All @@ -402,7 +432,7 @@ mod tests {

// Decompress with c2
let decompressed_size = c2
.decompress(compressed.as_slice(), &mut decompressed)
.decompress(compressed.as_slice(), &mut decompressed, uncompress_size)
.expect("Error when decompressing");
assert_eq!(data.len(), decompressed_size);
assert_eq!(data, decompressed.as_slice());
Expand All @@ -416,7 +446,7 @@ mod tests {

// Decompress with c1
let decompressed_size = c1
.decompress(compressed.as_slice(), &mut decompressed)
.decompress(compressed.as_slice(), &mut decompressed, uncompress_size)
.expect("Error when decompressing");
assert_eq!(data.len(), decompressed_size);
assert_eq!(data, decompressed.as_slice());
Expand All @@ -435,7 +465,7 @@ mod tests {
assert_eq!(&compressed[..4], prefix);

let decompressed_size = c2
.decompress(&compressed[4..], &mut decompressed)
.decompress(&compressed[4..], &mut decompressed, uncompress_size)
.expect("Error when decompressing");

assert_eq!(data.len(), decompressed_size);
Expand All @@ -447,7 +477,8 @@ mod tests {
let sizes = vec![100, 10000, 100000];
for size in sizes {
let data = random_bytes(size);
test_roundtrip(c, &data);
test_roundtrip(c, &data, None);
test_roundtrip(c, &data, Some(data.len()));
}
}

Expand Down
6 changes: 5 additions & 1 deletion parquet/src/file/serialized_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,11 @@ pub(crate) fn decode_page(
let mut decompressed = Vec::with_capacity(uncompressed_size);
let compressed = &buffer.as_ref()[offset..];
decompressed.extend_from_slice(&buffer.as_ref()[..offset]);
decompressor.decompress(compressed, &mut decompressed)?;
decompressor.decompress(
compressed,
&mut decompressed,
Some(uncompressed_size),
)?;

if decompressed.len() != uncompressed_size {
return Err(general_err!(
Expand Down

0 comments on commit 0ffec74

Please sign in to comment.