Skip to content

Commit

Permalink
Support zstd compression levels
Browse files Browse the repository at this point in the history
  • Loading branch information
spebern committed Mar 11, 2023
1 parent 9ce0ebb commit 55e5c21
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 18 deletions.
16 changes: 10 additions & 6 deletions parquet/src/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
use std::{fmt, str};

use crate::compression::ZstdLevel;
use crate::format as parquet;

use crate::errors::{ParquetError, Result};
Expand Down Expand Up @@ -290,7 +291,7 @@ pub enum Compression {
LZO,
BROTLI,
LZ4,
ZSTD,
ZSTD(ZstdLevel),
LZ4_RAW,
}

Expand Down Expand Up @@ -834,7 +835,7 @@ impl TryFrom<parquet::CompressionCodec> for Compression {
parquet::CompressionCodec::LZO => Compression::LZO,
parquet::CompressionCodec::BROTLI => Compression::BROTLI,
parquet::CompressionCodec::LZ4 => Compression::LZ4,
parquet::CompressionCodec::ZSTD => Compression::ZSTD,
parquet::CompressionCodec::ZSTD => Compression::ZSTD(Default::default()),
parquet::CompressionCodec::LZ4_RAW => Compression::LZ4_RAW,
_ => {
return Err(general_err!(
Expand All @@ -855,7 +856,7 @@ impl From<Compression> for parquet::CompressionCodec {
Compression::LZO => parquet::CompressionCodec::LZO,
Compression::BROTLI => parquet::CompressionCodec::BROTLI,
Compression::LZ4 => parquet::CompressionCodec::LZ4,
Compression::ZSTD => parquet::CompressionCodec::ZSTD,
Compression::ZSTD(_) => parquet::CompressionCodec::ZSTD,
Compression::LZ4_RAW => parquet::CompressionCodec::LZ4_RAW,
}
}
Expand Down Expand Up @@ -1787,7 +1788,7 @@ mod tests {
assert_eq!(Compression::LZO.to_string(), "LZO");
assert_eq!(Compression::BROTLI.to_string(), "BROTLI");
assert_eq!(Compression::LZ4.to_string(), "LZ4");
assert_eq!(Compression::ZSTD.to_string(), "ZSTD");
assert_eq!(Compression::ZSTD(Default::default()).to_string(), "ZSTD");
}

#[test]
Expand Down Expand Up @@ -1818,7 +1819,7 @@ mod tests {
);
assert_eq!(
Compression::try_from(parquet::CompressionCodec::ZSTD).unwrap(),
Compression::ZSTD
Compression::ZSTD(Default::default())
);
}

Expand All @@ -1839,7 +1840,10 @@ mod tests {
Compression::BROTLI.into()
);
assert_eq!(parquet::CompressionCodec::LZ4, Compression::LZ4.into());
assert_eq!(parquet::CompressionCodec::ZSTD, Compression::ZSTD.into());
assert_eq!(
parquet::CompressionCodec::ZSTD,
Compression::ZSTD(Default::default()).into()
);
}

#[test]
Expand Down
76 changes: 64 additions & 12 deletions parquet/src/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,26 @@ impl CodecOptionsBuilder {
}
}

/// Describes the inclusive range of levels that a compression codec accepts.
pub(crate) trait CompressionLevel<T: std::fmt::Display + std::cmp::PartialOrd> {
    /// Smallest accepted level (inclusive).
    const MINIMUM_LEVEL: T;
    /// Largest accepted level (inclusive).
    const MAXIMUM_LEVEL: T;

    /// Checks `level` against `MINIMUM_LEVEL..=MAXIMUM_LEVEL`.
    ///
    /// Returns `Ok(())` when `level` lies inside the range and a
    /// `ParquetError::General` naming both bounds otherwise.
    fn is_valid_level(level: T) -> Result<()> {
        let (lowest, highest) = (Self::MINIMUM_LEVEL, Self::MAXIMUM_LEVEL);

        // Both bounds are inclusive.
        if lowest <= level && level <= highest {
            return Ok(());
        }

        Err(ParquetError::General(format!(
            "valid compression range {}..={} exceeded.",
            lowest, highest
        )))
    }
}

