Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

parquet reader: Support reading decimals from parquet BYTE_ARRAY type #2160

Merged
merged 1 commit into from
Jul 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 16 additions & 12 deletions parquet/src/arrow/array_reader/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,11 @@ use crate::arrow::array_reader::{
ComplexObjectArrayReader, ListArrayReader, MapArrayReader, NullArrayReader,
PrimitiveArrayReader, RowGroupCollection, StructArrayReader,
};
use crate::arrow::buffer::converter::{
DecimalArrayConverter, DecimalConverter, FixedLenBinaryConverter,
FixedSizeArrayConverter, Int96ArrayConverter, Int96Converter,
IntervalDayTimeArrayConverter, IntervalDayTimeConverter,
IntervalYearMonthArrayConverter, IntervalYearMonthConverter,
};
use crate::arrow::buffer::converter::{DecimalArrayConverter, DecimalByteArrayConvert, DecimalFixedLengthByteArrayConverter, FixedLenBinaryConverter, FixedSizeArrayConverter, Int96ArrayConverter, Int96Converter, IntervalDayTimeArrayConverter, IntervalDayTimeConverter, IntervalYearMonthArrayConverter, IntervalYearMonthConverter};
use crate::arrow::schema::{convert_schema, ParquetField, ParquetFieldType};
use crate::arrow::ProjectionMask;
use crate::basic::Type as PhysicalType;
use crate::data_type::{
BoolType, DoubleType, FixedLenByteArrayType, FloatType, Int32Type, Int64Type,
Int96Type,
};
use crate::data_type::{BoolType, ByteArrayType, DoubleType, FixedLenByteArrayType, FloatType, Int32Type, Int64Type, Int96Type};
use crate::errors::Result;
use crate::schema::types::{ColumnDescriptor, ColumnPath, SchemaDescPtr, Type};

