-
Notifications
You must be signed in to change notification settings - Fork 847
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
parquet reader: Support reading decimals from parquet BYTE_ARRAY
type
#2160
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -25,19 +25,11 @@ use crate::arrow::array_reader::{ | |||||
ComplexObjectArrayReader, ListArrayReader, MapArrayReader, NullArrayReader, | ||||||
PrimitiveArrayReader, RowGroupCollection, StructArrayReader, | ||||||
}; | ||||||
use crate::arrow::buffer::converter::{ | ||||||
DecimalArrayConverter, DecimalConverter, FixedLenBinaryConverter, | ||||||
FixedSizeArrayConverter, Int96ArrayConverter, Int96Converter, | ||||||
IntervalDayTimeArrayConverter, IntervalDayTimeConverter, | ||||||
IntervalYearMonthArrayConverter, IntervalYearMonthConverter, | ||||||
}; | ||||||
use crate::arrow::buffer::converter::{DecimalArrayConverter, DecimalByteArrayConvert, DecimalFixedLengthByteArrayConverter, FixedLenBinaryConverter, FixedSizeArrayConverter, Int96ArrayConverter, Int96Converter, IntervalDayTimeArrayConverter, IntervalDayTimeConverter, IntervalYearMonthArrayConverter, IntervalYearMonthConverter}; | ||||||
use crate::arrow::schema::{convert_schema, ParquetField, ParquetFieldType}; | ||||||
use crate::arrow::ProjectionMask; | ||||||
use crate::basic::Type as PhysicalType; | ||||||
use crate::data_type::{ | ||||||
BoolType, DoubleType, FixedLenByteArrayType, FloatType, Int32Type, Int64Type, | ||||||
Int96Type, | ||||||
}; | ||||||
use crate::data_type::{BoolType, ByteArrayType, DoubleType, FixedLenByteArrayType, FloatType, Int32Type, Int64Type, Int96Type}; | ||||||
use crate::errors::Result; | ||||||
use crate::schema::types::{ColumnDescriptor, ColumnPath, SchemaDescPtr, Type}; | ||||||
|
||||||
|
@@ -233,6 +225,18 @@ fn build_primitive_reader( | |||||
column_desc, | ||||||
arrow_type, | ||||||
), | ||||||
Some(DataType::Decimal(precision, scale)) => { | ||||||
// read decimal data from parquet binary physical type | ||||||
let convert = DecimalByteArrayConvert::new(DecimalArrayConverter::new(precision as i32, scale as i32)); | ||||||
Ok(Box::new( | ||||||
ComplexObjectArrayReader::<ByteArrayType,DecimalByteArrayConvert>::new( | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. style?
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The code has been formatted. |
||||||
page_iterator, | ||||||
column_desc, | ||||||
convert, | ||||||
arrow_type | ||||||
)? | ||||||
)) | ||||||
}, | ||||||
_ => make_byte_array_reader( | ||||||
page_iterator, | ||||||
column_desc, | ||||||
|
@@ -241,13 +245,13 @@ fn build_primitive_reader( | |||||
}, | ||||||
PhysicalType::FIXED_LEN_BYTE_ARRAY => match field.arrow_type { | ||||||
DataType::Decimal(precision, scale) => { | ||||||
let converter = DecimalConverter::new(DecimalArrayConverter::new( | ||||||
let converter = DecimalFixedLengthByteArrayConverter::new(DecimalArrayConverter::new( | ||||||
precision as i32, | ||||||
scale as i32, | ||||||
)); | ||||||
Ok(Box::new(ComplexObjectArrayReader::< | ||||||
FixedLenByteArrayType, | ||||||
DecimalConverter, | ||||||
DecimalFixedLengthByteArrayConverter, | ||||||
>::new( | ||||||
page_iterator, | ||||||
column_desc, | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -76,16 +76,6 @@ impl DecimalArrayConverter { | |
pub fn new(precision: i32, scale: i32) -> Self { | ||
Self { precision, scale } | ||
} | ||
|
||
fn from_bytes_to_i128(b: &[u8]) -> i128 { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Move this method out of the impl and make it a public function of this module. |
||
assert!(b.len() <= 16, "Decimal128Array supports only up to size 16"); | ||
let first_bit = b[0] & 128u8 == 128u8; | ||
let mut result = if first_bit { [255u8; 16] } else { [0u8; 16] }; | ||
for (i, v) in b.iter().enumerate() { | ||
result[i + (16 - b.len())] = *v; | ||
} | ||
i128::from_be_bytes(result) | ||
} | ||
} | ||
|
||
impl Converter<Vec<Option<FixedLenByteArray>>, Decimal128Array> | ||
|
@@ -94,13 +84,41 @@ impl Converter<Vec<Option<FixedLenByteArray>>, Decimal128Array> | |
fn convert(&self, source: Vec<Option<FixedLenByteArray>>) -> Result<Decimal128Array> { | ||
let array = source | ||
.into_iter() | ||
.map(|array| array.map(|array| Self::from_bytes_to_i128(array.data()))) | ||
.map(|array| array.map(|array| from_bytes_to_i128(array.data()))) | ||
.collect::<Decimal128Array>() | ||
.with_precision_and_scale(self.precision as usize, self.scale as usize)?; | ||
|
||
Ok(array) | ||
} | ||
} | ||
|
||
impl Converter<Vec<Option<ByteArray>>, Decimal128Array> for DecimalArrayConverter { | ||
fn convert(&self, source: Vec<Option<ByteArray>>) -> Result<Decimal128Array> { | ||
let array = source | ||
.into_iter() | ||
.map(|array| array.map(|array| from_bytes_to_i128(array.data()))) | ||
.collect::<Decimal128Array>() | ||
.with_precision_and_scale(self.precision as usize, self.scale as usize)?; | ||
|
||
Ok(array) | ||
} | ||
} | ||
|
||
/// Converts a big-endian two's-complement byte slice into an `i128`.
///
/// Inputs shorter than 16 bytes are sign-extended: the high-order bytes are
/// filled with `0xFF` when the most significant bit of the first byte is set
/// and with `0x00` otherwise. An empty slice carries no sign bit and is
/// decoded as `0` instead of panicking on an out-of-bounds index.
///
/// # Panics
///
/// Panics if `b` is longer than 16 bytes.
fn from_bytes_to_i128(b: &[u8]) -> i128 {
    assert!(b.len() <= 16, "Decimal128Array supports only up to size 16");
    // `first()` avoids indexing `b[0]`, which would panic for an empty slice.
    let is_negative = b.first().map_or(false, |byte| byte & 0x80 != 0);
    let mut result = if is_negative { [0xFF_u8; 16] } else { [0_u8; 16] };
    // Right-align the input so it occupies the least significant bytes.
    result[16 - b.len()..].copy_from_slice(b);
    // The byte array comes from a parquet file and must be big-endian.
    // The endianness is defined by the parquet format; see the reference
    // https://github.com/apache/parquet-format/blob/54e53e5d7794d383529dd30746378f19a12afd58/src/main/thrift/parquet.thrift#L66
    i128::from_be_bytes(result)
}
|
||
/// An Arrow Interval converter, which reads the first 4 bytes of a Parquet interval, | ||
/// and interprets it as an i32 value representing the Arrow YearMonth value | ||
pub struct IntervalYearMonthArrayConverter {} | ||
|
@@ -272,12 +290,15 @@ pub type IntervalDayTimeConverter = ArrayRefConverter< | |
IntervalDayTimeArrayConverter, | ||
>; | ||
|
||
pub type DecimalConverter = ArrayRefConverter< | ||
pub type DecimalFixedLengthByteArrayConverter = ArrayRefConverter< | ||
Vec<Option<FixedLenByteArray>>, | ||
Decimal128Array, | ||
DecimalArrayConverter, | ||
>; | ||
|
||
/// An `ArrayRefConverter` that turns `Vec<Option<ByteArray>>` into a
/// `Decimal128Array` by delegating to `DecimalArrayConverter`.
pub type DecimalByteArrayConvert = | |
ArrayRefConverter<Vec<Option<ByteArray>>, Decimal128Array, DecimalArrayConverter>; | |
|
||
pub struct FromConverter<S, T> { | ||
_source: PhantomData<S>, | ||
_dest: PhantomData<T>, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -531,6 +531,32 @@ mod tests { | |
assert_eq!(&arrow_fields, converted_arrow_schema.fields()); | ||
} | ||
|
||
#[test] | ||
fn test_decimal_fields() { | ||
let message_type = " | ||
message test_schema { | ||
REQUIRED INT32 decimal1 (DECIMAL(4,2)); | ||
REQUIRED INT64 decimal2 (DECIMAL(12,2)); | ||
REQUIRED FIXED_LEN_BYTE_ARRAY (16) decimal3 (DECIMAL(30,2)); | ||
REQUIRED BYTE_ARRAY decimal4 (DECIMAL(33,2)); | ||
} | ||
"; | ||
|
||
let parquet_group_type = parse_message_type(message_type).unwrap(); | ||
|
||
let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type)); | ||
let converted_arrow_schema = | ||
parquet_to_arrow_schema(&parquet_schema, None).unwrap(); | ||
|
||
let arrow_fields = vec![ | ||
Field::new("decimal1", DataType::Decimal(4,2), false), | ||
Field::new("decimal2", DataType::Decimal(12,2), false), | ||
Field::new("decimal3", DataType::Decimal(30,2), false), | ||
Field::new("decimal4", DataType::Decimal(33,2), false), | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Would this have previously failed? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes, this previously failed; I just ran this test to confirm.
|
||
]; | ||
assert_eq!(&arrow_fields, converted_arrow_schema.fields()); | ||
} | ||
|
||
#[test] | ||
fn test_byte_array_fields() { | ||
let message_type = " | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this the core change
(namely, to support reading arrow
`DataType::Decimal`
from the parquet `BYTE_ARRAY`
type, where previously the code only supported `DataType::Decimal`
in `FIXED_LEN_BYTE_ARRAY` fields)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes.
Previously, a decimal could only be read from
`int32`, `int64`, and `fixed_length_byte_array`,
but by the parquet definition a decimal can be read from `int32`, `int64`, `byte_array`, and `fixed_length_byte_array`.