Skip to content

Commit

Permalink
Restrict Decoder to compatible types (#1276) (#1277)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold authored Feb 7, 2022
1 parent ef8875b commit 3e7b6c4
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 17 deletions.
1 change: 1 addition & 0 deletions parquet/src/data_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,7 @@ pub(crate) mod private {
+ super::SliceAsBytes
+ PartialOrd
+ Send
+ crate::encodings::decoding::private::GetDecoder
{
/// Encode the value directly from a higher level encoder
fn encode<W: std::io::Write>(
Expand Down
142 changes: 125 additions & 17 deletions parquet/src/encodings/decoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::{cmp, marker::PhantomData, mem};
use super::rle::RleDecoder;

use crate::basic::*;
use crate::data_type::private::*;
use crate::data_type::private::ParquetValueType;
use crate::data_type::*;
use crate::errors::{ParquetError, Result};
use crate::schema::types::ColumnDescPtr;
Expand All @@ -31,6 +31,111 @@ use crate::util::{
memory::{ByteBuffer, ByteBufferPtr},
};

pub(crate) mod private {
use super::*;

/// A trait that allows getting a [`Decoder`] implementation for a [`DataType`] with
/// the corresponding [`ParquetValueType`]. This is necessary to support
/// [`Decoder`] implementations that may not be applicable for all [`DataType`]
/// and by extension all [`ParquetValueType`]
pub trait GetDecoder {
fn get_decoder<T: DataType<T = Self>>(
descr: ColumnDescPtr,
encoding: Encoding,
) -> Result<Box<dyn Decoder<T>>> {
get_decoder_default(descr, encoding)
}
}

fn get_decoder_default<T: DataType>(
descr: ColumnDescPtr,
encoding: Encoding,
) -> Result<Box<dyn Decoder<T>>> {
match encoding {
Encoding::PLAIN => Ok(Box::new(PlainDecoder::new(descr.type_length()))),
Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => Err(general_err!(
"Cannot initialize this encoding through this function"
)),
Encoding::RLE
| Encoding::DELTA_BINARY_PACKED
| Encoding::DELTA_BYTE_ARRAY
| Encoding::DELTA_LENGTH_BYTE_ARRAY => Err(general_err!(
"Encoding {} is not supported for type",
encoding
)),
e => Err(nyi_err!("Encoding {} is not supported", e)),
}
}

impl GetDecoder for bool {
fn get_decoder<T: DataType<T = Self>>(
descr: ColumnDescPtr,
encoding: Encoding,
) -> Result<Box<dyn Decoder<T>>> {
match encoding {
Encoding::RLE => Ok(Box::new(RleValueDecoder::new())),
_ => get_decoder_default(descr, encoding),
}
}
}

impl GetDecoder for i32 {
fn get_decoder<T: DataType<T = Self>>(
descr: ColumnDescPtr,
encoding: Encoding,
) -> Result<Box<dyn Decoder<T>>> {
match encoding {
Encoding::DELTA_BINARY_PACKED => Ok(Box::new(DeltaBitPackDecoder::new())),
_ => get_decoder_default(descr, encoding),
}
}
}

impl GetDecoder for i64 {
fn get_decoder<T: DataType<T = Self>>(
descr: ColumnDescPtr,
encoding: Encoding,
) -> Result<Box<dyn Decoder<T>>> {
match encoding {
Encoding::DELTA_BINARY_PACKED => Ok(Box::new(DeltaBitPackDecoder::new())),
_ => get_decoder_default(descr, encoding),
}
}
}

impl GetDecoder for f32 {}
impl GetDecoder for f64 {}

impl GetDecoder for ByteArray {
fn get_decoder<T: DataType<T = Self>>(
descr: ColumnDescPtr,
encoding: Encoding,
) -> Result<Box<dyn Decoder<T>>> {
match encoding {
Encoding::DELTA_BYTE_ARRAY => Ok(Box::new(DeltaByteArrayDecoder::new())),
Encoding::DELTA_LENGTH_BYTE_ARRAY => {
Ok(Box::new(DeltaLengthByteArrayDecoder::new()))
}
_ => get_decoder_default(descr, encoding),
}
}
}

impl GetDecoder for FixedLenByteArray {
fn get_decoder<T: DataType<T = Self>>(
descr: ColumnDescPtr,
encoding: Encoding,
) -> Result<Box<dyn Decoder<T>>> {
match encoding {
Encoding::DELTA_BYTE_ARRAY => Ok(Box::new(DeltaByteArrayDecoder::new())),
_ => get_decoder_default(descr, encoding),
}
}
}

impl GetDecoder for Int96 {}
}

// ----------------------------------------------------------------------
// Decoders

Expand Down Expand Up @@ -109,20 +214,8 @@ pub fn get_decoder<T: DataType>(
descr: ColumnDescPtr,
encoding: Encoding,
) -> Result<Box<dyn Decoder<T>>> {
let decoder: Box<dyn Decoder<T>> = match encoding {
Encoding::PLAIN => Box::new(PlainDecoder::new(descr.type_length())),
Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => {
return Err(general_err!(
"Cannot initialize this encoding through this function"
));
}
Encoding::RLE => Box::new(RleValueDecoder::new()),
Encoding::DELTA_BINARY_PACKED => Box::new(DeltaBitPackDecoder::new()),
Encoding::DELTA_LENGTH_BYTE_ARRAY => Box::new(DeltaLengthByteArrayDecoder::new()),
Encoding::DELTA_BYTE_ARRAY => Box::new(DeltaByteArrayDecoder::new()),
e => return Err(nyi_err!("Encoding {} is not supported", e)),
};
Ok(decoder)
use self::private::GetDecoder;
T::T::get_decoder(descr, encoding)
}

// ----------------------------------------------------------------------
Expand Down Expand Up @@ -817,8 +910,11 @@ mod tests {
// supported encodings
create_and_check_decoder::<Int32Type>(Encoding::PLAIN, None);
create_and_check_decoder::<Int32Type>(Encoding::DELTA_BINARY_PACKED, None);
create_and_check_decoder::<Int32Type>(Encoding::DELTA_LENGTH_BYTE_ARRAY, None);
create_and_check_decoder::<Int32Type>(Encoding::DELTA_BYTE_ARRAY, None);
create_and_check_decoder::<ByteArrayType>(
Encoding::DELTA_LENGTH_BYTE_ARRAY,
None,
);
create_and_check_decoder::<ByteArrayType>(Encoding::DELTA_BYTE_ARRAY, None);
create_and_check_decoder::<BoolType>(Encoding::RLE, None);

// error when initializing
Expand All @@ -834,6 +930,18 @@ mod tests {
"Cannot initialize this encoding through this function"
)),
);
create_and_check_decoder::<Int32Type>(
Encoding::DELTA_LENGTH_BYTE_ARRAY,
Some(general_err!(
"Encoding DELTA_LENGTH_BYTE_ARRAY is not supported for type"
)),
);
create_and_check_decoder::<Int32Type>(
Encoding::DELTA_BYTE_ARRAY,
Some(general_err!(
"Encoding DELTA_BYTE_ARRAY is not supported for type"
)),
);

// unsupported
create_and_check_decoder::<Int32Type>(
Expand Down

0 comments on commit 3e7b6c4

Please sign in to comment.