From 7b37fd47a37d0e4d92fde98f18e970db46146165 Mon Sep 17 00:00:00 2001 From: Matthew Kemp Date: Tue, 14 Nov 2023 10:36:55 +0000 Subject: [PATCH 1/6] changes needed to introduce min/max exactness --- parquet/src/column/page.rs | 50 ++++- parquet/src/column/writer/mod.rs | 4 + parquet/src/file/statistics.rs | 297 ++++++++++++++++++++++----- parquet/src/file/writer.rs | 30 ++- parquet/src/format.rs | 29 ++- parquet/tests/arrow_writer_layout.rs | 52 ++--- 6 files changed, 380 insertions(+), 82 deletions(-) diff --git a/parquet/src/column/page.rs b/parquet/src/column/page.rs index 947a633f48a2..43284eccd1e5 100644 --- a/parquet/src/column/page.rs +++ b/parquet/src/column/page.rs @@ -376,7 +376,15 @@ mod tests { encoding: Encoding::PLAIN, def_level_encoding: Encoding::RLE, rep_level_encoding: Encoding::RLE, - statistics: Some(Statistics::int32(Some(1), Some(2), None, 1, true)), + statistics: Some(Statistics::int32( + Some(1), + Some(2), + None, + 1, + true, + true, + true, + )), }; assert_eq!(data_page.page_type(), PageType::DATA_PAGE); assert_eq!(data_page.buffer(), vec![0, 1, 2].as_slice()); @@ -384,7 +392,15 @@ mod tests { assert_eq!(data_page.encoding(), Encoding::PLAIN); assert_eq!( data_page.statistics(), - Some(&Statistics::int32(Some(1), Some(2), None, 1, true)) + Some(&Statistics::int32( + Some(1), + Some(2), + None, + 1, + true, + true, + true + )) ); let data_page_v2 = Page::DataPageV2 { @@ -396,7 +412,15 @@ mod tests { def_levels_byte_len: 30, rep_levels_byte_len: 40, is_compressed: false, - statistics: Some(Statistics::int32(Some(1), Some(2), None, 1, true)), + statistics: Some(Statistics::int32( + Some(1), + Some(2), + None, + 1, + true, + true, + true, + )), }; assert_eq!(data_page_v2.page_type(), PageType::DATA_PAGE_V2); assert_eq!(data_page_v2.buffer(), vec![0, 1, 2].as_slice()); @@ -404,7 +428,15 @@ mod tests { assert_eq!(data_page_v2.encoding(), Encoding::PLAIN); assert_eq!( data_page_v2.statistics(), - Some(&Statistics::int32(Some(1), Some(2), None, 
1, true)) + Some(&Statistics::int32( + Some(1), + Some(2), + None, + 1, + true, + true, + true + )) ); let dict_page = Page::DictionaryPage { @@ -428,7 +460,15 @@ mod tests { encoding: Encoding::PLAIN, def_level_encoding: Encoding::RLE, rep_level_encoding: Encoding::RLE, - statistics: Some(Statistics::int32(Some(1), Some(2), None, 1, true)), + statistics: Some(Statistics::int32( + Some(1), + Some(2), + None, + 1, + true, + true, + true, + )), }; let cpage = CompressedPage::new(data_page, 5); diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs index 60db90c5d46d..b190ffa21daf 100644 --- a/parquet/src/column/writer/mod.rs +++ b/parquet/src/column/writer/mod.rs @@ -700,6 +700,8 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { None, self.page_metrics.num_page_nulls, false, + true, + true, )) } _ => None, @@ -861,6 +863,8 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { self.column_metrics.column_distinct_count, self.column_metrics.num_column_nulls, false, + true, + true, ); // Some common readers only support the deprecated statistics diff --git a/parquet/src/file/statistics.rs b/parquet/src/file/statistics.rs index b36e37a80c97..26729afc03db 100644 --- a/parquet/src/file/statistics.rs +++ b/parquet/src/file/statistics.rs @@ -23,10 +23,12 @@ //! ```rust //! use parquet::file::statistics::Statistics; //! -//! let stats = Statistics::int32(Some(1), Some(10), None, 3, true); +//! let stats = Statistics::int32(Some(1), Some(10), None, 3, true, true, true); //! assert_eq!(stats.null_count(), 3); //! assert!(stats.has_min_max_set()); //! assert!(stats.is_min_max_deprecated()); +//! assert!(stats.min_is_exact()); +//! assert!(stats.max_is_exact()); //! //! match stats { //! Statistics::Int32(ref typed) => { @@ -88,6 +90,8 @@ macro_rules! 
statistics_new_func { distinct: Option, nulls: u64, is_deprecated: bool, + is_max_value_exact: bool, + is_min_value_exact: bool, ) -> Self { Statistics::$stat(ValueStatistics::new( min, @@ -95,6 +99,8 @@ macro_rules! statistics_new_func { distinct, nulls, is_deprecated, + is_max_value_exact, + is_min_value_exact, )) } }; @@ -152,6 +158,12 @@ pub fn from_thrift( stats.max_value }; + // Whether or not the min/max values are exact. Due to pre-existing truncation + // in other libraries such as parquet-mr, we can't assume that any given parquet file + // has exact statistics unless it's explicitly set. + let is_max_value_exact = stats.is_max_value_exact.unwrap_or(false); + let is_min_value_exact = stats.is_min_value_exact.unwrap_or(false); + // Values are encoded using PLAIN encoding definition, except that // variable-length byte arrays do not include a length prefix. // @@ -163,6 +175,8 @@ pub fn from_thrift( distinct_count, null_count, old_format, + is_max_value_exact, + is_min_value_exact, ), Type::INT32 => Statistics::int32( min.map(|data| i32::from_le_bytes(data[..4].try_into().unwrap())), @@ -170,6 +184,8 @@ pub fn from_thrift( distinct_count, null_count, old_format, + is_max_value_exact, + is_min_value_exact, ), Type::INT64 => Statistics::int64( min.map(|data| i64::from_le_bytes(data[..8].try_into().unwrap())), @@ -177,6 +193,8 @@ pub fn from_thrift( distinct_count, null_count, old_format, + is_max_value_exact, + is_min_value_exact, ), Type::INT96 => { // INT96 statistics may not be correct, because comparison is signed @@ -190,7 +208,15 @@ pub fn from_thrift( assert_eq!(data.len(), 12); from_le_slice::(&data) }); - Statistics::int96(min, max, distinct_count, null_count, old_format) + Statistics::int96( + min, + max, + distinct_count, + null_count, + old_format, + is_max_value_exact, + is_min_value_exact, + ) } Type::FLOAT => Statistics::float( min.map(|data| f32::from_le_bytes(data[..4].try_into().unwrap())), @@ -198,6 +224,8 @@ pub fn from_thrift( 
distinct_count, null_count, old_format, + is_max_value_exact, + is_min_value_exact, ), Type::DOUBLE => Statistics::double( min.map(|data| f64::from_le_bytes(data[..8].try_into().unwrap())), @@ -205,6 +233,8 @@ pub fn from_thrift( distinct_count, null_count, old_format, + is_max_value_exact, + is_min_value_exact, ), Type::BYTE_ARRAY => Statistics::byte_array( min.map(ByteArray::from), @@ -212,6 +242,8 @@ pub fn from_thrift( distinct_count, null_count, old_format, + is_max_value_exact, + is_min_value_exact, ), Type::FIXED_LEN_BYTE_ARRAY => Statistics::fixed_len_byte_array( min.map(ByteArray::from).map(FixedLenByteArray::from), @@ -219,6 +251,8 @@ pub fn from_thrift( distinct_count, null_count, old_format, + is_max_value_exact, + is_min_value_exact, ), }; @@ -243,16 +277,20 @@ pub fn to_thrift(stats: Option<&Statistics>) -> Option { distinct_count: stats.distinct_count().map(|value| value as i64), max_value: None, min_value: None, + is_max_value_exact: None, + is_min_value_exact: None, }; // Get min/max if set. 
- let (min, max) = if stats.has_min_max_set() { + let (min, max, min_exact, max_exact) = if stats.has_min_max_set() { ( Some(stats.min_bytes().to_vec()), Some(stats.max_bytes().to_vec()), + Some(stats.min_is_exact()), + Some(stats.max_is_exact()), ) } else { - (None, None) + (None, None, None, None) }; if stats.is_min_max_backwards_compatible() { @@ -266,6 +304,9 @@ pub fn to_thrift(stats: Option<&Statistics>) -> Option { thrift_stats.max_value = max; } + thrift_stats.is_min_value_exact = min_exact; + thrift_stats.is_max_value_exact = max_exact; + Some(thrift_stats) } @@ -295,6 +336,8 @@ impl Statistics { distinct_count: Option, null_count: u64, is_deprecated: bool, + is_max_value_exact: bool, + is_min_value_exact: bool, ) -> Self { Self::from(ValueStatistics::new( min, @@ -302,6 +345,8 @@ impl Statistics { distinct_count, null_count, is_deprecated, + is_max_value_exact, + is_min_value_exact, )) } @@ -372,6 +417,16 @@ impl Statistics { statistics_enum_func![self, has_min_max_set] } + /// Returns `true` if the min value is set, and is an exact min value. + pub fn min_is_exact(&self) -> bool { + statistics_enum_func![self, min_is_exact] + } + + /// Returns `true` if the max value is set, and is an exact max value. + pub fn max_is_exact(&self) -> bool { + statistics_enum_func![self, max_is_exact] + } + /// Returns slice of bytes that represent min value. /// Panics if min value is not set. pub fn min_bytes(&self) -> &[u8] { @@ -426,6 +481,10 @@ pub struct ValueStatistics { distinct_count: Option, null_count: u64, + // Whether or not the min or max values are exact, or truncated. 
+ is_max_value_exact: bool, + is_min_value_exact: bool, + /// If `true` populate the deprecated `min` and `max` fields instead of /// `min_value` and `max_value` is_min_max_deprecated: bool, @@ -443,6 +502,8 @@ impl ValueStatistics { distinct_count: Option, null_count: u64, is_min_max_deprecated: bool, + is_max_value_exact: bool, + is_min_value_exact: bool, ) -> Self { Self { min, @@ -451,6 +512,8 @@ impl ValueStatistics { null_count, is_min_max_deprecated, is_min_max_backwards_compatible: is_min_max_deprecated, + is_max_value_exact, + is_min_value_exact, } } @@ -504,6 +567,16 @@ impl ValueStatistics { self.min.is_some() && self.max.is_some() } + /// Whether or not max value is set, and is an exact value. + pub fn max_is_exact(&self) -> bool { + self.max.is_some() && self.is_max_value_exact + } + + /// Whether or not min value is set, and is an exact value. + pub fn min_is_exact(&self) -> bool { + self.min.is_some() && self.is_min_value_exact + } + /// Returns optional value of number of distinct values occurring. 
fn distinct_count(&self) -> Option { self.distinct_count @@ -554,6 +627,8 @@ impl fmt::Display for ValueStatistics { } write!(f, ", null_count: {}", self.null_count)?; write!(f, ", min_max_deprecated: {}", self.is_min_max_deprecated)?; + write!(f, ", max_value_exact: {}", self.is_max_value_exact)?; + write!(f, ", min_value_exact: {}", self.is_min_value_exact)?; write!(f, "}}") } } @@ -563,13 +638,15 @@ impl fmt::Debug for ValueStatistics { write!( f, "{{min: {:?}, max: {:?}, distinct_count: {:?}, null_count: {}, \ - min_max_deprecated: {}, min_max_backwards_compatible: {}}}", + min_max_deprecated: {}, min_max_backwards_compatible: {}, max_value_exact: {}, min_value_exact: {}}}", self.min, self.max, self.distinct_count, self.null_count, self.is_min_max_deprecated, - self.is_min_max_backwards_compatible + self.is_min_max_backwards_compatible, + self.is_max_value_exact, + self.is_min_value_exact ) } } @@ -580,7 +657,7 @@ mod tests { #[test] fn test_statistics_min_max_bytes() { - let stats = Statistics::int32(Some(-123), Some(234), None, 1, false); + let stats = Statistics::int32(Some(-123), Some(234), None, 1, false, true, true); assert!(stats.has_min_max_set()); assert_eq!(stats.min_bytes(), (-123).as_bytes()); assert_eq!(stats.max_bytes(), 234.as_bytes()); @@ -591,6 +668,8 @@ mod tests { None, 1, true, + true, + true, ); assert!(stats.has_min_max_set()); assert_eq!(stats.min_bytes(), &[1, 2, 3]); @@ -607,6 +686,8 @@ mod tests { distinct_count: None, max_value: None, min_value: None, + is_min_value_exact: None, + is_max_value_exact: None, }; from_thrift(Type::INT32, Some(thrift_stats)).unwrap(); @@ -620,34 +701,34 @@ mod tests { #[test] fn test_statistics_debug() { - let stats = Statistics::int32(Some(1), Some(12), None, 12, true); + let stats = Statistics::int32(Some(1), Some(12), None, 12, true, true, true); assert_eq!( format!("{stats:?}"), "Int32({min: Some(1), max: Some(12), distinct_count: None, null_count: 12, \ - min_max_deprecated: true, 
min_max_backwards_compatible: true})" + min_max_deprecated: true, min_max_backwards_compatible: true, max_value_exact: true, min_value_exact: true})" ); - let stats = Statistics::int32(None, None, None, 7, false); + let stats = Statistics::int32(None, None, None, 7, false, false, false); assert_eq!( format!("{stats:?}"), "Int32({min: None, max: None, distinct_count: None, null_count: 7, \ - min_max_deprecated: false, min_max_backwards_compatible: false})" + min_max_deprecated: false, min_max_backwards_compatible: false, max_value_exact: false, min_value_exact: false})" ) } #[test] fn test_statistics_display() { - let stats = Statistics::int32(Some(1), Some(12), None, 12, true); + let stats = Statistics::int32(Some(1), Some(12), None, 12, true, true, true); assert_eq!( format!("{stats}"), - "{min: 1, max: 12, distinct_count: N/A, null_count: 12, min_max_deprecated: true}" + "{min: 1, max: 12, distinct_count: N/A, null_count: 12, min_max_deprecated: true, max_value_exact: true, min_value_exact: true}" ); - let stats = Statistics::int64(None, None, None, 7, false); + let stats = Statistics::int64(None, None, None, 7, false, false, false); assert_eq!( format!("{stats}"), "{min: N/A, max: N/A, distinct_count: N/A, null_count: 7, min_max_deprecated: \ - false}" + false, max_value_exact: false, min_value_exact: false}" ); let stats = Statistics::int96( @@ -656,11 +737,13 @@ mod tests { None, 3, true, + true, + true, ); assert_eq!( format!("{stats}"), "{min: [1, 0, 0], max: [2, 3, 4], distinct_count: N/A, null_count: 3, \ - min_max_deprecated: true}" + min_max_deprecated: true, max_value_exact: true, min_value_exact: true}" ); let stats = Statistics::byte_array( @@ -669,31 +752,35 @@ mod tests { Some(5), 7, false, + false, + false, ); assert_eq!( format!("{stats}"), - "{min: [1], max: [2], distinct_count: 5, null_count: 7, min_max_deprecated: false}" + "{min: [1], max: [2], distinct_count: 5, null_count: 7, min_max_deprecated: false, max_value_exact: false, 
min_value_exact: false}" ); } #[test] fn test_statistics_partial_eq() { - let expected = Statistics::int32(Some(12), Some(45), None, 11, true); + let expected = Statistics::int32(Some(12), Some(45), None, 11, true, true, true); - assert!(Statistics::int32(Some(12), Some(45), None, 11, true) == expected); - assert!(Statistics::int32(Some(11), Some(45), None, 11, true) != expected); - assert!(Statistics::int32(Some(12), Some(44), None, 11, true) != expected); - assert!(Statistics::int32(Some(12), Some(45), None, 23, true) != expected); - assert!(Statistics::int32(Some(12), Some(45), None, 11, false) != expected); + assert!(Statistics::int32(Some(12), Some(45), None, 11, true, true, true) == expected); + assert!(Statistics::int32(Some(11), Some(45), None, 11, true, true, true) != expected); + assert!(Statistics::int32(Some(12), Some(44), None, 11, true, true, true) != expected); + assert!(Statistics::int32(Some(12), Some(45), None, 23, true, true, true) != expected); + assert!(Statistics::int32(Some(12), Some(45), None, 11, false, true, true) != expected); + assert!(Statistics::int32(Some(12), Some(45), None, 11, true, false, true) != expected); + assert!(Statistics::int32(Some(12), Some(45), None, 11, true, true, false) != expected); assert!( - Statistics::int32(Some(12), Some(45), None, 11, false) - != Statistics::int64(Some(12), Some(45), None, 11, false) + Statistics::int32(Some(12), Some(45), None, 11, false, true, true) + != Statistics::int64(Some(12), Some(45), None, 11, false, true, true) ); assert!( - Statistics::boolean(Some(false), Some(true), None, 0, true) - != Statistics::double(Some(1.2), Some(4.5), None, 0, true) + Statistics::boolean(Some(false), Some(true), None, 0, true, true, true) + != Statistics::double(Some(1.2), Some(4.5), None, 0, true, true, true) ); assert!( @@ -702,12 +789,16 @@ mod tests { Some(ByteArray::from(vec![1, 2, 3])), None, 0, + true, + true, true ) != Statistics::fixed_len_byte_array( Some(ByteArray::from(vec![1, 2, 3]).into()), 
Some(ByteArray::from(vec![1, 2, 3]).into()), None, 0, + true, + true, true ) ); @@ -722,28 +813,132 @@ mod tests { assert_eq!(from_thrift(tpe, thrift_stats).unwrap(), Some(stats)); } - check_stats(Statistics::boolean(Some(false), Some(true), None, 7, true)); - check_stats(Statistics::boolean(Some(false), Some(true), None, 7, true)); - check_stats(Statistics::boolean(Some(false), Some(true), None, 0, false)); - check_stats(Statistics::boolean(Some(true), Some(true), None, 7, true)); - check_stats(Statistics::boolean(Some(false), Some(false), None, 7, true)); - check_stats(Statistics::boolean(None, None, None, 7, true)); + check_stats(Statistics::boolean( + Some(false), + Some(true), + None, + 7, + true, + true, + true, + )); + check_stats(Statistics::boolean( + Some(false), + Some(true), + None, + 7, + true, + true, + true, + )); + check_stats(Statistics::boolean( + Some(false), + Some(true), + None, + 0, + false, + true, + true, + )); + check_stats(Statistics::boolean( + Some(true), + Some(true), + None, + 7, + true, + true, + true, + )); + check_stats(Statistics::boolean( + Some(false), + Some(false), + None, + 7, + true, + true, + true, + )); + check_stats(Statistics::boolean(None, None, None, 7, true, false, false)); - check_stats(Statistics::int32(Some(-100), Some(500), None, 7, true)); - check_stats(Statistics::int32(Some(-100), Some(500), None, 0, false)); - check_stats(Statistics::int32(None, None, None, 7, true)); + check_stats(Statistics::int32( + Some(-100), + Some(500), + None, + 7, + true, + true, + true, + )); + check_stats(Statistics::int32( + Some(-100), + Some(500), + None, + 0, + false, + true, + true, + )); + check_stats(Statistics::int32(None, None, None, 7, true, false, false)); - check_stats(Statistics::int64(Some(-100), Some(200), None, 7, true)); - check_stats(Statistics::int64(Some(-100), Some(200), None, 0, false)); - check_stats(Statistics::int64(None, None, None, 7, true)); + check_stats(Statistics::int64( + Some(-100), + Some(200), + 
None, + 7, + true, + true, + true, + )); + check_stats(Statistics::int64( + Some(-100), + Some(200), + None, + 0, + false, + true, + true, + )); + check_stats(Statistics::int64(None, None, None, 7, true, false, false)); - check_stats(Statistics::float(Some(1.2), Some(3.4), None, 7, true)); - check_stats(Statistics::float(Some(1.2), Some(3.4), None, 0, false)); - check_stats(Statistics::float(None, None, None, 7, true)); + check_stats(Statistics::float( + Some(1.2), + Some(3.4), + None, + 7, + true, + true, + true, + )); + check_stats(Statistics::float( + Some(1.2), + Some(3.4), + None, + 0, + false, + true, + true, + )); + check_stats(Statistics::float(None, None, None, 7, true, false, false)); - check_stats(Statistics::double(Some(1.2), Some(3.4), None, 7, true)); - check_stats(Statistics::double(Some(1.2), Some(3.4), None, 0, false)); - check_stats(Statistics::double(None, None, None, 7, true)); + check_stats(Statistics::double( + Some(1.2), + Some(3.4), + None, + 7, + true, + true, + true, + )); + check_stats(Statistics::double( + Some(1.2), + Some(3.4), + None, + 0, + false, + true, + true, + )); + check_stats(Statistics::double(None, None, None, 7, true, false, false)); check_stats(Statistics::byte_array( Some(ByteArray::from(vec![1, 2, 3])), @@ -751,8 +946,12 @@ mod tests { None, 7, true, + true, + true, + )); + check_stats(Statistics::byte_array( + None, None, None, 7, true, false, false, )); - check_stats(Statistics::byte_array(None, None, None, 7, true)); check_stats(Statistics::fixed_len_byte_array( Some(ByteArray::from(vec![1, 2, 3]).into()), @@ -760,7 +959,11 @@ mod tests { None, 7, true, + true, + true, + )); + check_stats(Statistics::fixed_len_byte_array( + None, None, None, 7, true, false, false, )); - check_stats(Statistics::fixed_len_byte_array(None, None, None, 7, true)); } } diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs index 2b9f261d9f42..7c2795f1f38f 100644 --- a/parquet/src/file/writer.rs +++ b/parquet/src/file/writer.rs 
@@ -1044,7 +1044,15 @@ mod tests { encoding: Encoding::DELTA_BINARY_PACKED, def_level_encoding: Encoding::RLE, rep_level_encoding: Encoding::RLE, - statistics: Some(Statistics::int32(Some(1), Some(3), None, 7, true)), + statistics: Some(Statistics::int32( + Some(1), + Some(3), + None, + 7, + true, + true, + true, + )), }, Page::DataPageV2 { buf: Bytes::from(vec![4; 128]), @@ -1055,7 +1063,15 @@ mod tests { def_levels_byte_len: 24, rep_levels_byte_len: 32, is_compressed: false, - statistics: Some(Statistics::int32(Some(1), Some(3), None, 7, true)), + statistics: Some(Statistics::int32( + Some(1), + Some(3), + None, + 7, + true, + true, + true, + )), }, ]; @@ -1078,7 +1094,15 @@ mod tests { encoding: Encoding::DELTA_BINARY_PACKED, def_level_encoding: Encoding::RLE, rep_level_encoding: Encoding::RLE, - statistics: Some(Statistics::int32(Some(1), Some(3), None, 7, true)), + statistics: Some(Statistics::int32( + Some(1), + Some(3), + None, + 7, + true, + true, + true, + )), }, Page::DataPageV2 { buf: Bytes::from(vec![4; 128]), diff --git a/parquet/src/format.rs b/parquet/src/format.rs index 46adc39e6406..e9bbea12d94f 100644 --- a/parquet/src/format.rs +++ b/parquet/src/format.rs @@ -663,10 +663,13 @@ pub struct Statistics { /// arrays do not include a length prefix. 
pub max_value: Option>, pub min_value: Option>, + + pub is_max_value_exact: Option, + pub is_min_value_exact: Option, } impl Statistics { - pub fn new(max: F1, min: F2, null_count: F3, distinct_count: F4, max_value: F5, min_value: F6) -> Statistics where F1: Into>>, F2: Into>>, F3: Into>, F4: Into>, F5: Into>>, F6: Into>> { + pub fn new(max: F1, min: F2, null_count: F3, distinct_count: F4, max_value: F5, min_value: F6, is_max_value_exact : F7, is_min_value_exact: F8) -> Statistics where F1: Into>>, F2: Into>>, F3: Into>, F4: Into>, F5: Into>>, F6: Into>>, F7: Into>, F8: Into> { Statistics { max: max.into(), min: min.into(), @@ -674,6 +677,8 @@ impl Statistics { distinct_count: distinct_count.into(), max_value: max_value.into(), min_value: min_value.into(), + is_max_value_exact : is_max_value_exact.into(), + is_min_value_exact : is_min_value_exact.into(), } } } @@ -687,6 +692,8 @@ impl crate::thrift::TSerializable for Statistics { let mut f_4: Option = None; let mut f_5: Option> = None; let mut f_6: Option> = None; + let mut f_7: Option = None; + let mut f_8: Option = None; loop { let field_ident = i_prot.read_field_begin()?; if field_ident.field_type == TType::Stop { @@ -718,6 +725,14 @@ impl crate::thrift::TSerializable for Statistics { let val = i_prot.read_bytes()?; f_6 = Some(val); }, + 7 => { + let val = i_prot.read_bool()?; + f_7 = Some(val); + } + 8 => { + let val = i_prot.read_bool()?; + f_8 = Some(val) + } _ => { i_prot.skip(field_ident.field_type)?; }, @@ -732,6 +747,8 @@ impl crate::thrift::TSerializable for Statistics { distinct_count: f_4, max_value: f_5, min_value: f_6, + is_max_value_exact: f_7, + is_min_value_exact: f_8 }; Ok(ret) } @@ -768,6 +785,16 @@ impl crate::thrift::TSerializable for Statistics { o_prot.write_bytes(fld_var)?; o_prot.write_field_end()? 
} + if let Some(fld_var) = self.is_max_value_exact { + o_prot.write_field_begin(&TFieldIdentifier::new("is_max_value_exact", TType::Bool, 7))?; + o_prot.write_bool(fld_var)?; + o_prot.write_field_end()? + } + if let Some(fld_var) = self.is_min_value_exact { + o_prot.write_field_begin(&TFieldIdentifier::new("is_min_value_exact", TType::Bool, 8))?; + o_prot.write_bool(fld_var)?; + o_prot.write_field_end()? + } o_prot.write_field_stop()?; o_prot.write_struct_end() } diff --git a/parquet/tests/arrow_writer_layout.rs b/parquet/tests/arrow_writer_layout.rs index fab87f32f5c4..cd124031cfdc 100644 --- a/parquet/tests/arrow_writer_layout.rs +++ b/parquet/tests/arrow_writer_layout.rs @@ -185,7 +185,7 @@ fn test_primitive() { pages: (0..8) .map(|_| Page { rows: 250, - page_header_size: 34, + page_header_size: 36, compressed_size: 1000, encoding: Encoding::PLAIN, page_type: PageType::DATA_PAGE, @@ -214,14 +214,14 @@ fn test_primitive() { pages: vec![ Page { rows: 250, - page_header_size: 34, + page_header_size: 36, compressed_size: 258, encoding: Encoding::RLE_DICTIONARY, page_type: PageType::DATA_PAGE, }, Page { rows: 1750, - page_header_size: 34, + page_header_size: 36, compressed_size: 7000, encoding: Encoding::PLAIN, page_type: PageType::DATA_PAGE, @@ -229,7 +229,7 @@ fn test_primitive() { ], dictionary_page: Some(Page { rows: 250, - page_header_size: 34, + page_header_size: 36, compressed_size: 1000, encoding: Encoding::PLAIN, page_type: PageType::DICTIONARY_PAGE, @@ -256,42 +256,42 @@ fn test_primitive() { pages: vec![ Page { rows: 400, - page_header_size: 34, + page_header_size: 36, compressed_size: 452, encoding: Encoding::RLE_DICTIONARY, page_type: PageType::DATA_PAGE, }, Page { rows: 370, - page_header_size: 34, + page_header_size: 36, compressed_size: 472, encoding: Encoding::RLE_DICTIONARY, page_type: PageType::DATA_PAGE, }, Page { rows: 330, - page_header_size: 34, + page_header_size: 36, compressed_size: 464, encoding: Encoding::RLE_DICTIONARY, page_type: 
PageType::DATA_PAGE, }, Page { rows: 330, - page_header_size: 34, + page_header_size: 36, compressed_size: 464, encoding: Encoding::RLE_DICTIONARY, page_type: PageType::DATA_PAGE, }, Page { rows: 330, - page_header_size: 34, + page_header_size: 36, compressed_size: 464, encoding: Encoding::RLE_DICTIONARY, page_type: PageType::DATA_PAGE, }, Page { rows: 240, - page_header_size: 34, + page_header_size: 36, compressed_size: 332, encoding: Encoding::RLE_DICTIONARY, page_type: PageType::DATA_PAGE, @@ -299,7 +299,7 @@ fn test_primitive() { ], dictionary_page: Some(Page { rows: 2000, - page_header_size: 34, + page_header_size: 36, compressed_size: 8000, encoding: Encoding::PLAIN, page_type: PageType::DICTIONARY_PAGE, @@ -325,7 +325,7 @@ fn test_primitive() { pages: (0..20) .map(|_| Page { rows: 100, - page_header_size: 34, + page_header_size: 36, compressed_size: 400, encoding: Encoding::PLAIN, page_type: PageType::DATA_PAGE, @@ -360,14 +360,14 @@ fn test_string() { pages: (0..15) .map(|_| Page { rows: 130, - page_header_size: 34, + page_header_size: 36, compressed_size: 1040, encoding: Encoding::PLAIN, page_type: PageType::DATA_PAGE, }) .chain(std::iter::once(Page { rows: 50, - page_header_size: 33, + page_header_size: 35, compressed_size: 400, encoding: Encoding::PLAIN, page_type: PageType::DATA_PAGE, @@ -396,21 +396,21 @@ fn test_string() { pages: vec![ Page { rows: 130, - page_header_size: 34, + page_header_size: 36, compressed_size: 138, encoding: Encoding::RLE_DICTIONARY, page_type: PageType::DATA_PAGE, }, Page { rows: 1250, - page_header_size: 36, + page_header_size: 38, compressed_size: 10000, encoding: Encoding::PLAIN, page_type: PageType::DATA_PAGE, }, Page { rows: 620, - page_header_size: 34, + page_header_size: 36, compressed_size: 4960, encoding: Encoding::PLAIN, page_type: PageType::DATA_PAGE, @@ -418,7 +418,7 @@ fn test_string() { ], dictionary_page: Some(Page { rows: 130, - page_header_size: 34, + page_header_size: 36, compressed_size: 1040, encoding: 
Encoding::PLAIN, page_type: PageType::DICTIONARY_PAGE, @@ -445,42 +445,42 @@ fn test_string() { pages: vec![ Page { rows: 400, - page_header_size: 34, + page_header_size: 36, compressed_size: 452, encoding: Encoding::RLE_DICTIONARY, page_type: PageType::DATA_PAGE, }, Page { rows: 370, - page_header_size: 34, + page_header_size: 36, compressed_size: 472, encoding: Encoding::RLE_DICTIONARY, page_type: PageType::DATA_PAGE, }, Page { rows: 330, - page_header_size: 34, + page_header_size: 36, compressed_size: 464, encoding: Encoding::RLE_DICTIONARY, page_type: PageType::DATA_PAGE, }, Page { rows: 330, - page_header_size: 34, + page_header_size: 36, compressed_size: 464, encoding: Encoding::RLE_DICTIONARY, page_type: PageType::DATA_PAGE, }, Page { rows: 330, - page_header_size: 34, + page_header_size: 36, compressed_size: 464, encoding: Encoding::RLE_DICTIONARY, page_type: PageType::DATA_PAGE, }, Page { rows: 240, - page_header_size: 34, + page_header_size: 36, compressed_size: 332, encoding: Encoding::RLE_DICTIONARY, page_type: PageType::DATA_PAGE, @@ -488,7 +488,7 @@ fn test_string() { ], dictionary_page: Some(Page { rows: 2000, - page_header_size: 34, + page_header_size: 36, compressed_size: 16000, encoding: Encoding::PLAIN, page_type: PageType::DICTIONARY_PAGE, @@ -528,7 +528,7 @@ fn test_list() { pages: (0..10) .map(|_| Page { rows: 20, - page_header_size: 34, + page_header_size: 36, compressed_size: 672, encoding: Encoding::PLAIN, page_type: PageType::DATA_PAGE, From e57634dd49260ced6ea0d2668e56e1e6d7dcf165 Mon Sep 17 00:00:00 2001 From: Matthew Kemp Date: Tue, 14 Nov 2023 13:06:34 +0000 Subject: [PATCH 2/6] implement truncation property and logic, tests --- parquet/src/column/writer/mod.rs | 224 ++++++++++++++++++++++++++++--- parquet/src/file/properties.rs | 24 ++++ parquet/src/file/statistics.rs | 4 +- 3 files changed, 233 insertions(+), 19 deletions(-) diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs index 
b190ffa21daf..f193ef104486 100644 --- a/parquet/src/column/writer/mod.rs +++ b/parquet/src/column/writer/mod.rs @@ -635,8 +635,16 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY => { self.column_index_builder.append( null_page, - self.truncate_min_value(stat.min_bytes()), - self.truncate_max_value(stat.max_bytes()), + self.truncate_min_value( + self.props.column_index_truncate_length(), + stat.min_bytes(), + ) + .0, + self.truncate_max_value( + self.props.column_index_truncate_length(), + stat.max_bytes(), + ) + .0, self.page_metrics.num_page_nulls as i64, ); } @@ -657,26 +665,26 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { .append_row_count(self.page_metrics.num_buffered_rows as i64); } - fn truncate_min_value(&self, data: &[u8]) -> Vec { - self.props - .column_index_truncate_length() + fn truncate_min_value(&self, truncation_length: Option, data: &[u8]) -> (Vec, bool) { + truncation_length .filter(|l| data.len() > *l) .and_then(|l| match str::from_utf8(data) { Ok(str_data) => truncate_utf8(str_data, l), Err(_) => Some(data[..l].to_vec()), }) - .unwrap_or_else(|| data.to_vec()) + .map(|truncated| (truncated, true)) + .unwrap_or_else(|| (data.to_vec(), false)) } - fn truncate_max_value(&self, data: &[u8]) -> Vec { - self.props - .column_index_truncate_length() + fn truncate_max_value(&self, truncation_length: Option, data: &[u8]) -> (Vec, bool) { + truncation_length .filter(|l| data.len() > *l) .and_then(|l| match str::from_utf8(data) { Ok(str_data) => truncate_utf8(str_data, l).and_then(increment_utf8), Err(_) => increment(data[..l].to_vec()), }) - .unwrap_or_else(|| data.to_vec()) + .map(|truncated| (truncated, true)) + .unwrap_or_else(|| (data.to_vec(), false)) } /// Adds data page. 
@@ -857,6 +865,8 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { .set_dictionary_page_offset(dict_page_offset); if self.statistics_enabled != EnabledStatistics::None { + let backwards_compatible_min_max = self.descr.sort_order().is_signed(); + let statistics = ValueStatistics::::new( self.column_metrics.min_column_value.clone(), self.column_metrics.max_column_value.clone(), @@ -865,14 +875,52 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { false, true, true, - ); + ) + .with_backwards_compatible_min_max(backwards_compatible_min_max) + .into(); + + let statistics = match statistics { + Statistics::ByteArray(stats) if stats.has_min_max_set() => { + let (min, did_truncate_min) = self.truncate_min_value( + self.props.statistics_truncate_length(), + stats.min_bytes().clone(), + ); + let (max, did_truncate_max) = self.truncate_max_value( + self.props.statistics_truncate_length(), + stats.max_bytes(), + ); + Statistics::byte_array( + Some(min.into()), + Some(max.into()), + stats.distinct_count(), + stats.null_count(), + backwards_compatible_min_max, + !did_truncate_max, + !did_truncate_min, + ) + } + Statistics::FixedLenByteArray(stats) if stats.has_min_max_set() => { + let (min, did_truncate_min) = self.truncate_min_value( + self.props.statistics_truncate_length(), + stats.min_bytes(), + ); + let (max, did_truncate_max) = self.truncate_max_value( + self.props.statistics_truncate_length(), + stats.max_bytes(), + ); + Statistics::fixed_len_byte_array( + Some(min.into()), + Some(max.into()), + stats.distinct_count(), + stats.null_count(), + backwards_compatible_min_max, + !did_truncate_max, + !did_truncate_min, + ) + } + stats => stats, + }; - // Some common readers only support the deprecated statistics - // format so we also write them out if possible - // See https://github.com/apache/arrow-rs/issues/799 - let statistics = statistics - .with_backwards_compatible_min_max(self.descr.sort_order().is_signed()) - .into(); builder = 
builder.set_statistics(statistics); } @@ -2468,6 +2516,148 @@ mod tests { } } + #[test] + fn test_statistics_truncating_byte_array() { + let page_writer = get_test_page_writer(); + + const TEST_TRUNCATE_LENGTH: usize = 1; + + // Truncate values at 1 byte + let builder = + WriterProperties::builder().set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH)); + let props = Arc::new(builder.build()); + let mut writer = get_test_column_writer::(page_writer, 0, 0, props); + + let mut data = vec![ByteArray::default(); 1]; + // This is the expected min value + data[0].set_data(Bytes::from(String::from("Blart Versenwald III"))); + + writer.write_batch(&data, None, None).unwrap(); + + writer.flush_data_pages().unwrap(); + + let r = writer.close().unwrap(); + + assert_eq!(1, r.rows_written); + + let stats = r.metadata.statistics().expect("statistics"); + assert!(stats.has_min_max_set()); + assert_eq!(stats.null_count(), 0); + assert_eq!(stats.distinct_count(), None); + if let Statistics::ByteArray(_stats) = stats { + let min_value = _stats.min(); + let max_value = _stats.max(); + + assert!(!_stats.min_is_exact()); + assert!(!_stats.max_is_exact()); + + assert_eq!(min_value.len(), TEST_TRUNCATE_LENGTH); + assert_eq!(max_value.len(), TEST_TRUNCATE_LENGTH); + + assert_eq!("B".as_bytes(), min_value.as_bytes()); + assert_eq!("C".as_bytes(), max_value.as_bytes()); + } else { + panic!("expecting Statistics::ByteArray"); + } + } + + #[test] + fn test_statistics_truncating_fixed_len_byte_array() { + let page_writer = get_test_page_writer(); + + const TEST_TRUNCATE_LENGTH: usize = 1; + + // Truncate values at 1 byte + let builder = + WriterProperties::builder().set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH)); + let props = Arc::new(builder.build()); + let mut writer = get_test_column_writer::(page_writer, 0, 0, props); + + let mut data = vec![FixedLenByteArray::default(); 1]; + + const PSEUDO_DECIMAL_VALUE: i128 = 6541894651216648486512564456564654; + const 
PSEUDO_DECIMAL_BYTES: [u8; 16] = PSEUDO_DECIMAL_VALUE.to_be_bytes(); + + const EXPECTED_MIN: [u8; TEST_TRUNCATE_LENGTH] = [PSEUDO_DECIMAL_BYTES[0]]; // parquet specifies big-endian order for decimals + const EXPECTED_MAX: [u8; TEST_TRUNCATE_LENGTH] = + [PSEUDO_DECIMAL_BYTES[0].overflowing_add(1).0]; + + // This is the expected min value + data[0].set_data(Bytes::from(PSEUDO_DECIMAL_BYTES.as_slice())); + + writer.write_batch(&data, None, None).unwrap(); + + writer.flush_data_pages().unwrap(); + + let r = writer.close().unwrap(); + + assert_eq!(1, r.rows_written); + + let stats = r.metadata.statistics().expect("statistics"); + assert!(stats.has_min_max_set()); + assert_eq!(stats.null_count(), 0); + assert_eq!(stats.distinct_count(), None); + if let Statistics::FixedLenByteArray(_stats) = stats { + let min_value = _stats.min(); + let max_value = _stats.max(); + + assert!(!_stats.min_is_exact()); + assert!(!_stats.max_is_exact()); + + assert_eq!(min_value.len(), TEST_TRUNCATE_LENGTH); + assert_eq!(max_value.len(), TEST_TRUNCATE_LENGTH); + + assert_eq!(EXPECTED_MIN.as_slice(), min_value.as_bytes()); + assert_eq!(EXPECTED_MAX.as_slice(), max_value.as_bytes()); + + let reconstructed_min = i128::from_be_bytes([ + min_value.as_bytes()[0], + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + ]); + + let reconstructed_max = i128::from_be_bytes([ + max_value.as_bytes()[0], + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + ]); + + // check that the inner value is correctly bounded by the min/max + println!("min: {reconstructed_min} {PSEUDO_DECIMAL_VALUE}"); + assert!(reconstructed_min <= PSEUDO_DECIMAL_VALUE); + println!("max {reconstructed_max} {PSEUDO_DECIMAL_VALUE}"); + assert!(reconstructed_max >= PSEUDO_DECIMAL_VALUE); + } else { + panic!("expecting Statistics::FixedLenByteArray"); + } + } + #[test] fn test_send() { fn test() {} diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs index 
ea71763a0101..287e73c9906a 100644 --- a/parquet/src/file/properties.rs +++ b/parquet/src/file/properties.rs @@ -51,6 +51,8 @@ pub const DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH: Option = Some(64); pub const DEFAULT_BLOOM_FILTER_FPP: f64 = 0.05; /// Default value for [`BloomFilterProperties::ndv`] pub const DEFAULT_BLOOM_FILTER_NDV: u64 = 1_000_000_u64; +/// Default values for [`WriterProperties::statistics_truncate_length`] +pub const DEFAULT_STATISTICS_TRUNCATE_LENGTH: Option = None; /// Parquet writer version. /// @@ -136,6 +138,7 @@ pub struct WriterProperties { column_properties: HashMap, sorting_columns: Option>, column_index_truncate_length: Option, + statistics_truncate_length: Option, } impl Default for WriterProperties { @@ -241,6 +244,13 @@ impl WriterProperties { self.column_index_truncate_length } + /// Returns the maximum length of truncated min/max values in statistics. + /// + /// `None` if truncation is disabled, must be greater than 0 otherwise. + pub fn statistics_truncate_length(&self) -> Option { + self.statistics_truncate_length + } + /// Returns encoding for a data page, when dictionary encoding is enabled. /// This is not configurable. 
#[inline] @@ -334,6 +344,7 @@ pub struct WriterPropertiesBuilder { column_properties: HashMap, sorting_columns: Option>, column_index_truncate_length: Option, + statistics_truncate_length: Option, } impl WriterPropertiesBuilder { @@ -352,6 +363,7 @@ impl WriterPropertiesBuilder { column_properties: HashMap::new(), sorting_columns: None, column_index_truncate_length: DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH, + statistics_truncate_length: DEFAULT_STATISTICS_TRUNCATE_LENGTH, } } @@ -370,6 +382,7 @@ impl WriterPropertiesBuilder { column_properties: self.column_properties, sorting_columns: self.sorting_columns, column_index_truncate_length: self.column_index_truncate_length, + statistics_truncate_length: self.statistics_truncate_length, } } @@ -643,6 +656,17 @@ impl WriterPropertiesBuilder { self.column_index_truncate_length = max_length; self } + + /// Sets the max length of min/max value fields in statistics. Must be greater than 0. + /// If set to `None` - there's no effective limit. + pub fn set_statistics_truncate_length(mut self, max_length: Option) -> Self { + if let Some(value) = max_length { + assert!(value > 0, "Cannot have a 0 statistics truncate length. If you wish to disable min/max value truncation, set it to `None`."); + } + + self.statistics_truncate_length = max_length; + self + } } /// Controls the level of statistics to be computed by the writer diff --git a/parquet/src/file/statistics.rs b/parquet/src/file/statistics.rs index 26729afc03db..9e914009bcd7 100644 --- a/parquet/src/file/statistics.rs +++ b/parquet/src/file/statistics.rs @@ -578,12 +578,12 @@ impl ValueStatistics { } /// Returns optional value of number of distinct values occurring. - fn distinct_count(&self) -> Option { + pub fn distinct_count(&self) -> Option { self.distinct_count } /// Returns null count. 
- fn null_count(&self) -> u64 { + pub fn null_count(&self) -> u64 { self.null_count } From 6b39298f116744f79b3578a57c27521d55404912 Mon Sep 17 00:00:00 2001 From: emcake <3726783+emcake@users.noreply.github.com> Date: Tue, 14 Nov 2023 15:38:00 +0000 Subject: [PATCH 3/6] format lints --- parquet/src/file/statistics.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet/src/file/statistics.rs b/parquet/src/file/statistics.rs index dd377eed5c18..3d47a4617006 100644 --- a/parquet/src/file/statistics.rs +++ b/parquet/src/file/statistics.rs @@ -645,7 +645,7 @@ impl fmt::Debug for ValueStatistics { self.null_count, self.is_min_max_deprecated, self.is_min_max_backwards_compatible, - self.is_max_value_exact, + self.is_max_value_exact, self.is_min_value_exact ) } From 96ceb1235e2755e77720478170e3c0e371f2ce07 Mon Sep 17 00:00:00 2001 From: emcake <3726783+emcake@users.noreply.github.com> Date: Tue, 14 Nov 2023 17:08:31 +0000 Subject: [PATCH 4/6] change min/max exact to be with... 
methods --- parquet/src/column/page.rs | 50 +---- parquet/src/column/writer/mod.rs | 40 ++-- parquet/src/file/statistics.rs | 344 ++++++++++++------------------- parquet/src/file/writer.rs | 30 +-- 4 files changed, 155 insertions(+), 309 deletions(-) diff --git a/parquet/src/column/page.rs b/parquet/src/column/page.rs index 43284eccd1e5..4973f070bb00 100644 --- a/parquet/src/column/page.rs +++ b/parquet/src/column/page.rs @@ -376,15 +376,7 @@ mod tests { encoding: Encoding::PLAIN, def_level_encoding: Encoding::RLE, rep_level_encoding: Encoding::RLE, - statistics: Some(Statistics::int32( - Some(1), - Some(2), - None, - 1, - true, - true, - true, - )), + statistics: Some(Statistics::int32(Some(1), Some(2), None, 1, true)), }; assert_eq!(data_page.page_type(), PageType::DATA_PAGE); assert_eq!(data_page.buffer(), vec![0, 1, 2].as_slice()); @@ -392,15 +384,7 @@ mod tests { assert_eq!(data_page.encoding(), Encoding::PLAIN); assert_eq!( data_page.statistics(), - Some(&Statistics::int32( - Some(1), - Some(2), - None, - 1, - true, - true, - true - )) + Some(&Statistics::int32(Some(1), Some(2), None, 1, true,)) ); let data_page_v2 = Page::DataPageV2 { @@ -412,15 +396,7 @@ mod tests { def_levels_byte_len: 30, rep_levels_byte_len: 40, is_compressed: false, - statistics: Some(Statistics::int32( - Some(1), - Some(2), - None, - 1, - true, - true, - true, - )), + statistics: Some(Statistics::int32(Some(1), Some(2), None, 1, true)), }; assert_eq!(data_page_v2.page_type(), PageType::DATA_PAGE_V2); assert_eq!(data_page_v2.buffer(), vec![0, 1, 2].as_slice()); @@ -428,15 +404,7 @@ mod tests { assert_eq!(data_page_v2.encoding(), Encoding::PLAIN); assert_eq!( data_page_v2.statistics(), - Some(&Statistics::int32( - Some(1), - Some(2), - None, - 1, - true, - true, - true - )) + Some(&Statistics::int32(Some(1), Some(2), None, 1, true,)) ); let dict_page = Page::DictionaryPage { @@ -460,15 +428,7 @@ mod tests { encoding: Encoding::PLAIN, def_level_encoding: Encoding::RLE, rep_level_encoding: 
Encoding::RLE, - statistics: Some(Statistics::int32( - Some(1), - Some(2), - None, - 1, - true, - true, - true, - )), + statistics: Some(Statistics::int32(Some(1), Some(2), None, 1, true)), }; let cpage = CompressedPage::new(data_page, 5); diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs index f15bd5c8e7c9..6b1394337508 100644 --- a/parquet/src/column/writer/mod.rs +++ b/parquet/src/column/writer/mod.rs @@ -709,8 +709,6 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { None, self.page_metrics.num_page_nulls, false, - true, - true, )) } _ => None, @@ -874,8 +872,6 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { self.column_metrics.column_distinct_count, self.column_metrics.num_column_nulls, false, - true, - true, ) .with_backwards_compatible_min_max(backwards_compatible_min_max) .into(); @@ -890,14 +886,16 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { self.props.statistics_truncate_length(), stats.max_bytes(), ); - Statistics::byte_array( - Some(min.into()), - Some(max.into()), - stats.distinct_count(), - stats.null_count(), - backwards_compatible_min_max, - !did_truncate_max, - !did_truncate_min, + Statistics::ByteArray( + ValueStatistics::new( + Some(min.into()), + Some(max.into()), + stats.distinct_count(), + stats.null_count(), + backwards_compatible_min_max, + ) + .with_max_is_exact(!did_truncate_max) + .with_min_is_exact(!did_truncate_min), ) } Statistics::FixedLenByteArray(stats) if stats.has_min_max_set() => { @@ -909,14 +907,16 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { self.props.statistics_truncate_length(), stats.max_bytes(), ); - Statistics::fixed_len_byte_array( - Some(min.into()), - Some(max.into()), - stats.distinct_count(), - stats.null_count(), - backwards_compatible_min_max, - !did_truncate_max, - !did_truncate_min, + Statistics::FixedLenByteArray( + ValueStatistics::new( + Some(min.into()), + Some(max.into()), + stats.distinct_count(), + 
stats.null_count(), + backwards_compatible_min_max, + ) + .with_max_is_exact(!did_truncate_max) + .with_min_is_exact(!did_truncate_min), ) } stats => stats, diff --git a/parquet/src/file/statistics.rs b/parquet/src/file/statistics.rs index 3d47a4617006..96ee14ecc088 100644 --- a/parquet/src/file/statistics.rs +++ b/parquet/src/file/statistics.rs @@ -23,7 +23,7 @@ //! ```rust //! use parquet::file::statistics::Statistics; //! -//! let stats = Statistics::int32(Some(1), Some(10), None, 3, true, true, true); +//! let stats = Statistics::int32(Some(1), Some(10), None, 3, true); //! assert_eq!(stats.null_count(), 3); //! assert!(stats.has_min_max_set()); //! assert!(stats.is_min_max_deprecated()); @@ -90,8 +90,6 @@ macro_rules! statistics_new_func { distinct: Option, nulls: u64, is_deprecated: bool, - is_max_value_exact: bool, - is_min_value_exact: bool, ) -> Self { Statistics::$stat(ValueStatistics::new( min, @@ -99,8 +97,6 @@ macro_rules! statistics_new_func { distinct, nulls, is_deprecated, - is_max_value_exact, - is_min_value_exact, )) } }; @@ -158,12 +154,6 @@ pub fn from_thrift( stats.max_value }; - // Whether or not the min/max values are exact. Due to pre-existing truncation - // in other libraries such as parquet-mr, we can't assume that any given parquet file - // has exact statistics unless it's explicitly set. - let is_max_value_exact = stats.is_max_value_exact.unwrap_or(false); - let is_min_value_exact = stats.is_min_value_exact.unwrap_or(false); - // Values are encoded using PLAIN encoding definition, except that // variable-length byte arrays do not include a length prefix. 
// @@ -175,8 +165,6 @@ pub fn from_thrift( distinct_count, null_count, old_format, - is_max_value_exact, - is_min_value_exact, ), Type::INT32 => Statistics::int32( min.map(|data| i32::from_le_bytes(data[..4].try_into().unwrap())), @@ -184,8 +172,6 @@ pub fn from_thrift( distinct_count, null_count, old_format, - is_max_value_exact, - is_min_value_exact, ), Type::INT64 => Statistics::int64( min.map(|data| i64::from_le_bytes(data[..8].try_into().unwrap())), @@ -193,8 +179,6 @@ pub fn from_thrift( distinct_count, null_count, old_format, - is_max_value_exact, - is_min_value_exact, ), Type::INT96 => { // INT96 statistics may not be correct, because comparison is signed @@ -208,15 +192,7 @@ pub fn from_thrift( assert_eq!(data.len(), 12); from_le_slice::(&data) }); - Statistics::int96( - min, - max, - distinct_count, - null_count, - old_format, - is_max_value_exact, - is_min_value_exact, - ) + Statistics::int96(min, max, distinct_count, null_count, old_format) } Type::FLOAT => Statistics::float( min.map(|data| f32::from_le_bytes(data[..4].try_into().unwrap())), @@ -224,8 +200,6 @@ pub fn from_thrift( distinct_count, null_count, old_format, - is_max_value_exact, - is_min_value_exact, ), Type::DOUBLE => Statistics::double( min.map(|data| f64::from_le_bytes(data[..8].try_into().unwrap())), @@ -233,26 +207,28 @@ pub fn from_thrift( distinct_count, null_count, old_format, - is_max_value_exact, - is_min_value_exact, ), - Type::BYTE_ARRAY => Statistics::byte_array( - min.map(ByteArray::from), - max.map(ByteArray::from), - distinct_count, - null_count, - old_format, - is_max_value_exact, - is_min_value_exact, + Type::BYTE_ARRAY => Statistics::ByteArray( + ValueStatistics::new( + min.map(ByteArray::from), + max.map(ByteArray::from), + distinct_count, + null_count, + old_format, + ) + .with_max_is_exact(stats.is_max_value_exact.unwrap_or(false)) + .with_min_is_exact(stats.is_min_value_exact.unwrap_or(false)), ), - Type::FIXED_LEN_BYTE_ARRAY => Statistics::fixed_len_byte_array( - 
min.map(ByteArray::from).map(FixedLenByteArray::from), - max.map(ByteArray::from).map(FixedLenByteArray::from), - distinct_count, - null_count, - old_format, - is_max_value_exact, - is_min_value_exact, + Type::FIXED_LEN_BYTE_ARRAY => Statistics::FixedLenByteArray( + ValueStatistics::new( + min.map(ByteArray::from).map(FixedLenByteArray::from), + max.map(ByteArray::from).map(FixedLenByteArray::from), + distinct_count, + null_count, + old_format, + ) + .with_max_is_exact(stats.is_max_value_exact.unwrap_or(false)) + .with_min_is_exact(stats.is_min_value_exact.unwrap_or(false)), ), }; @@ -336,8 +312,6 @@ impl Statistics { distinct_count: Option, null_count: u64, is_deprecated: bool, - is_max_value_exact: bool, - is_min_value_exact: bool, ) -> Self { Self::from(ValueStatistics::new( min, @@ -345,8 +319,6 @@ impl Statistics { distinct_count, null_count, is_deprecated, - is_min_value_exact, - is_max_value_exact, )) } @@ -502,18 +474,38 @@ impl ValueStatistics { distinct_count: Option, null_count: u64, is_min_max_deprecated: bool, - is_max_value_exact: bool, - is_min_value_exact: bool, ) -> Self { Self { + is_max_value_exact: max.is_some(), + is_min_value_exact: min.is_some(), min, max, distinct_count, null_count, is_min_max_deprecated, is_min_max_backwards_compatible: is_min_max_deprecated, - is_max_value_exact, + } + } + + /// Set whether the stored `min` field represents the exact + /// minimum, or just a bound on the minimum value. + /// + /// see [`Self::min_is_exact`] + pub fn with_min_is_exact(self, is_min_value_exact: bool) -> Self { + Self { is_min_value_exact, + ..self + } + } + + /// Set whether the stored `max` field represents the exact + /// maximum, or just a bound on the maximum value. 
+ /// + /// see [`Self::max_is_exact`] + pub fn with_max_is_exact(self, is_max_value_exact: bool) -> Self { + Self { + is_max_value_exact, + ..self } } @@ -657,7 +649,7 @@ mod tests { #[test] fn test_statistics_min_max_bytes() { - let stats = Statistics::int32(Some(-123), Some(234), None, 1, false, true, true); + let stats = Statistics::int32(Some(-123), Some(234), None, 1, false); assert!(stats.has_min_max_set()); assert_eq!(stats.min_bytes(), (-123).as_bytes()); assert_eq!(stats.max_bytes(), 234.as_bytes()); @@ -668,8 +660,6 @@ mod tests { None, 1, true, - true, - true, ); assert!(stats.has_min_max_set()); assert_eq!(stats.min_bytes(), &[1, 2, 3]); @@ -701,14 +691,14 @@ mod tests { #[test] fn test_statistics_debug() { - let stats = Statistics::int32(Some(1), Some(12), None, 12, true, true, true); + let stats = Statistics::int32(Some(1), Some(12), None, 12, true); assert_eq!( format!("{stats:?}"), "Int32({min: Some(1), max: Some(12), distinct_count: None, null_count: 12, \ min_max_deprecated: true, min_max_backwards_compatible: true, max_value_exact: true, min_value_exact: true})" ); - let stats = Statistics::int32(None, None, None, 7, false, false, false); + let stats = Statistics::int32(None, None, None, 7, false); assert_eq!( format!("{stats:?}"), "Int32({min: None, max: None, distinct_count: None, null_count: 7, \ @@ -718,13 +708,13 @@ mod tests { #[test] fn test_statistics_display() { - let stats = Statistics::int32(Some(1), Some(12), None, 12, true, true, true); + let stats = Statistics::int32(Some(1), Some(12), None, 12, true); assert_eq!( format!("{stats}"), "{min: 1, max: 12, distinct_count: N/A, null_count: 12, min_max_deprecated: true, max_value_exact: true, min_value_exact: true}" ); - let stats = Statistics::int64(None, None, None, 7, false, false, false); + let stats = Statistics::int64(None, None, None, 7, false); assert_eq!( format!("{stats}"), "{min: N/A, max: N/A, distinct_count: N/A, null_count: 7, min_max_deprecated: \ @@ -737,8 +727,6 @@ mod 
tests { None, 3, true, - true, - true, ); assert_eq!( format!("{stats}"), @@ -746,14 +734,16 @@ mod tests { min_max_deprecated: true, max_value_exact: true, min_value_exact: true}" ); - let stats = Statistics::byte_array( - Some(ByteArray::from(vec![1u8])), - Some(ByteArray::from(vec![2u8])), - Some(5), - 7, - false, - false, - false, + let stats = Statistics::ByteArray( + ValueStatistics::new( + Some(ByteArray::from(vec![1u8])), + Some(ByteArray::from(vec![2u8])), + Some(5), + 7, + false, + ) + .with_max_is_exact(false) + .with_min_is_exact(false), ); assert_eq!( format!("{stats}"), @@ -763,24 +753,22 @@ mod tests { #[test] fn test_statistics_partial_eq() { - let expected = Statistics::int32(Some(12), Some(45), None, 11, true, true, true); + let expected = Statistics::int32(Some(12), Some(45), None, 11, true); - assert!(Statistics::int32(Some(12), Some(45), None, 11, true, true, true) == expected); - assert!(Statistics::int32(Some(11), Some(45), None, 11, true, true, true) != expected); - assert!(Statistics::int32(Some(12), Some(44), None, 11, true, true, true) != expected); - assert!(Statistics::int32(Some(12), Some(45), None, 23, true, true, true) != expected); - assert!(Statistics::int32(Some(12), Some(45), None, 11, false, true, true) != expected); - assert!(Statistics::int32(Some(12), Some(45), None, 11, true, false, true) != expected); - assert!(Statistics::int32(Some(12), Some(45), None, 11, true, true, false) != expected); + assert!(Statistics::int32(Some(12), Some(45), None, 11, true,) == expected); + assert!(Statistics::int32(Some(11), Some(45), None, 11, true,) != expected); + assert!(Statistics::int32(Some(12), Some(44), None, 11, true,) != expected); + assert!(Statistics::int32(Some(12), Some(45), None, 23, true,) != expected); + assert!(Statistics::int32(Some(12), Some(45), None, 11, false,) != expected); assert!( - Statistics::int32(Some(12), Some(45), None, 11, false, true, true) - != Statistics::int64(Some(12), Some(45), None, 11, false, true, 
true) + Statistics::int32(Some(12), Some(45), None, 11, false) + != Statistics::int64(Some(12), Some(45), None, 11, false) ); assert!( - Statistics::boolean(Some(false), Some(true), None, 0, true, true, true) - != Statistics::double(Some(1.2), Some(4.5), None, 0, true, true, true) + Statistics::boolean(Some(false), Some(true), None, 0, true) + != Statistics::double(Some(1.2), Some(4.5), None, 0, true) ); assert!( @@ -790,16 +778,50 @@ mod tests { None, 0, true, - true, - true ) != Statistics::fixed_len_byte_array( Some(ByteArray::from(vec![1, 2, 3]).into()), Some(ByteArray::from(vec![1, 2, 3]).into()), None, 0, true, + ) + ); + + assert!( + Statistics::byte_array( + Some(ByteArray::from(vec![1, 2, 3])), + Some(ByteArray::from(vec![1, 2, 3])), + None, + 0, + true, + ) != Statistics::ByteArray( + ValueStatistics::new( + Some(ByteArray::from(vec![1, 2, 3])), + Some(ByteArray::from(vec![1, 2, 3])), + None, + 0, + true, + ) + .with_max_is_exact(false) + ) + ); + + assert!( + Statistics::fixed_len_byte_array( + Some(FixedLenByteArray::from(vec![1, 2, 3])), + Some(FixedLenByteArray::from(vec![1, 2, 3])), + None, + 0, true, - true + ) != Statistics::FixedLenByteArray( + ValueStatistics::new( + Some(FixedLenByteArray::from(vec![1, 2, 3])), + Some(FixedLenByteArray::from(vec![1, 2, 3])), + None, + 0, + true, + ) + .with_min_is_exact(false) ) ); } @@ -813,132 +835,28 @@ mod tests { assert_eq!(from_thrift(tpe, thrift_stats).unwrap(), Some(stats)); } - check_stats(Statistics::boolean( - Some(false), - Some(true), - None, - 7, - true, - true, - true, - )); - check_stats(Statistics::boolean( - Some(false), - Some(true), - None, - 7, - true, - true, - true, - )); - check_stats(Statistics::boolean( - Some(false), - Some(true), - None, - 0, - false, - true, - true, - )); - check_stats(Statistics::boolean( - Some(true), - Some(true), - None, - 7, - true, - true, - true, - )); - check_stats(Statistics::boolean( - Some(false), - Some(false), - None, - 7, - true, - true, - true, - )); - 
check_stats(Statistics::boolean(None, None, None, 7, true, false, false)); + check_stats(Statistics::boolean(Some(false), Some(true), None, 7, true)); + check_stats(Statistics::boolean(Some(false), Some(true), None, 7, true)); + check_stats(Statistics::boolean(Some(false), Some(true), None, 0, false)); + check_stats(Statistics::boolean(Some(true), Some(true), None, 7, true)); + check_stats(Statistics::boolean(Some(false), Some(false), None, 7, true)); + check_stats(Statistics::boolean(None, None, None, 7, true)); - check_stats(Statistics::int32( - Some(-100), - Some(500), - None, - 7, - true, - true, - true, - )); - check_stats(Statistics::int32( - Some(-100), - Some(500), - None, - 0, - false, - true, - true, - )); - check_stats(Statistics::int32(None, None, None, 7, true, false, false)); + check_stats(Statistics::int32(Some(-100), Some(500), None, 7, true)); + check_stats(Statistics::int32(Some(-100), Some(500), None, 0, false)); + check_stats(Statistics::int32(None, None, None, 7, true)); - check_stats(Statistics::int64( - Some(-100), - Some(200), - None, - 7, - true, - true, - true, - )); - check_stats(Statistics::int64( - Some(-100), - Some(200), - None, - 0, - false, - true, - true, - )); - check_stats(Statistics::int64(None, None, None, 7, true, false, false)); + check_stats(Statistics::int64(Some(-100), Some(200), None, 7, true)); + check_stats(Statistics::int64(Some(-100), Some(200), None, 0, false)); + check_stats(Statistics::int64(None, None, None, 7, true)); - check_stats(Statistics::float( - Some(1.2), - Some(3.4), - None, - 7, - true, - true, - true, - )); - check_stats(Statistics::float( - Some(1.2), - Some(3.4), - None, - 0, - false, - true, - true, - )); - check_stats(Statistics::float(None, None, None, 7, true, false, false)); + check_stats(Statistics::float(Some(1.2), Some(3.4), None, 7, true)); + check_stats(Statistics::float(Some(1.2), Some(3.4), None, 0, false)); + check_stats(Statistics::float(None, None, None, 7, true)); - 
check_stats(Statistics::double( - Some(1.2), - Some(3.4), - None, - 7, - true, - true, - true, - )); - check_stats(Statistics::double( - Some(1.2), - Some(3.4), - None, - 0, - false, - true, - true, - )); - check_stats(Statistics::double(None, None, None, 7, true, false, false)); + check_stats(Statistics::double(Some(1.2), Some(3.4), None, 7, true)); + check_stats(Statistics::double(Some(1.2), Some(3.4), None, 0, false)); + check_stats(Statistics::double(None, None, None, 7, true)); check_stats(Statistics::byte_array( Some(ByteArray::from(vec![1, 2, 3])), @@ -946,12 +864,8 @@ mod tests { None, 7, true, - true, - true, - )); - check_stats(Statistics::byte_array( - None, None, None, 7, true, false, false, )); + check_stats(Statistics::byte_array(None, None, None, 7, true)); check_stats(Statistics::fixed_len_byte_array( Some(ByteArray::from(vec![1, 2, 3]).into()), @@ -959,11 +873,7 @@ mod tests { None, 7, true, - true, - true, - )); - check_stats(Statistics::fixed_len_byte_array( - None, None, None, 7, true, false, false, )); + check_stats(Statistics::fixed_len_byte_array(None, None, None, 7, true)); } } diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs index 7c2795f1f38f..2b9f261d9f42 100644 --- a/parquet/src/file/writer.rs +++ b/parquet/src/file/writer.rs @@ -1044,15 +1044,7 @@ mod tests { encoding: Encoding::DELTA_BINARY_PACKED, def_level_encoding: Encoding::RLE, rep_level_encoding: Encoding::RLE, - statistics: Some(Statistics::int32( - Some(1), - Some(3), - None, - 7, - true, - true, - true, - )), + statistics: Some(Statistics::int32(Some(1), Some(3), None, 7, true)), }, Page::DataPageV2 { buf: Bytes::from(vec![4; 128]), @@ -1063,15 +1055,7 @@ mod tests { def_levels_byte_len: 24, rep_levels_byte_len: 32, is_compressed: false, - statistics: Some(Statistics::int32( - Some(1), - Some(3), - None, - 7, - true, - true, - true, - )), + statistics: Some(Statistics::int32(Some(1), Some(3), None, 7, true)), }, ]; @@ -1094,15 +1078,7 @@ mod tests { 
encoding: Encoding::DELTA_BINARY_PACKED, def_level_encoding: Encoding::RLE, rep_level_encoding: Encoding::RLE, - statistics: Some(Statistics::int32( - Some(1), - Some(3), - None, - 7, - true, - true, - true, - )), + statistics: Some(Statistics::int32(Some(1), Some(3), None, 7, true)), }, Page::DataPageV2 { buf: Bytes::from(vec![4; 128]), From 043cc642038323e107a06211c6463f082cf37e77 Mon Sep 17 00:00:00 2001 From: emcake <3726783+emcake@users.noreply.github.com> Date: Tue, 14 Nov 2023 17:11:30 +0000 Subject: [PATCH 5/6] reduce code noise --- parquet/src/column/page.rs | 4 ++-- parquet/src/file/statistics.rs | 12 ++++++------ 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/parquet/src/column/page.rs b/parquet/src/column/page.rs index 4973f070bb00..947a633f48a2 100644 --- a/parquet/src/column/page.rs +++ b/parquet/src/column/page.rs @@ -384,7 +384,7 @@ mod tests { assert_eq!(data_page.encoding(), Encoding::PLAIN); assert_eq!( data_page.statistics(), - Some(&Statistics::int32(Some(1), Some(2), None, 1, true,)) + Some(&Statistics::int32(Some(1), Some(2), None, 1, true)) ); let data_page_v2 = Page::DataPageV2 { @@ -404,7 +404,7 @@ mod tests { assert_eq!(data_page_v2.encoding(), Encoding::PLAIN); assert_eq!( data_page_v2.statistics(), - Some(&Statistics::int32(Some(1), Some(2), None, 1, true,)) + Some(&Statistics::int32(Some(1), Some(2), None, 1, true)) ); let dict_page = Page::DictionaryPage { diff --git a/parquet/src/file/statistics.rs b/parquet/src/file/statistics.rs index 96ee14ecc088..1bc003d48854 100644 --- a/parquet/src/file/statistics.rs +++ b/parquet/src/file/statistics.rs @@ -755,11 +755,11 @@ mod tests { fn test_statistics_partial_eq() { let expected = Statistics::int32(Some(12), Some(45), None, 11, true); - assert!(Statistics::int32(Some(12), Some(45), None, 11, true,) == expected); - assert!(Statistics::int32(Some(11), Some(45), None, 11, true,) != expected); - assert!(Statistics::int32(Some(12), Some(44), None, 11, true,) != expected); - 
assert!(Statistics::int32(Some(12), Some(45), None, 23, true,) != expected); - assert!(Statistics::int32(Some(12), Some(45), None, 11, false,) != expected); + assert!(Statistics::int32(Some(12), Some(45), None, 11, true) == expected); + assert!(Statistics::int32(Some(11), Some(45), None, 11, true) != expected); + assert!(Statistics::int32(Some(12), Some(44), None, 11, true) != expected); + assert!(Statistics::int32(Some(12), Some(45), None, 23, true) != expected); + assert!(Statistics::int32(Some(12), Some(45), None, 11, false) != expected); assert!( Statistics::int32(Some(12), Some(45), None, 11, false) @@ -777,7 +777,7 @@ mod tests { Some(ByteArray::from(vec![1, 2, 3])), None, 0, - true, + true ) != Statistics::fixed_len_byte_array( Some(ByteArray::from(vec![1, 2, 3]).into()), Some(ByteArray::from(vec![1, 2, 3]).into()), From 9d73490dabf14d89a2d8b09364e50d2318ac1d7d Mon Sep 17 00:00:00 2001 From: emcake <3726783+emcake@users.noreply.github.com> Date: Tue, 14 Nov 2023 17:14:57 +0000 Subject: [PATCH 6/6] remove redundant clone --- parquet/src/column/writer/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs index 6b1394337508..11c39685911c 100644 --- a/parquet/src/column/writer/mod.rs +++ b/parquet/src/column/writer/mod.rs @@ -880,7 +880,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { Statistics::ByteArray(stats) if stats.has_min_max_set() => { let (min, did_truncate_min) = self.truncate_min_value( self.props.statistics_truncate_length(), - stats.min_bytes().clone(), + stats.min_bytes(), ); let (max, did_truncate_max) = self.truncate_max_value( self.props.statistics_truncate_length(),