Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parquet: don't truncate f16/decimal min/max stats #5154

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 124 additions & 29 deletions parquet/src/column/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -680,32 +680,28 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
}
self.last_non_null_data_page_min_max = Some((new_min.clone(), new_max.clone()));

// We only truncate if the data is represented as binary
match self.descr.physical_type() {
Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY => {
self.column_index_builder.append(
null_page,
self.truncate_min_value(
self.props.column_index_truncate_length(),
stat.min_bytes(),
)
.0,
self.truncate_max_value(
self.props.column_index_truncate_length(),
stat.max_bytes(),
)
.0,
self.page_metrics.num_page_nulls as i64,
);
}
_ => {
self.column_index_builder.append(
null_page,
stat.min_bytes().to_vec(),
stat.max_bytes().to_vec(),
self.page_metrics.num_page_nulls as i64,
);
}
if self.can_truncate_value() {
self.column_index_builder.append(
null_page,
self.truncate_min_value(
self.props.column_index_truncate_length(),
stat.min_bytes(),
)
.0,
self.truncate_max_value(
self.props.column_index_truncate_length(),
stat.max_bytes(),
)
.0,
self.page_metrics.num_page_nulls as i64,
);
} else {
self.column_index_builder.append(
null_page,
stat.min_bytes().to_vec(),
stat.max_bytes().to_vec(),
self.page_metrics.num_page_nulls as i64,
);
}
}
}
Expand All @@ -715,6 +711,26 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
.append_row_count(self.page_metrics.num_buffered_rows as i64);
}

/// Determine if we should allow truncating min/max values for this column's statistics
fn can_truncate_value(&self) -> bool {
match self.descr.physical_type() {
// Don't truncate for Float16 and Decimal because their sort order is different
// from that of FIXED_LEN_BYTE_ARRAY sort order.
// So truncation of those types could lead to inaccurate min/max statistics
Type::FIXED_LEN_BYTE_ARRAY
if !matches!(
self.descr.logical_type(),
Some(LogicalType::Decimal { .. }) | Some(LogicalType::Float16)
) =>
{
true
}
Type::BYTE_ARRAY => true,
// Truncation only applies for fba/binary physical types
_ => false,
}
}

fn truncate_min_value(&self, truncation_length: Option<usize>, data: &[u8]) -> (Vec<u8>, bool) {
truncation_length
.filter(|l| data.len() > *l)
Expand Down Expand Up @@ -948,7 +964,9 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
.with_min_is_exact(!did_truncate_min),
)
}
Statistics::FixedLenByteArray(stats) if stats.has_min_max_set() => {
Statistics::FixedLenByteArray(stats)
if (stats.has_min_max_set() && self.can_truncate_value()) =>
{
Comment on lines +967 to +969
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about in write_column_metadata?

This covers the truncation in write_column_metadata(..)

let (min, did_truncate_min) = self.truncate_min_value(
self.props.statistics_truncate_length(),
stats.min_bytes(),
Expand Down Expand Up @@ -2713,6 +2731,82 @@ mod tests {
}
}

#[test]
fn test_float16_min_max_no_truncation() {
// Even if we set truncation to occur at 1 byte, we should not truncate for Float16
let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
let props = Arc::new(builder.build());
let page_writer = get_test_page_writer();
let mut writer = get_test_float16_column_writer(page_writer, props);

let expected_value = f16::PI.to_le_bytes().to_vec();
let data = vec![ByteArray::from(expected_value.clone()).into()];
writer.write_batch(&data, None, None).unwrap();
writer.flush_data_pages().unwrap();

let r = writer.close().unwrap();

// stats should still be written
// ensure bytes weren't truncated for column index
let column_index = r.column_index.unwrap();
let column_index_min_bytes = column_index.min_values[0].as_slice();
let column_index_max_bytes = column_index.max_values[0].as_slice();
assert_eq!(expected_value, column_index_min_bytes);
assert_eq!(expected_value, column_index_max_bytes);

// ensure bytes weren't truncated for statistics
let stats = r.metadata.statistics().unwrap();
assert!(stats.has_min_max_set());
if let Statistics::FixedLenByteArray(stats) = stats {
let stats_min_bytes = stats.min_bytes();
let stats_max_bytes = stats.max_bytes();
assert_eq!(expected_value, stats_min_bytes);
assert_eq!(expected_value, stats_max_bytes);
} else {
panic!("expecting Statistics::FixedLenByteArray");
}
}

#[test]
fn test_decimal_min_max_no_truncation() {
// Even if we set truncation to occur at 1 byte, we should not truncate for Decimal
let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
let props = Arc::new(builder.build());
let page_writer = get_test_page_writer();
let mut writer =
get_test_decimals_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);

let expected_value = vec![
255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8, 35u8,
231u8, 90u8, 0u8, 0u8,
];
let data = vec![ByteArray::from(expected_value.clone()).into()];
writer.write_batch(&data, None, None).unwrap();
writer.flush_data_pages().unwrap();

let r = writer.close().unwrap();

// stats should still be written
// ensure bytes weren't truncated for column index
let column_index = r.column_index.unwrap();
let column_index_min_bytes = column_index.min_values[0].as_slice();
let column_index_max_bytes = column_index.max_values[0].as_slice();
assert_eq!(expected_value, column_index_min_bytes);
assert_eq!(expected_value, column_index_max_bytes);

// ensure bytes weren't truncated for statistics
let stats = r.metadata.statistics().unwrap();
assert!(stats.has_min_max_set());
if let Statistics::FixedLenByteArray(stats) = stats {
let stats_min_bytes = stats.min_bytes();
let stats_max_bytes = stats.max_bytes();
assert_eq!(expected_value, stats_min_bytes);
assert_eq!(expected_value, stats_max_bytes);
} else {
panic!("expecting Statistics::FixedLenByteArray");
}
}

#[test]
fn test_statistics_truncating_byte_array() {
let page_writer = get_test_page_writer();
Expand Down Expand Up @@ -3421,7 +3515,7 @@ mod tests {
values: &[FixedLenByteArray],
) -> ValueStatistics<FixedLenByteArray> {
let page_writer = get_test_page_writer();
let mut writer = get_test_float16_column_writer(page_writer);
let mut writer = get_test_float16_column_writer(page_writer, Default::default());
writer.write_batch(values, None, None).unwrap();

let metadata = writer.close().unwrap().metadata;
Expand All @@ -3434,9 +3528,10 @@ mod tests {

fn get_test_float16_column_writer(
page_writer: Box<dyn PageWriter>,
props: WriterPropertiesPtr,
) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
let descr = Arc::new(get_test_float16_column_descr(0, 0));
let column_writer = get_column_writer(descr, Default::default(), page_writer);
let column_writer = get_column_writer(descr, props, page_writer);
get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
}

Expand Down
Loading