Expand Down Expand Up @@ -233,6 +225,18 @@ fn build_primitive_reader(
column_desc,
arrow_type,
),
Some(DataType::Decimal(precision, scale)) => {
// read decimal data from parquet binary physical type
let convert = DecimalByteArrayConvert::new(DecimalArrayConverter::new(precision as i32, scale as i32));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the core change?

(namely to support reading arrow DataType::Decimal from parquet BYTE_ARRAY type where previously the code only supported DataType::Decimal in `FIXED_LEN_BYTE_ARRAY fields)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes.
previously, decimal just can be read from int32,int64,byte_array, but in the definition of parquet decimal can be read from int32,int64,byte_array,fixed_length_byte_array.

Ok(Box::new(
ComplexObjectArrayReader::<ByteArrayType,DecimalByteArrayConvert>::new(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style?

Suggested change
ComplexObjectArrayReader::<ByteArrayType,DecimalByteArrayConvert>::new(
ComplexObjectArrayReader::<ByteArrayType, DecimalByteArrayConvert>::new(

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code has been formatted.

page_iterator,
column_desc,
convert,
arrow_type
)?
))
},
_ => make_byte_array_reader(
page_iterator,
column_desc,
Expand All @@ -241,13 +245,13 @@ fn build_primitive_reader(
},
PhysicalType::FIXED_LEN_BYTE_ARRAY => match field.arrow_type {
DataType::Decimal(precision, scale) => {
let converter = DecimalConverter::new(DecimalArrayConverter::new(
let converter = DecimalFixedLengthByteArrayConverter::new(DecimalArrayConverter::new(
precision as i32,
scale as i32,
));
Ok(Box::new(ComplexObjectArrayReader::<
FixedLenByteArrayType,
DecimalConverter,
DecimalFixedLengthByteArrayConverter,
>::new(
page_iterator,
column_desc,
Expand Down
7 changes: 6 additions & 1 deletion parquet/src/arrow/arrow_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -719,7 +719,12 @@ mod tests {
fn test_read_decimal_file() {
use arrow::array::Decimal128Array;
let testdata = arrow::util::test_util::parquet_test_data();
let file_variants = vec![("fixed_length", 25), ("int32", 4), ("int64", 10)];
let file_variants = vec![
("byte_array", 4),
("fixed_length", 25),
("int32", 4),
("int64", 10),
];
for (prefix, target_precision) in file_variants {
let path = format!("{}/{}_decimal.parquet", testdata, prefix);
let file = File::open(&path).unwrap();
Expand Down
45 changes: 33 additions & 12 deletions parquet/src/arrow/buffer/converter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,16 +76,6 @@ impl DecimalArrayConverter {
pub fn new(precision: i32, scale: i32) -> Self {
Self { precision, scale }
}

fn from_bytes_to_i128(b: &[u8]) -> i128 {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move this method out and as a public method for this mod

assert!(b.len() <= 16, "Decimal128Array supports only up to size 16");
let first_bit = b[0] & 128u8 == 128u8;
let mut result = if first_bit { [255u8; 16] } else { [0u8; 16] };
for (i, v) in b.iter().enumerate() {
result[i + (16 - b.len())] = *v;
}
i128::from_be_bytes(result)
}
}

impl Converter<Vec<Option<FixedLenByteArray>>, Decimal128Array>
Expand All @@ -94,13 +84,41 @@ impl Converter<Vec<Option<FixedLenByteArray>>, Decimal128Array>
fn convert(&self, source: Vec<Option<FixedLenByteArray>>) -> Result<Decimal128Array> {
let array = source
.into_iter()
.map(|array| array.map(|array| Self::from_bytes_to_i128(array.data())))
.map(|array| array.map(|array| from_bytes_to_i128(array.data())))
.collect::<Decimal128Array>()
.with_precision_and_scale(self.precision as usize, self.scale as usize)?;

Ok(array)
}
}

impl Converter<Vec<Option<ByteArray>>, Decimal128Array> for DecimalArrayConverter {
fn convert(&self, source: Vec<Option<ByteArray>>) -> Result<Decimal128Array> {
let array = source
.into_iter()
.map(|array| array.map(|array| from_bytes_to_i128(array.data())))
.collect::<Decimal128Array>()
.with_precision_and_scale(self.precision as usize, self.scale as usize)?;

Ok(array)
}
}

// Convert the bytes array to i128.
// The endian of the input bytes array must be big-endian.
fn from_bytes_to_i128(b: &[u8]) -> i128 {
assert!(b.len() <= 16, "Decimal128Array supports only up to size 16");
let first_bit = b[0] & 128u8 == 128u8;
let mut result = if first_bit { [255u8; 16] } else { [0u8; 16] };
for (i, v) in b.iter().enumerate() {
result[i + (16 - b.len())] = *v;
}
// The bytes array are from parquet file and must be the big-endian.
// The endian is defined by parquet format, and the reference document
// https://github.com/apache/parquet-format/blob/54e53e5d7794d383529dd30746378f19a12afd58/src/main/thrift/parquet.thrift#L66
i128::from_be_bytes(result)
}

/// An Arrow Interval converter, which reads the first 4 bytes of a Parquet interval,
/// and interprets it as an i32 value representing the Arrow YearMonth value
pub struct IntervalYearMonthArrayConverter {}
Expand Down Expand Up @@ -272,12 +290,15 @@ pub type IntervalDayTimeConverter = ArrayRefConverter<
IntervalDayTimeArrayConverter,
>;

pub type DecimalConverter = ArrayRefConverter<
pub type DecimalFixedLengthByteArrayConverter = ArrayRefConverter<
Vec<Option<FixedLenByteArray>>,
Decimal128Array,
DecimalArrayConverter,
>;

pub type DecimalByteArrayConvert =
ArrayRefConverter<Vec<Option<ByteArray>>, Decimal128Array, DecimalArrayConverter>;

pub struct FromConverter<S, T> {
_source: PhantomData<S>,
_dest: PhantomData<T>,
Expand Down
26 changes: 26 additions & 0 deletions parquet/src/arrow/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,32 @@ mod tests {
assert_eq!(&arrow_fields, converted_arrow_schema.fields());
}

#[test]
fn test_decimal_fields() {
let message_type = "
message test_schema {
REQUIRED INT32 decimal1 (DECIMAL(4,2));
REQUIRED INT64 decimal2 (DECIMAL(12,2));
REQUIRED FIXED_LEN_BYTE_ARRAY (16) decimal3 (DECIMAL(30,2));
REQUIRED BYTE_ARRAY decimal4 (DECIMAL(33,2));
}
";

let parquet_group_type = parse_message_type(message_type).unwrap();

let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
let converted_arrow_schema =
parquet_to_arrow_schema(&parquet_schema, None).unwrap();

let arrow_fields = vec![
Field::new("decimal1", DataType::Decimal(4,2), false),
Field::new("decimal2", DataType::Decimal(12,2), false),
Field::new("decimal3", DataType::Decimal(30,2), false),
Field::new("decimal4", DataType::Decimal(33,2), false),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this would this have previously failed?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, this has previously failed as I just ran this test to confirm.

---- arrow::schema::tests::test_decimal_fields stdout ----
thread 'arrow::schema::tests::test_decimal_fields' panicked at 'called `Result::unwrap()` on an `Err` value: ArrowError("Unable to convert parquet BYTE_ARRAY logical type Some(Decimal { scale: 2, precision: 33 }) or converted type DECIMAL")', parquet/src/arrow/schema.rs:1734:60

];
assert_eq!(&arrow_fields, converted_arrow_schema.fields());
}

#[test]
fn test_byte_array_fields() {
let message_type = "
Expand Down
6 changes: 4 additions & 2 deletions parquet/src/arrow/schema/primitive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ fn from_parquet(parquet_type: &Type) -> Result<DataType> {
PhysicalType::INT96 => Ok(DataType::Timestamp(TimeUnit::Nanosecond, None)),
PhysicalType::FLOAT => Ok(DataType::Float32),
PhysicalType::DOUBLE => Ok(DataType::Float64),
PhysicalType::BYTE_ARRAY => from_byte_array(basic_info),
PhysicalType::BYTE_ARRAY => from_byte_array(basic_info, *precision, *scale),
PhysicalType::FIXED_LEN_BYTE_ARRAY => {
from_fixed_len_byte_array(basic_info, *scale, *precision, *type_length)
}
Expand Down Expand Up @@ -224,7 +224,7 @@ fn from_int64(info: &BasicTypeInfo, scale: i32, precision: i32) -> Result<DataTy
}
}

fn from_byte_array(info: &BasicTypeInfo) -> Result<DataType> {
fn from_byte_array(info: &BasicTypeInfo, precision: i32, scale: i32 ) -> Result<DataType> {
match (info.logical_type(), info.converted_type()) {
(Some(LogicalType::String), _) => Ok(DataType::Utf8),
(Some(LogicalType::Json), _) => Ok(DataType::Binary),
Expand All @@ -235,6 +235,8 @@ fn from_byte_array(info: &BasicTypeInfo) -> Result<DataType> {
(None, ConvertedType::BSON) => Ok(DataType::Binary),
(None, ConvertedType::ENUM) => Ok(DataType::Binary),
(None, ConvertedType::UTF8) => Ok(DataType::Utf8),
(Some(LogicalType::Decimal {precision, scale}), _) => Ok(DataType::Decimal(precision as usize, scale as usize)),
(None, ConvertedType::DECIMAL) => Ok(DataType::Decimal(precision as usize, scale as usize)),
(logical, converted) => Err(arrow_err!(
"Unable to convert parquet BYTE_ARRAY logical type {:?} or converted type {}",
logical,
Expand Down
16 changes: 16 additions & 0 deletions parquet/src/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1059,6 +1059,7 @@ mod tests {
assert_eq!(ConvertedType::JSON.to_string(), "JSON");
assert_eq!(ConvertedType::BSON.to_string(), "BSON");
assert_eq!(ConvertedType::INTERVAL.to_string(), "INTERVAL");
assert_eq!(ConvertedType::DECIMAL.to_string(), "DECIMAL")
}

#[test]
Expand Down Expand Up @@ -1153,6 +1154,10 @@ mod tests {
ConvertedType::from(Some(parquet::ConvertedType::Interval)),
ConvertedType::INTERVAL
);
assert_eq!(
ConvertedType::from(Some(parquet::ConvertedType::Decimal)),
ConvertedType::DECIMAL
)
}

#[test]
Expand Down Expand Up @@ -1244,6 +1249,10 @@ mod tests {
Some(parquet::ConvertedType::Interval),
ConvertedType::INTERVAL.into()
);
assert_eq!(
Some(parquet::ConvertedType::Decimal),
ConvertedType::DECIMAL.into()
)
}

#[test]
Expand Down Expand Up @@ -1409,6 +1418,13 @@ mod tests {
.unwrap(),
ConvertedType::INTERVAL
);
assert_eq!(
ConvertedType::DECIMAL
.to_string()
.parse::<ConvertedType>()
.unwrap(),
ConvertedType::DECIMAL
)
}

#[test]
Expand Down