/// Given the compression type `codec`, returns a codec used to compress and decompress
/// bytes for the compression type.
/// This returns `None` if the codec type is `UNCOMPRESSED`.
Expand All @@ -140,7 +160,7 @@ pub fn create_codec(
_options.backward_compatible_lz4,
)))),
#[cfg(any(feature = "zstd", test))]
CodecType::ZSTD => Ok(Some(Box::new(ZSTDCodec::new()))),
CodecType::ZSTD(level) => Ok(Some(Box::new(ZSTDCodec::new(level)))),
#[cfg(any(feature = "lz4", test))]
CodecType::LZ4_RAW => Ok(Some(Box::new(LZ4RawCodec::new()))),
CodecType::UNCOMPRESSED => Ok(None),
Expand Down Expand Up @@ -357,22 +377,21 @@ pub use lz4_codec::*;
mod zstd_codec {
use std::io::{self, Write};

use crate::compression::Codec;
use crate::compression::{Codec, ZstdLevel};
use crate::errors::Result;

/// Codec for Zstandard compression algorithm.
pub struct ZSTDCodec {}
pub struct ZSTDCodec {
level: ZstdLevel,
}

impl ZSTDCodec {
/// Creates new Zstandard compression codec.
pub(crate) fn new() -> Self {
Self {}
pub(crate) fn new(level: ZstdLevel) -> Self {
Self { level }
}
}

/// Compression level (1-21) for ZSTD. Choose 1 here for better compression speed.
const ZSTD_COMPRESSION_LEVEL: i32 = 1;

impl Codec for ZSTDCodec {
fn decompress(
&mut self,
Expand All @@ -388,7 +407,7 @@ mod zstd_codec {
}

fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
let mut encoder = zstd::Encoder::new(output_buf, ZSTD_COMPRESSION_LEVEL)?;
let mut encoder = zstd::Encoder::new(output_buf, self.level.0)?;
encoder.write_all(input_buf)?;
match encoder.finish() {
Ok(_) => Ok(()),
Expand All @@ -400,6 +419,38 @@ mod zstd_codec {
#[cfg(any(feature = "zstd", test))]
pub use zstd_codec::*;

/// Represents a valid zstd compression level.
///
/// Thin wrapper around the raw `i32` level. The field is private, so code
/// outside this module obtains a value through [`ZstdLevel::try_new`], which
/// validates the level first.
#[derive(Debug, Eq, PartialEq, Hash, Clone, Copy)]
pub struct ZstdLevel(i32);

/// Declares the accepted zstd levels as the inclusive range `1..=22`.
impl CompressionLevel<i32> for ZstdLevel {
// The bounds are hard-coded: zstd binds to C, and hence zstd::compression_level_range()
// is not const (it calls the underlying C library), so it cannot initialise these
// associated constants.
const MINIMUM_LEVEL: i32 = 1;
const MAXIMUM_LEVEL: i32 = 22;
}

impl ZstdLevel {
/// Attempts to create a zstd compression level from a given compression level.
///
/// Compression levels must be valid (i.e. be acceptable for [`zstd::compression_level_range`]).
pub fn try_new(level: i32) -> Result<Self> {
Self::is_valid_level(level).map(|_| Self(level))
}

/// Returns the compression level.
pub fn compression_level(&self) -> i32 {
self.0
}
}

#[cfg(feature = "zstd")]
impl Default for ZstdLevel {
fn default() -> Self {
Self(1)
}
}

#[cfg(any(feature = "lz4", test))]
mod lz4_raw_codec {
use crate::compression::Codec;
Expand Down Expand Up @@ -647,7 +698,8 @@ mod lz4_hadoop_codec {
let compressed_size = compressed_size as u32;
let uncompressed_size = input_buf.len() as u32;
output_buf[..SIZE_U32].copy_from_slice(&uncompressed_size.to_be_bytes());
output_buf[SIZE_U32..PREFIX_LEN].copy_from_slice(&compressed_size.to_be_bytes());
output_buf[SIZE_U32..PREFIX_LEN]
.copy_from_slice(&compressed_size.to_be_bytes());

Ok(())
}
Expand Down Expand Up @@ -759,8 +811,8 @@ mod tests {

#[test]
fn test_codec_zstd() {
test_codec_with_size(CodecType::ZSTD);
test_codec_without_size(CodecType::ZSTD);
test_codec_with_size(CodecType::ZSTD(Default::default()));
test_codec_without_size(CodecType::ZSTD(Default::default()));
}

#[test]
Expand Down

0 comments on commit 55e5c21

Please sign in to comment.