From e92af1e459f53e8c267b77fe4d5d58c03e4c9240 Mon Sep 17 00:00:00 2001 From: Jefffrey <22608443+Jefffrey@users.noreply.github.com> Date: Fri, 1 Dec 2023 22:16:19 +1100 Subject: [PATCH 1/2] Parquet: don't truncate f16/decimal min/max stats --- parquet/src/column/writer/mod.rs | 148 +++++++++++++++++++++++++------ 1 file changed, 121 insertions(+), 27 deletions(-) diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs index 14b8655091e4..59430b676200 100644 --- a/parquet/src/column/writer/mod.rs +++ b/parquet/src/column/writer/mod.rs @@ -677,32 +677,28 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { } self.last_non_null_data_page_min_max = Some((new_min.clone(), new_max.clone())); - // We only truncate if the data is represented as binary - match self.descr.physical_type() { - Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY => { - self.column_index_builder.append( - null_page, - self.truncate_min_value( - self.props.column_index_truncate_length(), - stat.min_bytes(), - ) - .0, - self.truncate_max_value( - self.props.column_index_truncate_length(), - stat.max_bytes(), - ) - .0, - self.page_metrics.num_page_nulls as i64, - ); - } - _ => { - self.column_index_builder.append( - null_page, - stat.min_bytes().to_vec(), - stat.max_bytes().to_vec(), - self.page_metrics.num_page_nulls as i64, - ); - } + if self.can_truncate_value() { + self.column_index_builder.append( + null_page, + self.truncate_min_value( + self.props.column_index_truncate_length(), + stat.min_bytes(), + ) + .0, + self.truncate_max_value( + self.props.column_index_truncate_length(), + stat.max_bytes(), + ) + .0, + self.page_metrics.num_page_nulls as i64, + ); + } else { + self.column_index_builder.append( + null_page, + stat.min_bytes().to_vec(), + stat.max_bytes().to_vec(), + self.page_metrics.num_page_nulls as i64, + ); } } } @@ -712,6 +708,26 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { .append_row_count(self.page_metrics.num_buffered_rows as i64); } + /// Determine if we should allow truncating min/max values for this column's statistics + fn can_truncate_value(&self) -> bool { + match self.descr.physical_type() { + // Don't truncate for Float16 and Decimal because their sort order is different + // from that of FIXED_LEN_BYTE_ARRAY sort order. + // So truncation of those types could lead to inaccurate min/max statistics + Type::FIXED_LEN_BYTE_ARRAY + if !matches!( + self.descr.logical_type(), + Some(LogicalType::Decimal { .. }) | Some(LogicalType::Float16) + ) => + { + true + } + Type::BYTE_ARRAY => true, + // Truncation only applies for fba/binary physical types + _ => false, + } + } + fn truncate_min_value(&self, truncation_length: Option, data: &[u8]) -> (Vec, bool) { truncation_length .filter(|l| data.len() > *l) @@ -945,7 +961,9 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { .with_min_is_exact(!did_truncate_min), ) } - Statistics::FixedLenByteArray(stats) if stats.has_min_max_set() => { + Statistics::FixedLenByteArray(stats) + if (stats.has_min_max_set() && self.can_truncate_value()) => + { let (min, did_truncate_min) = self.truncate_min_value( self.props.statistics_truncate_length(), stats.min_bytes(), @@ -2711,6 +2729,82 @@ mod tests { } } + #[test] + fn test_float16_min_max_no_truncation() { + // Even if we set truncation to occur at 1 byte, we should not truncate for Float16 + let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1)); + let props = Arc::new(builder.build()); + let page_writer = get_test_page_writer(); + let mut writer = get_test_float16_column_writer(page_writer, 0, 0, props); + + let expected_value = f16::PI.to_le_bytes().to_vec(); + let data = vec![ByteArray::from(expected_value.clone()).into()]; + writer.write_batch(&data, None, None).unwrap(); + writer.flush_data_pages().unwrap(); + + let r = writer.close().unwrap(); + + // stats should still be written + // ensure bytes weren't truncated for column index + let column_index = r.column_index.unwrap(); + let column_index_min_bytes = column_index.min_values[0].as_slice(); + let column_index_max_bytes = column_index.max_values[0].as_slice(); + assert_eq!(expected_value, column_index_min_bytes); + assert_eq!(expected_value, column_index_max_bytes); + + // ensure bytes weren't truncated for statistics + let stats = r.metadata.statistics().unwrap(); + assert!(stats.has_min_max_set()); + if let Statistics::FixedLenByteArray(stats) = stats { + let stats_min_bytes = stats.min_bytes(); + let stats_max_bytes = stats.max_bytes(); + assert_eq!(expected_value, stats_min_bytes); + assert_eq!(expected_value, stats_max_bytes); + } else { + panic!("expecting Statistics::FixedLenByteArray"); + } + } + + #[test] + fn test_decimal_min_max_no_truncation() { + // Even if we set truncation to occur at 1 byte, we should not truncate for Decimal + let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1)); + let props = Arc::new(builder.build()); + let page_writer = get_test_page_writer(); + let mut writer = + get_test_decimals_column_writer::(page_writer, 0, 0, props); + + let expected_value = vec![ + 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8, 35u8, + 231u8, 90u8, 0u8, 0u8, + ]; + let data = vec![ByteArray::from(expected_value.clone()).into()]; + writer.write_batch(&data, None, None).unwrap(); + writer.flush_data_pages().unwrap(); + + let r = writer.close().unwrap(); + + // stats should still be written + // ensure bytes weren't truncated for column index + let column_index = r.column_index.unwrap(); + let column_index_min_bytes = column_index.min_values[0].as_slice(); + let column_index_max_bytes = column_index.max_values[0].as_slice(); + assert_eq!(expected_value, column_index_min_bytes); + assert_eq!(expected_value, column_index_max_bytes); + + // ensure bytes weren't truncated for statistics + let stats = r.metadata.statistics().unwrap(); + assert!(stats.has_min_max_set()); + if let Statistics::FixedLenByteArray(stats) = stats { + let stats_min_bytes = stats.min_bytes(); + let stats_max_bytes = stats.max_bytes(); + assert_eq!(expected_value, stats_min_bytes); + assert_eq!(expected_value, stats_max_bytes); + } else { + panic!("expecting Statistics::FixedLenByteArray"); + } + } + #[test] fn test_statistics_truncating_byte_array() { let page_writer = get_test_page_writer(); From fceb475979f46400520dc35a31fc3c01d6cd8fb2 Mon Sep 17 00:00:00 2001 From: Jefffrey <22608443+Jefffrey@users.noreply.github.com> Date: Fri, 1 Dec 2023 22:30:05 +1100 Subject: [PATCH 2/2] Fix --- parquet/src/column/writer/mod.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs index 50b502f8c41c..5dd7747c6fc2 100644 --- a/parquet/src/column/writer/mod.rs +++ b/parquet/src/column/writer/mod.rs @@ -2737,7 +2737,7 @@ mod tests { let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1)); let props = Arc::new(builder.build()); let page_writer = get_test_page_writer(); - let mut writer = get_test_float16_column_writer(page_writer, 0, 0, props); + let mut writer = get_test_float16_column_writer(page_writer, props); let expected_value = f16::PI.to_le_bytes().to_vec(); let data = vec![ByteArray::from(expected_value.clone()).into()]; @@ -3515,7 +3515,7 @@ mod tests { values: &[FixedLenByteArray], ) -> ValueStatistics { let page_writer = get_test_page_writer(); - let mut writer = get_test_float16_column_writer(page_writer); + let mut writer = get_test_float16_column_writer(page_writer, Default::default()); writer.write_batch(values, None, None).unwrap(); let metadata = writer.close().unwrap().metadata; @@ -3528,9 +3528,10 @@ mod tests { fn get_test_float16_column_writer( page_writer: Box, + props: WriterPropertiesPtr, ) -> ColumnWriterImpl<'static, FixedLenByteArrayType> { let descr = Arc::new(get_test_float16_column_descr(0, 0)); - let column_writer = get_column_writer(descr, Default::default(), page_writer); + let column_writer = get_column_writer(descr, props, page_writer); get_typed_column_writer::(column_writer) }