From 6671986afb77f20b3eb3202bcaf4d0a375af8cac Mon Sep 17 00:00:00 2001
From: Xin Li
Date: Sat, 22 Jun 2024 00:23:31 +0800
Subject: [PATCH 1/7] Update ListingTable to use StatisticsConverter

---
 .../src/datasource/file_format/parquet.rs     | 244 +++++++-----------
 .../datasource/physical_plan/parquet/mod.rs   |   2 +-
 .../physical_plan/parquet/page_filter.rs      |   7 +-
 .../physical_plan/parquet/row_groups.rs       |  10 +-
 .../physical_plan/parquet/statistics.rs       | 125 ++++++++-
 5 files changed, 226 insertions(+), 162 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 572904254fd7..7de98e954192 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -25,25 +25,20 @@ use std::sync::Arc;
 use super::write::demux::start_demuxer_task;
 use super::write::{create_writer, SharedBuffer};
 use super::{FileFormat, FileScanConfig};
-use crate::arrow::array::{
-    BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array, RecordBatch,
-};
-use crate::arrow::datatypes::{DataType, Fields, Schema, SchemaRef};
+use crate::arrow::array::RecordBatch;
+use crate::arrow::datatypes::{Fields, Schema, SchemaRef};
 use crate::datasource::file_format::file_compression_type::FileCompressionType;
 use crate::datasource::physical_plan::{FileGroupDisplay, FileSinkConfig};
-use crate::datasource::schema_adapter::{
-    DefaultSchemaAdapterFactory, SchemaAdapterFactory,
-};
 use crate::datasource::statistics::{create_max_min_accs, get_col_stats};
 use crate::error::Result;
 use crate::execution::context::SessionState;
-use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
 use crate::physical_plan::insert::{DataSink, DataSinkExec};
 use crate::physical_plan::{
     Accumulator, DisplayAs, DisplayFormatType, ExecutionPlan, SendableRecordBatchStream,
     Statistics,
 };
 
+use arrow::compute::sum;
 use datafusion_common::config::TableParquetOptions;
 use datafusion_common::file_options::parquet_writer::ParquetWriterOptions;
 use datafusion_common::stats::Precision;
@@ -52,6 +47,7 @@ use datafusion_common::{
 };
 use datafusion_common_runtime::SpawnedTask;
 use datafusion_execution::TaskContext;
+use datafusion_physical_expr::expressions::{MinAccumulator,MaxAccumulator};
 use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
 use datafusion_physical_plan::metrics::MetricsSet;
 
@@ -66,16 +62,17 @@ use parquet::arrow::{
     arrow_to_parquet_schema, parquet_to_arrow_schema, AsyncArrowWriter,
 };
 use parquet::file::footer::{decode_footer, decode_metadata};
-use parquet::file::metadata::ParquetMetaData;
+use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
 use parquet::file::properties::WriterProperties;
-use parquet::file::statistics::Statistics as ParquetStatistics;
 use parquet::file::writer::SerializedFileWriter;
 use parquet::format::FileMetaData;
 use tokio::io::{AsyncWrite, AsyncWriteExt};
 use tokio::sync::mpsc::{self, Receiver, Sender};
 use tokio::task::JoinSet;
 
-use crate::datasource::physical_plan::parquet::ParquetExecBuilder;
+use crate::datasource::physical_plan::parquet::{
+    ParquetExecBuilder, StatisticsConverter,
+};
 use futures::{StreamExt, TryStreamExt};
 use hashbrown::HashMap;
 use object_store::path::Path;
@@ -295,86 +292,6 @@ impl FileFormat for ParquetFormat {
     }
 }
 
-fn summarize_min_max(
-    max_values: &mut [Option<MaxAccumulator>],
-    min_values: &mut [Option<MinAccumulator>],
-    fields: &Fields,
-    i: usize,
-    stat: &ParquetStatistics,
-) {
-    if !stat.has_min_max_set() {
-        max_values[i] = None;
-        min_values[i] = None;
-        return;
-    }
-    match stat {
-        ParquetStatistics::Boolean(s) if DataType::Boolean == *fields[i].data_type() => {
-            if let Some(max_value) = &mut max_values[i] {
-                max_value
-                    .update_batch(&[Arc::new(BooleanArray::from(vec![*s.max()]))])
-                    .unwrap_or_else(|_| max_values[i] = None);
-            }
-            if let Some(min_value) = &mut min_values[i] {
-                min_value
-                    .update_batch(&[Arc::new(BooleanArray::from(vec![*s.min()]))])
-                    .unwrap_or_else(|_| min_values[i] = None);
-            }
-        }
-        ParquetStatistics::Int32(s) if DataType::Int32 == *fields[i].data_type() => {
-            if let Some(max_value) = &mut max_values[i] {
-                max_value
-                    .update_batch(&[Arc::new(Int32Array::from_value(*s.max(), 1))])
-                    .unwrap_or_else(|_| max_values[i] = None);
-            }
-            if let Some(min_value) = &mut min_values[i] {
-                min_value
-                    .update_batch(&[Arc::new(Int32Array::from_value(*s.min(), 1))])
-                    .unwrap_or_else(|_| min_values[i] = None);
-            }
-        }
-        ParquetStatistics::Int64(s) if DataType::Int64 == *fields[i].data_type() => {
-            if let Some(max_value) = &mut max_values[i] {
-                max_value
-                    .update_batch(&[Arc::new(Int64Array::from_value(*s.max(), 1))])
-                    .unwrap_or_else(|_| max_values[i] = None);
-            }
-            if let Some(min_value) = &mut min_values[i] {
-                min_value
-                    .update_batch(&[Arc::new(Int64Array::from_value(*s.min(), 1))])
-                    .unwrap_or_else(|_| min_values[i] = None);
-            }
-        }
-        ParquetStatistics::Float(s) if DataType::Float32 == *fields[i].data_type() => {
-            if let Some(max_value) = &mut max_values[i] {
-                max_value
-                    .update_batch(&[Arc::new(Float32Array::from(vec![*s.max()]))])
-                    .unwrap_or_else(|_| max_values[i] = None);
-            }
-            if let Some(min_value) = &mut min_values[i] {
-                min_value
-                    .update_batch(&[Arc::new(Float32Array::from(vec![*s.min()]))])
-                    .unwrap_or_else(|_| min_values[i] = None);
-            }
-        }
-        ParquetStatistics::Double(s) if DataType::Float64 == *fields[i].data_type() => {
-            if let Some(max_value) = &mut max_values[i] {
-                max_value
-                    .update_batch(&[Arc::new(Float64Array::from(vec![*s.max()]))])
-                    .unwrap_or_else(|_| max_values[i] = None);
-            }
-            if let Some(min_value) = &mut min_values[i] {
-                min_value
-                    .update_batch(&[Arc::new(Float64Array::from(vec![*s.min()]))])
-                    .unwrap_or_else(|_| min_values[i] = None);
-            }
-        }
-        _ => {
-            max_values[i] = None;
-            min_values[i] = None;
-        }
-    }
-}
-
 /// Fetches parquet metadata from ObjectStore for given object
 ///
 /// This component is a subject to **change** in near future and is exposed for low level integrations
@@ -467,7 +384,7 @@ async fn fetch_statistics(
     statistics_from_parquet_meta(&metadata, table_schema).await
 }
 
-/// Convert statistics in [`ParquetMetaData`] into [`Statistics`]
+/// Convert statistics in [`ParquetMetaData`] into [`Statistics`] using [`StatisticsConverter`]
 ///
 /// The statistics are calculated for each column in the table schema
 /// using the row group statistics in the parquet metadata.
@@ -476,77 +393,107 @@ pub async fn statistics_from_parquet_meta(
     table_schema: SchemaRef,
 ) -> Result<Statistics> {
     let file_metadata = metadata.file_metadata();
-
     let file_schema = parquet_to_arrow_schema(
         file_metadata.schema_descr(),
         file_metadata.key_value_metadata(),
     )?;
 
-    let num_fields = table_schema.fields().len();
-    let fields = table_schema.fields();
+    let mut fields_iter = table_schema.fields().iter();
+    let Some(first_field) = fields_iter.next() else {
+        return Ok(Statistics::new_unknown(&table_schema));
+    };
 
-    let mut num_rows = 0;
-    let mut total_byte_size = 0;
-    let mut null_counts = vec![Precision::Exact(0); num_fields];
-    let mut has_statistics = false;
+    let stats_converter = StatisticsConverter::try_new_from_arrow_schema_index(
+        0,
+        first_field,
+        &file_schema,
+        file_metadata.schema_descr(),
+    )?;
 
-    let schema_adapter =
-        DefaultSchemaAdapterFactory::default().create(table_schema.clone());
+    let row_groups_metadata = metadata.row_groups();
+    let Some(num_rows_array) =
+        stats_converter.row_group_row_counts(row_groups_metadata)?
+    else {
+        return Ok(Statistics::new_unknown(&table_schema));
+    };
 
-    let (mut max_values, mut min_values) = create_max_min_accs(&table_schema);
+    let Some(total_byte_size_array) =
+        stats_converter.row_group_row_total_bytes(row_groups_metadata)?
+    else {
+        return Ok(Statistics::new_unknown(&table_schema));
+    };
 
-    for row_group_meta in metadata.row_groups() {
-        num_rows += row_group_meta.num_rows();
-        total_byte_size += row_group_meta.total_byte_size();
+    let (mut max_accs, mut min_accs) = create_max_min_accs(&table_schema);
+    let mut null_counts_array =
+        vec![Precision::Exact(0); table_schema.fields().len()];
+    summarize_min_max_null_counts(
+        &mut min_accs,
+        &mut max_accs,
+        &mut null_counts_array,
+        0,
+        &stats_converter,
+        row_groups_metadata,
+    )?;
 
-        let mut column_stats: HashMap<usize, (u64, &ParquetStatistics)> = HashMap::new();
+    fields_iter.enumerate().for_each(|(idx, field)| {
+        let _ = StatisticsConverter::try_new_from_arrow_schema_index(
+            idx,
+            field,
+            &file_schema,
+            file_metadata.schema_descr(),
+        )
+        .and_then(|stats_converter| {
+            summarize_min_max_null_counts(
+                &mut min_accs,
+                &mut max_accs,
+                &mut null_counts_array,
+                idx,
+                &stats_converter,
+                row_groups_metadata,
+            )
+        });
+    });
+
+    Ok(Statistics {
+        num_rows: Precision::Exact(sum(&num_rows_array).unwrap_or_default() as usize),
+        total_byte_size: Precision::Exact(
+            sum(&total_byte_size_array).unwrap_or_default() as usize,
+        ),
+        column_statistics: get_col_stats(
+            &table_schema,
+            null_counts_array,
+            &mut max_accs,
+            &mut min_accs,
+        ),
+    })
+}
 
-        for (i, column) in row_group_meta.columns().iter().enumerate() {
-            if let Some(stat) = column.statistics() {
-                has_statistics = true;
-                column_stats.insert(i, (stat.null_count(), stat));
-            }
-        }
+fn summarize_min_max_null_counts(
+    min_accs: & mut [Option<MinAccumulator>],
+    max_accs: & mut [Option<MaxAccumulator>],
+    null_counts_array: & mut [Precision<usize>],
+    arrow_schema_index: usize,
+    stats_converter: &StatisticsConverter,
+    row_groups_metadata: &[RowGroupMetaData],
+) -> Result<()> {
+    let max_values = stats_converter.row_group_maxes(row_groups_metadata)?;
+    let min_values = stats_converter.row_group_mins(row_groups_metadata)?;
+    let null_counts =
+        stats_converter.row_group_null_counts(row_groups_metadata)?;
 
-        if has_statistics {
-            for (table_idx, null_cnt) in null_counts.iter_mut().enumerate() {
-                if let Some(file_idx) =
-                    schema_adapter.map_column_index(table_idx, &file_schema)
-                {
-                    if let Some((null_count, stats)) = column_stats.get(&file_idx) {
-                        *null_cnt =
null_cnt.add(&Precision::Exact(*null_count as usize)); - summarize_min_max( - &mut max_values, - &mut min_values, - fields, - table_idx, - stats, - ) - } else { - // If none statistics of current column exists, set the Max/Min Accumulator to None. - max_values[table_idx] = None; - min_values[table_idx] = None; - } - } else { - *null_cnt = null_cnt.add(&Precision::Exact(num_rows as usize)); - } - } - } + if let Some(max_acc) = & mut max_accs[arrow_schema_index] { + max_acc.update_batch(&[max_values])?; } - let column_stats = if has_statistics { - get_col_stats(&table_schema, null_counts, &mut max_values, &mut min_values) - } else { - Statistics::unknown_column(&table_schema) - }; + if let Some(min_acc) = & mut min_accs[arrow_schema_index] { + min_acc.update_batch(&[min_values])?; + } - let statistics = Statistics { - num_rows: Precision::Exact(num_rows as usize), - total_byte_size: Precision::Exact(total_byte_size as usize), - column_statistics: column_stats, - }; + null_counts_array[arrow_schema_index] = Precision::Exact( + sum(&null_counts).unwrap_or_default() as usize, + ); - Ok(statistics) + Ok(()) } /// Implements [`DataSink`] for writing to a parquet file. @@ -1126,7 +1073,8 @@ mod tests { use crate::physical_plan::metrics::MetricValue; use crate::prelude::{SessionConfig, SessionContext}; use arrow::array::{Array, ArrayRef, StringArray}; - use arrow_schema::Field; + use arrow_array::Int64Array; + use arrow_schema::{DataType, Field}; use async_trait::async_trait; use datafusion_common::cast::{ as_binary_array, as_boolean_array, as_float32_array, as_float64_array, diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs index ec21c5504c69..7cf25791d085 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs @@ -54,7 +54,7 @@ mod page_filter; mod reader; mod row_filter; mod row_groups; -mod statistics; +pub mod statistics; mod writer; use crate::datasource::schema_adapter::{ diff --git a/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs index 7429ca593820..ff323cf86751 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs @@ -41,13 +41,12 @@ use std::collections::HashSet; use std::sync::Arc; use crate::datasource::physical_plan::parquet::parquet_to_arrow_decimal_type; -use crate::datasource::physical_plan::parquet::statistics::{ - from_bytes_to_i128, parquet_column, -}; +use crate::datasource::physical_plan::parquet::statistics::from_bytes_to_i128; use crate::datasource::physical_plan::parquet::ParquetAccessPlan; use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics}; use super::metrics::ParquetFileMetrics; +use super::statistics::parquet_column_by_name; /// A [`PagePruningPredicate`] provides the ability to construct a [`RowSelection`] /// based on parquet page level statistics, if any @@ -309,7 +308,7 @@ fn find_column_index( return None; }; - parquet_column(parquet_schema, arrow_schema, column.name()).map(|x| x.0) + parquet_column_by_name(parquet_schema, arrow_schema, column.name()).map(|x| x.0) } /// Returns a `RowSelection` for the pages in this RowGroup if any diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs index 
e590f372253c..2f5505b06212 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
@@ -30,7 +30,7 @@ use std::collections::{HashMap, HashSet};
 use std::sync::Arc;
 
 use crate::datasource::listing::FileRange;
-use crate::datasource::physical_plan::parquet::statistics::parquet_column;
+use crate::datasource::physical_plan::parquet::statistics::parquet_column_by_name;
 use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
 
 use super::{ParquetAccessPlan, ParquetFileMetrics, StatisticsConverter};
@@ -169,9 +169,11 @@ impl RowGroupAccessPlanFilter {
         let mut column_sbbf = HashMap::with_capacity(literal_columns.len());
 
         for column_name in literal_columns {
-            let Some((column_idx, _field)) =
-                parquet_column(builder.parquet_schema(), arrow_schema, &column_name)
-            else {
+            let Some((column_idx, _field)) = parquet_column_by_name(
+                builder.parquet_schema(),
+                arrow_schema,
+                &column_name,
+            ) else {
                 continue;
             };
 
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
index 3be060ce6180..c0a4b49450a9 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
@@ -698,10 +698,37 @@ macro_rules! get_data_page_statistics {
     }
 }
 
+/// Looks up the parquet column by arrow schema index
+pub(crate) fn parquet_column_by_arrow_schema_index<'a>(
+    parquet_schema: &SchemaDescriptor,
+    arrow_schema: &'a Schema,
+    root_index: usize,
+) -> Option<(usize, &'a FieldRef)> {
+    // This get could be done in constant time
+    let field = arrow_schema.fields().get(root_index)?;
+
+    if field.data_type().is_nested() {
+        // Nested fields are not supported and require non-trivial logic
+        // to correctly walk the parquet schema accounting for the
+        // logical type rules - <https://github.com/apache/parquet-format/blob/master/LogicalTypes.md>
+        //
+        // For example a ListArray could correspond to anything from 1 to 3 levels
+        // in the parquet schema
+        return None;
+    }
+
+    // This could be made more efficient (#TBD)
+    let parquet_idx = (0..parquet_schema.columns().len())
+        .find(|x| parquet_schema.get_column_root_idx(*x) == root_index)?;
+    Some((parquet_idx, field))
+}
+
 /// Looks up the parquet column by name
 ///
 /// Returns the parquet column index and the corresponding arrow field
-pub(crate) fn parquet_column<'a>(
+/// Reusing [`parquet_column_by_arrow_schema_index`] here would be less efficient,
+/// as the root index can be found in a single pass.
+pub(crate) fn parquet_column_by_name<'a>(
     parquet_schema: &SchemaDescriptor,
     arrow_schema: &'a Schema,
     name: &str,
@@ -864,6 +891,54 @@ impl<'a> StatisticsConverter<'a> {
     ///   );
     /// ```
     pub fn row_group_row_counts(&self, metadatas: I) -> Result<Option<UInt64Array>>
+    where
+        I: IntoIterator<Item = &'a RowGroupMetaData>,
+    {
+        let mut builder = UInt64Array::builder(10);
+        for metadata in metadatas.into_iter() {
+            let row_count = metadata.num_rows();
+            let row_count: u64 = row_count.try_into().map_err(|e| {
+                internal_datafusion_err!(
+                    "Parquet row count {row_count} too large to convert to u64: {e}"
+                )
+            })?;
+            builder.append_value(row_count);
+        }
+        Ok(Some(builder.finish()))
+    }
+
+    /// Returns a [`UInt64Array`] with total byte sizes for each row group
+    ///
+    /// # Return Value
+    ///
+    /// The returned array has no nulls, and has one value for each row group.
+    /// Each value is the total byte size of the row group.
+    ///
+    /// # Example
+    /// ```no_run
+    /// # use arrow::datatypes::Schema;
+    /// # use arrow_array::ArrayRef;
+    /// # use parquet::file::metadata::ParquetMetaData;
+    /// # use datafusion::datasource::physical_plan::parquet::StatisticsConverter;
+    /// # fn get_parquet_metadata() -> ParquetMetaData { unimplemented!() }
+    /// # fn get_arrow_schema() -> Schema { unimplemented!() }
+    /// // Given the metadata for a parquet file and the arrow schema
+    /// let metadata: ParquetMetaData = get_parquet_metadata();
+    /// let arrow_schema: Schema = get_arrow_schema();
+    /// let parquet_schema = metadata.file_metadata().schema_descr();
+    /// // create a converter
+    /// let converter = StatisticsConverter::try_new("foo", &arrow_schema, parquet_schema)
+    ///   .unwrap();
+    /// // get the total byte size for each row group
+    /// let total_byte_sizes = converter.row_group_row_total_bytes(metadata
+    ///   .row_groups()
+    ///   .iter()
+    /// );
+    /// ```
+    pub fn row_group_row_total_bytes(
+        &self,
+        metadatas: I,
+    ) -> Result<Option<UInt64Array>>
     where
         I: IntoIterator<Item = &'a RowGroupMetaData>,
     {
@@ -873,7 +948,7 @@ impl<'a> StatisticsConverter<'a> {
 
         let mut builder = UInt64Array::builder(10);
         for metadata in metadatas.into_iter() {
-            let row_count = metadata.num_rows();
+            let row_count = metadata.total_byte_size();
             let row_count: u64 = row_count.try_into().map_err(|e| {
                 internal_datafusion_err!(
                     "Parquet row count {row_count} too large to convert to u64: {e}"
@@ -909,7 +984,7 @@ impl<'a> StatisticsConverter<'a> {
         };
 
         // find the column in the parquet schema, if not, return a null array
-        let parquet_index = match parquet_column(
+        let parquet_index = match parquet_column_by_name(
             parquet_schema,
             arrow_schema,
             column_name,
@@ -934,6 +1009,44 @@ impl<'a> StatisticsConverter<'a> {
         })
     }
 
+    /// Create a new `StatisticsConverter` to extract statistics for a column from the index
+    /// of the column in the arrow schema
+    ///
+    /// This is a more efficient version of [`Self::try_new`] that avoids looking up the column
+    /// by name in the arrow schema and is useful when the column index is already known or when
+    /// iterating over the columns in the arrow schema.
+    pub fn try_new_from_arrow_schema_index(
+        arrow_schema_index: usize,
+        arrow_field: &'a Field,
+        arrow_schema: &'a Schema,
+        parquet_schema: &'a SchemaDescriptor,
+    ) -> Result<Self> {
+        // find the column in the parquet schema, if not, return a null array
+        let parquet_index = match parquet_column_by_arrow_schema_index(
+            parquet_schema,
+            arrow_schema,
+            arrow_schema_index,
+        ) {
+            Some((parquet_idx, matched_field)) => {
+                // sanity check that matching field matches the arrow field
+                if matched_field.as_ref() != arrow_field {
+                    return internal_err!(
+                        "Matched column '{:?}' does not match original matched column '{:?}'",
+                        matched_field,
+                        arrow_field
+                    );
+                }
+                Some(parquet_idx)
+            }
+            None => None,
+        };
+
+        Ok(Self {
+            parquet_index,
+            arrow_field,
+        })
+    }
+
     /// Extract the minimum values from row group statistics in [`RowGroupMetaData`]
     ///
     /// # Return Value
@@ -1878,7 +1991,8 @@ mod test {
         let parquet_schema = metadata.file_metadata().schema_descr();
 
         // read the int_col statistics
-        let (idx, _) = parquet_column(parquet_schema, &schema, "int_col").unwrap();
+        let (idx, _) =
+            parquet_column_by_name(parquet_schema, &schema, "int_col").unwrap();
         assert_eq!(idx, 2);
 
         let row_groups = metadata.row_groups();
@@ -2076,7 +2190,8 @@ mod test {
 
         for field in schema.fields() {
             if field.data_type().is_nested() {
-                let lookup = parquet_column(parquet_schema, &schema, field.name());
+                let lookup =
+                    parquet_column_by_name(parquet_schema, &schema, field.name());
                 assert_eq!(lookup, None);
                 continue;
             }

From 5dbfbd6251d2cc599a59705bd77004349f7cba02 Mon Sep 17 00:00:00 2001
From: Xin Li
Date: Sat, 22 Jun 2024 21:01:46 +0800
Subject: [PATCH 2/7] Complete support for all parquet types

---
 .../src/datasource/file_format/parquet.rs     | 138 ++++++++++--------
 .../physical_plan/parquet/statistics.rs       |  65 +--------
 2 files changed, 80 insertions(+), 123 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 7de98e954192..b07044e70f9f 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -43,16 +43,17 @@ use datafusion_common::config::TableParquetOptions;
 use datafusion_common::file_options::parquet_writer::ParquetWriterOptions;
 use datafusion_common::stats::Precision;
 use datafusion_common::{
-    exec_err, internal_datafusion_err, not_impl_err, DataFusionError,
+    exec_err, internal_datafusion_err, not_impl_err, DataFusionError, ScalarValue::Utf8,
 };
 use datafusion_common_runtime::SpawnedTask;
 use datafusion_execution::TaskContext;
-use datafusion_physical_expr::expressions::{MinAccumulator,MaxAccumulator};
+use datafusion_physical_expr::expressions::{MaxAccumulator, MinAccumulator};
 use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
 use datafusion_physical_plan::metrics::MetricsSet;
 
 use async_trait::async_trait;
 use bytes::{BufMut, BytesMut};
+use log::debug;
 use object_store::buffered::BufWriter;
 use parquet::arrow::arrow_writer::{
     compute_leaves, get_column_writers, ArrowColumnChunk, ArrowColumnWriter,
@@ -398,100 +399,119 @@ pub async fn statistics_from_parquet_meta(
         file_metadata.key_value_metadata(),
     )?;
 
+    let mut statistics = Statistics::new_unknown(&table_schema);
+    let (mut max_accs, mut min_accs) = create_max_min_accs(&table_schema);
+    let mut null_counts_array = vec![Precision::Exact(0); table_schema.fields().len()];
+    let row_groups_metadata = metadata.row_groups();
+
+    let Some(num_rows) = row_groups_metadata.first().map(|rg| rg.num_rows() as usize)
+    else {
+        return Ok(statistics);
+    };
+    statistics.num_rows = Precision::Exact(num_rows);
+
     let mut fields_iter = table_schema.fields().iter();
     let Some(first_field) = fields_iter.next() else {
-        return Ok(Statistics::new_unknown(&table_schema));
+        return Ok(statistics);
     };
 
-    let stats_converter = StatisticsConverter::try_new_from_arrow_schema_index(
-        0,
-        first_field,
+    let option_stats_converter;
+    match StatisticsConverter::try_new(
+        first_field.name(),
         &file_schema,
         file_metadata.schema_descr(),
-    )?;
-
-    let row_groups_metadata = metadata.row_groups();
-    let Some(num_rows_array) =
-        stats_converter.row_group_row_counts(row_groups_metadata)?
-    else {
-        return Ok(Statistics::new_unknown(&table_schema));
-    };
+    ) {
+        Ok(sc) => {
+            option_stats_converter = Some(sc);
+        }
+        Err(e) => {
+            debug!("Failed to create statistics converter: {}", e);
+            option_stats_converter = None;
+            null_counts_array[0] = Precision::Exact(num_rows);
+        }
+    };
 
-    let Some(total_byte_size_array) =
-        stats_converter.row_group_row_total_bytes(row_groups_metadata)?
-    else {
-        return Ok(Statistics::new_unknown(&table_schema));
-    };
+    if option_stats_converter.is_some() {
+        let stats_converter = option_stats_converter.unwrap();
+        let Some(total_byte_size_array) =
+            stats_converter.row_group_row_total_bytes(row_groups_metadata)?
+        else {
+            return Ok(statistics);
+        };
+        let total_byte_size = sum(&total_byte_size_array).unwrap_or_default() as usize;
+        statistics.total_byte_size = Precision::Exact(total_byte_size);
 
-    let (mut max_accs, mut min_accs) = create_max_min_accs(&table_schema);
-    let mut null_counts_array =
-        vec![Precision::Exact(0); table_schema.fields().len()];
-    summarize_min_max_null_counts(
-        &mut min_accs,
-        &mut max_accs,
-        &mut null_counts_array,
-        0,
-        &stats_converter,
-        row_groups_metadata,
-    )?;
+        summarize_min_max_null_counts(
+            &mut min_accs,
+            &mut max_accs,
+            &mut null_counts_array,
+            0,
+            num_rows,
+            &stats_converter,
+            row_groups_metadata,
+        )?;
+    }
 
     fields_iter.enumerate().for_each(|(idx, field)| {
-        let _ = StatisticsConverter::try_new_from_arrow_schema_index(
-            idx,
-            field,
+        match StatisticsConverter::try_new(
+            field.name(),
             &file_schema,
             file_metadata.schema_descr(),
-        )
-        .and_then(|stats_converter| {
-            summarize_min_max_null_counts(
-                &mut min_accs,
-                &mut max_accs,
-                &mut null_counts_array,
-                idx,
-                &stats_converter,
-                row_groups_metadata,
-            )
-        });
+        ) {
+            Ok(stats_converter) => {
+                summarize_min_max_null_counts(
+                    &mut min_accs,
+                    &mut max_accs,
+                    &mut null_counts_array,
+                    idx + 1,
+                    num_rows,
+                    &stats_converter,
+                    row_groups_metadata,
+                )
+                .ok();
+            }
+            Err(e) => {
+                debug!("Failed to create statistics converter: {}", e);
+                null_counts_array[idx + 1] = Precision::Exact(num_rows);
+            }
+        }
     });
 
-    Ok(Statistics {
-        num_rows: Precision::Exact(sum(&num_rows_array).unwrap_or_default() as usize),
-        total_byte_size: Precision::Exact(
-            sum(&total_byte_size_array).unwrap_or_default() as usize,
-        ),
-        column_statistics: get_col_stats(
-            &table_schema,
-            null_counts_array,
-            &mut max_accs,
-            &mut min_accs,
-        ),
-    })
+    statistics.column_statistics = get_col_stats(
+        &table_schema,
+        null_counts_array,
+        &mut max_accs,
+        &mut min_accs,
+    );
+
+    Ok(statistics)
 }
 
 fn summarize_min_max_null_counts(
-    min_accs: & mut [Option<MinAccumulator>],
-    max_accs: & mut [Option<MaxAccumulator>],
-    null_counts_array: & mut [Precision<usize>],
+    min_accs: &mut [Option<MinAccumulator>],
+    max_accs: &mut [Option<MaxAccumulator>],
+    null_counts_array: &mut [Precision<usize>],
     arrow_schema_index: usize,
+    num_rows: usize,
     stats_converter: &StatisticsConverter,
     row_groups_metadata: &[RowGroupMetaData],
 ) -> Result<()> {
     let max_values = stats_converter.row_group_maxes(row_groups_metadata)?;
     let min_values = stats_converter.row_group_mins(row_groups_metadata)?;
-    let null_counts =
-        stats_converter.row_group_null_counts(row_groups_metadata)?;
+    let null_counts = stats_converter.row_group_null_counts(row_groups_metadata)?;
 
-    if let Some(max_acc) = & mut max_accs[arrow_schema_index] {
+    if let Some(max_acc) = &mut max_accs[arrow_schema_index] {
         max_acc.update_batch(&[max_values])?;
     }
 
-    if let Some(min_acc) = & mut min_accs[arrow_schema_index] {
+    if let Some(min_acc) = &mut min_accs[arrow_schema_index] {
         min_acc.update_batch(&[min_values])?;
     }
 
-    null_counts_array[arrow_schema_index] = Precision::Exact(
-        sum(&null_counts).unwrap_or_default() as usize,
-    );
+    null_counts_array[arrow_schema_index] = Precision::Exact(match sum(&null_counts) {
+        Some(null_count) => null_count as usize,
+        None => num_rows,
+    });
 
     Ok(())
 }
@@ -1397,8 +1417,8 @@ mod tests {
         // column c1
         let c1_stats = &stats.column_statistics[0];
         assert_eq!(c1_stats.null_count, Precision::Exact(1));
-        assert_eq!(c1_stats.max_value, Precision::Absent);
-        assert_eq!(c1_stats.min_value, Precision::Absent);
+        assert_eq!(c1_stats.max_value, Precision::Exact(Utf8(Some("bar".to_string()))));
+        assert_eq!(c1_stats.min_value, Precision::Exact(Utf8(Some("Foo".to_string()))));
         // column c2: missing from the file so the table treats all 3 rows as null
         let c2_stats = &stats.column_statistics[1];
         assert_eq!(c2_stats.null_count, Precision::Exact(3));
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
index c0a4b49450a9..293aa9cb9264 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
@@ -67,7 +67,7 @@ pub(crate) fn from_bytes_to_f16(b: &[u8]) -> Option<f16> {
 // Copy from arrow-rs
 // https://github.com/apache/arrow-rs/blob/198af7a3f4aa20f9bd003209d9f04b0f37bb120e/parquet/src/arrow/buffer/bit_util.rs#L54
 // Convert the byte slice to fixed length byte array with the length of N.
-pub fn sign_extend_be<const N: usize>(b: &[u8]) -> [u8; N] {
+fn sign_extend_be<const N: usize>(b: &[u8]) -> [u8; N] {
     assert!(b.len() <= N, "Array too large, expected less than {N}");
     let is_negative = (b[0] & 128u8) == 128u8;
     let mut result = if is_negative { [255u8; N] } else { [0u8; N] };
@@ -698,31 +698,6 @@ macro_rules! get_data_page_statistics {
     }
 }
 
-/// Looks up the parquet column by arrow schema index
-pub(crate) fn parquet_column_by_arrow_schema_index<'a>(
-    parquet_schema: &SchemaDescriptor,
-    arrow_schema: &'a Schema,
-    root_index: usize,
-) -> Option<(usize, &'a FieldRef)> {
-    // This get could be done in constant time
-    let field = arrow_schema.fields().get(root_index)?;
-
-    if field.data_type().is_nested() {
-        // Nested fields are not supported and require non-trivial logic
-        // to correctly walk the parquet schema accounting for the
-        // logical type rules - <https://github.com/apache/parquet-format/blob/master/LogicalTypes.md>
-        //
-        // For example a ListArray could correspond to anything from 1 to 3 levels
-        // in the parquet schema
-        return None;
-    }
-
-    // This could be made more efficient (#TBD)
-    let parquet_idx = (0..parquet_schema.columns().len())
-        .find(|x| parquet_schema.get_column_root_idx(*x) == root_index)?;
-    Some((parquet_idx, field))
-}
-
 /// Looks up the parquet column by name
 ///
 /// Returns the parquet column index and the corresponding arrow field
@@ -1009,44 +984,6 @@ impl<'a> StatisticsConverter<'a> {
         })
     }
 
-    /// Create a new `StatisticsConverter` to extract statistics for a column from the index
-    /// of the column in the arrow schema
-    ///
-    /// This is a more efficient version of [`Self::try_new`] that avoids looking up the column
-    /// by name in the arrow schema and is useful when the column index is already known or when
-    /// iterating over the columns in the arrow schema.
-    pub fn try_new_from_arrow_schema_index(
-        arrow_schema_index: usize,
-        arrow_field: &'a Field,
-        arrow_schema: &'a Schema,
-        parquet_schema: &'a SchemaDescriptor,
-    ) -> Result<Self> {
-        // find the column in the parquet schema, if not, return a null array
-        let parquet_index = match parquet_column_by_arrow_schema_index(
-            parquet_schema,
-            arrow_schema,
-            arrow_schema_index,
-        ) {
-            Some((parquet_idx, matched_field)) => {
-                // sanity check that matching field matches the arrow field
-                if matched_field.as_ref() != arrow_field {
-                    return internal_err!(
-                        "Matched column '{:?}' does not match original matched column '{:?}'",
-                        matched_field,
-                        arrow_field
-                    );
-                }
-                Some(parquet_idx)
-            }
-            None => None,
-        };
-
-        Ok(Self {
-            parquet_index,
-            arrow_field,
-        })
-    }
-
     /// Extract the minimum values from row group statistics in [`RowGroupMetaData`]
     ///
     /// # Return Value

From 28eb6c1eb3153af127fdf9097c0ef04f12ef0f20 Mon Sep 17 00:00:00 2001
From: Xin Li
Date: Sat, 22 Jun 2024 21:04:25 +0800
Subject: [PATCH 3/7] Fix misc

---
 datafusion/core/src/datasource/file_format/parquet.rs  | 10 ++++++++--
 .../datasource/physical_plan/parquet/page_filter.rs    |  4 ++--
 .../src/datasource/physical_plan/parquet/row_groups.rs | 10 ++++------
 .../src/datasource/physical_plan/parquet/statistics.rs | 10 ++++------
 4 files changed, 18 insertions(+), 16 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index b07044e70f9f..0634ac7ef200 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -1417,8 +1417,14 @@ mod tests {
         // column c1
         let c1_stats = &stats.column_statistics[0];
         assert_eq!(c1_stats.null_count, Precision::Exact(1));
-        assert_eq!(c1_stats.max_value, Precision::Exact(Utf8(Some("bar".to_string()))));
-        assert_eq!(c1_stats.min_value, Precision::Exact(Utf8(Some("Foo".to_string()))));
+        assert_eq!(
+            c1_stats.max_value,
+            Precision::Exact(Utf8(Some("bar".to_string())))
+        );
+        assert_eq!(
+            c1_stats.min_value,
+            Precision::Exact(Utf8(Some("Foo".to_string())))
Precision::Exact(Utf8(Some("Foo".to_string()))) + ); // column c2: missing from the file so the table treats all 3 rows as null let c2_stats = &stats.column_statistics[1]; assert_eq!(c2_stats.null_count, Precision::Exact(3)); diff --git a/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs index ff323cf86751..f913618519f9 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs @@ -46,7 +46,7 @@ use crate::datasource::physical_plan::parquet::ParquetAccessPlan; use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics}; use super::metrics::ParquetFileMetrics; -use super::statistics::parquet_column_by_name; +use super::statistics::parquet_column; /// A [`PagePruningPredicate`] provides the ability to construct a [`RowSelection`] /// based on parquet page level statistics, if any @@ -308,7 +308,7 @@ fn find_column_index( return None; }; - parquet_column_by_name(parquet_schema, arrow_schema, column.name()).map(|x| x.0) + parquet_column(parquet_schema, arrow_schema, column.name()).map(|x| x.0) } /// Returns a `RowSelection` for the pages in this RowGroup if any diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs index 2f5505b06212..e590f372253c 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs @@ -30,7 +30,7 @@ use std::collections::{HashMap, HashSet}; use std::sync::Arc; use crate::datasource::listing::FileRange; -use crate::datasource::physical_plan::parquet::statistics::parquet_column_by_name; +use crate::datasource::physical_plan::parquet::statistics::parquet_column; use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics}; use super::{ParquetAccessPlan, ParquetFileMetrics, StatisticsConverter}; @@ -169,11 +169,9 @@ impl RowGroupAccessPlanFilter { let mut column_sbbf = HashMap::with_capacity(literal_columns.len()); for column_name in literal_columns { - let Some((column_idx, _field)) = parquet_column_by_name( - builder.parquet_schema(), - arrow_schema, - &column_name, - ) else { + let Some((column_idx, _field)) = + parquet_column(builder.parquet_schema(), arrow_schema, &column_name) + else { continue; }; diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs index 293aa9cb9264..db075fc3110e 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs @@ -703,7 +703,7 @@ macro_rules! get_data_page_statistics { /// Returns the parquet column index and the corresponding arrow field /// It is less efficient to reuse function [`parquet_column_by_arrow_schema_index`] /// as the root_idx could be found at once. 
-pub(crate) fn parquet_column_by_name<'a>( +pub(crate) fn parquet_column<'a>( parquet_schema: &SchemaDescriptor, arrow_schema: &'a Schema, name: &str, @@ -959,7 +959,7 @@ impl<'a> StatisticsConverter<'a> { }; // find the column in the parquet schema, if not, return a null array - let parquet_index = match parquet_column_by_name( + let parquet_index = match parquet_column( parquet_schema, arrow_schema, column_name, @@ -1928,8 +1928,7 @@ mod test { let parquet_schema = metadata.file_metadata().schema_descr(); // read the int_col statistics - let (idx, _) = - parquet_column_by_name(parquet_schema, &schema, "int_col").unwrap(); + let (idx, _) = parquet_column(parquet_schema, &schema, "int_col").unwrap(); assert_eq!(idx, 2); let row_groups = metadata.row_groups(); @@ -2127,8 +2126,7 @@ mod test { for field in schema.fields() { if field.data_type().is_nested() { - let lookup = - parquet_column_by_name(parquet_schema, &schema, field.name()); + let lookup = parquet_column(parquet_schema, &schema, field.name()); assert_eq!(lookup, None); continue; } From e77f1bdd37e0a3e5dab2f6a7fd0d7f6dc05ca343 Mon Sep 17 00:00:00 2001 From: Xin Li Date: Sat, 22 Jun 2024 21:07:40 +0800 Subject: [PATCH 4/7] Fix misc --- datafusion/core/src/datasource/file_format/parquet.rs | 6 +++--- .../core/src/datasource/physical_plan/parquet/statistics.rs | 2 -- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 0634ac7ef200..d7a14ec7d402 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -43,7 +43,7 @@ use datafusion_common::config::TableParquetOptions; use datafusion_common::file_options::parquet_writer::ParquetWriterOptions; use datafusion_common::stats::Precision; use datafusion_common::{ - exec_err, internal_datafusion_err, not_impl_err, DataFusionError, ScalarValue::Utf8, + exec_err, internal_datafusion_err, not_impl_err, DataFusionError }; use datafusion_common_runtime::SpawnedTask; use datafusion_execution::TaskContext; @@ -1419,11 +1419,11 @@ mod tests { assert_eq!(c1_stats.null_count, Precision::Exact(1)); assert_eq!( c1_stats.max_value, - Precision::Exact(Utf8(Some("bar".to_string()))) + Precision::Exact(ScalarValue::Utf8(Some("bar".to_string()))) ); assert_eq!( c1_stats.min_value, - Precision::Exact(Utf8(Some("Foo".to_string()))) + Precision::Exact(ScalarValue::Utf8(Some("Foo".to_string()))) ); // column c2: missing from the file so the table treats all 3 rows as null let c2_stats = &stats.column_statistics[1]; diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs index db075fc3110e..ed45a11d197c 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs @@ -701,8 +701,6 @@ macro_rules! get_data_page_statistics { /// Lookups up the parquet column by name /// /// Returns the parquet column index and the corresponding arrow field -/// It is less efficient to reuse function [`parquet_column_by_arrow_schema_index`] -/// as the root_idx could be found at once. 
pub(crate) fn parquet_column<'a>( parquet_schema: &SchemaDescriptor, arrow_schema: &'a Schema, From ab96a177017e088d742467eec1d2d9773b797646 Mon Sep 17 00:00:00 2001 From: Xin Li Date: Sun, 23 Jun 2024 15:17:24 +0800 Subject: [PATCH 5/7] fix test --- .../core/src/datasource/file_format/parquet.rs | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index d7a14ec7d402..536ddc3a996d 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -43,7 +43,7 @@ use datafusion_common::config::TableParquetOptions; use datafusion_common::file_options::parquet_writer::ParquetWriterOptions; use datafusion_common::stats::Precision; use datafusion_common::{ - exec_err, internal_datafusion_err, not_impl_err, DataFusionError + exec_err, internal_datafusion_err, not_impl_err, DataFusionError, }; use datafusion_common_runtime::SpawnedTask; use datafusion_execution::TaskContext; @@ -404,10 +404,13 @@ pub async fn statistics_from_parquet_meta( let mut null_counts_array = vec![Precision::Exact(0); table_schema.fields().len()]; let row_groups_metadata = metadata.row_groups(); - let Some(num_rows) = row_groups_metadata.first().map(|rg| rg.num_rows() as usize) - else { - return Ok(statistics); - }; + // The num_rows needs to be calculated even when the statistics converter fails. + // This is due to the null counts being calculated based on the number of rows when the prerequisites is not met. + // Below the test read_merged_batches checks for this behavior. + let num_rows = row_groups_metadata + .iter() + .map(|rg| rg.num_rows() as usize) + .sum(); statistics.num_rows = Precision::Exact(num_rows); let mut fields_iter = table_schema.fields().iter(); From 8e69e6201ce384c43cb0612cf5722568309e45a5 Mon Sep 17 00:00:00 2001 From: Xin Li Date: Sun, 23 Jun 2024 16:18:31 +0800 Subject: [PATCH 6/7] fix misc --- .../src/datasource/file_format/parquet.rs | 29 ++++++++++++++----- .../datasource/physical_plan/parquet/mod.rs | 2 +- .../physical_plan/parquet/page_filter.rs | 5 ++-- .../physical_plan/parquet/statistics.rs | 6 +++- 4 files changed, 31 insertions(+), 11 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 536ddc3a996d..5df3b9e424d5 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -393,6 +393,18 @@ pub async fn statistics_from_parquet_meta( metadata: &ParquetMetaData, table_schema: SchemaRef, ) -> Result { + let row_groups_metadata = metadata.row_groups(); + + let mut has_statistics = false; + for row_group_meta in row_groups_metadata { + for column in row_group_meta.columns() { + if let Some(_) = column.statistics() { + has_statistics = true; + break; + } + } + } + let file_metadata = metadata.file_metadata(); let file_schema = parquet_to_arrow_schema( file_metadata.schema_descr(), @@ -402,7 +414,6 @@ pub async fn statistics_from_parquet_meta( let mut statistics = Statistics::new_unknown(&table_schema); let (mut max_accs, mut min_accs) = create_max_min_accs(&table_schema); let mut null_counts_array = vec![Precision::Exact(0); table_schema.fields().len()]; - let row_groups_metadata = metadata.row_groups(); // The num_rows needs to be calculated even when the statistics converter fails. 
     // This is due to the null counts being calculated based on the number of rows when the prerequisite is not met.
     // The test read_merged_batches below checks for this behavior.
@@ -480,12 +491,16 @@ pub async fn statistics_from_parquet_meta(
         }
     });
 
-    statistics.column_statistics = get_col_stats(
-        &table_schema,
-        null_counts_array,
-        &mut max_accs,
-        &mut min_accs,
-    );
+    statistics.column_statistics = if has_statistics {
+        get_col_stats(
+            &table_schema,
+            null_counts_array,
+            &mut max_accs,
+            &mut min_accs,
+        )
+    } else {
+        Statistics::unknown_column(&table_schema)
+    };
 
     Ok(statistics)
 }
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index 7cf25791d085..ec21c5504c69 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -54,7 +54,7 @@ mod page_filter;
 mod reader;
 mod row_filter;
 mod row_groups;
-pub mod statistics;
+mod statistics;
 mod writer;
 
 use crate::datasource::schema_adapter::{
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs
index f913618519f9..7429ca593820 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs
@@ -41,12 +41,13 @@ use std::collections::HashSet;
 use std::sync::Arc;
 
 use crate::datasource::physical_plan::parquet::parquet_to_arrow_decimal_type;
-use crate::datasource::physical_plan::parquet::statistics::from_bytes_to_i128;
+use crate::datasource::physical_plan::parquet::statistics::{
+    from_bytes_to_i128, parquet_column,
+};
 use crate::datasource::physical_plan::parquet::ParquetAccessPlan;
 use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
 
 use super::metrics::ParquetFileMetrics;
-use super::statistics::parquet_column;
 
 /// A [`PagePruningPredicate`] provides the ability to construct a [`RowSelection`]
 /// based on parquet page level statistics, if any
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
index ed45a11d197c..7046e61d75d4 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
@@ -67,7 +67,7 @@ pub(crate) fn from_bytes_to_f16(b: &[u8]) -> Option<f16> {
 // Copy from arrow-rs
 // https://github.com/apache/arrow-rs/blob/198af7a3f4aa20f9bd003209d9f04b0f37bb120e/parquet/src/arrow/buffer/bit_util.rs#L54
 // Convert the byte slice to fixed length byte array with the length of N.
-fn sign_extend_be(b: &[u8]) -> [u8; N] { +pub fn sign_extend_be(b: &[u8]) -> [u8; N] { assert!(b.len() <= N, "Array too large, expected less than {N}"); let is_negative = (b[0] & 128u8) == 128u8; let mut result = if is_negative { [255u8; N] } else { [0u8; N] }; @@ -867,6 +867,10 @@ impl<'a> StatisticsConverter<'a> { where I: IntoIterator, { + let Some(_) = self.parquet_index else { + return Ok(None); + }; + let mut builder = UInt64Array::builder(10); for metadata in metadatas.into_iter() { let row_count = metadata.num_rows(); From 42fcbbdeec127994b8e7332916fb3907838dd790 Mon Sep 17 00:00:00 2001 From: Xin Li Date: Sun, 23 Jun 2024 19:01:08 +0800 Subject: [PATCH 7/7] fix none --- .../src/datasource/file_format/parquet.rs | 128 ++++++------------ .../physical_plan/parquet/statistics.rs | 52 ------- 2 files changed, 44 insertions(+), 136 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 5df3b9e424d5..4204593eba96 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -395,15 +395,22 @@ pub async fn statistics_from_parquet_meta( ) -> Result { let row_groups_metadata = metadata.row_groups(); + let mut statistics = Statistics::new_unknown(&table_schema); let mut has_statistics = false; + let mut num_rows = 0_usize; + let mut total_byte_size = 0_usize; for row_group_meta in row_groups_metadata { - for column in row_group_meta.columns() { - if let Some(_) = column.statistics() { - has_statistics = true; - break; - } + num_rows += row_group_meta.num_rows() as usize; + total_byte_size += row_group_meta.total_byte_size() as usize; + + if !has_statistics { + row_group_meta.columns().iter().for_each(|column| { + has_statistics = column.statistics().is_some(); + }); } } + statistics.num_rows = Precision::Exact(num_rows); + statistics.total_byte_size = Precision::Exact(total_byte_size); let file_metadata = metadata.file_metadata(); let file_schema = parquet_to_arrow_schema( @@ -411,87 +418,40 @@ pub async fn statistics_from_parquet_meta( file_metadata.key_value_metadata(), )?; - let mut statistics = Statistics::new_unknown(&table_schema); - let (mut max_accs, mut min_accs) = create_max_min_accs(&table_schema); - let mut null_counts_array = vec![Precision::Exact(0); table_schema.fields().len()]; - - // The num_rows needs to be calculated even when the statistics converter fails. - // This is due to the null counts being calculated based on the number of rows when the prerequisites is not met. - // Below the test read_merged_batches checks for this behavior. - let num_rows = row_groups_metadata - .iter() - .map(|rg| rg.num_rows() as usize) - .sum(); - statistics.num_rows = Precision::Exact(num_rows); - - let mut fields_iter = table_schema.fields().iter(); - let Some(first_field) = fields_iter.next() else { - return Ok(statistics); - }; - - let option_stats_converter; - match StatisticsConverter::try_new( - first_field.name(), - &file_schema, - file_metadata.schema_descr(), - ) { - Ok(sc) => { - option_stats_converter = Some(sc); - } - Err(e) => { - debug!("Failed to create statistics converter: {}", e); - option_stats_converter = None; - null_counts_array[0] = Precision::Exact(num_rows); - } - }; - - if option_stats_converter.is_some() { - let stats_converter = option_stats_converter.unwrap(); - let Some(total_byte_size_array) = - stats_converter.row_group_row_total_bytes(row_groups_metadata)? 
- else { - return Ok(statistics); - }; - let total_byte_size = sum(&total_byte_size_array).unwrap_or_default() as usize; - statistics.total_byte_size = Precision::Exact(total_byte_size); - - summarize_min_max_null_counts( - &mut min_accs, - &mut max_accs, - &mut null_counts_array, - 0, - num_rows, - &stats_converter, - row_groups_metadata, - )?; - } + statistics.column_statistics = if has_statistics { + let (mut max_accs, mut min_accs) = create_max_min_accs(&table_schema); + let mut null_counts_array = + vec![Precision::Exact(0); table_schema.fields().len()]; - fields_iter.enumerate().for_each(|(idx, field)| { - match StatisticsConverter::try_new( - field.name(), - &file_schema, - file_metadata.schema_descr(), - ) { - Ok(stats_converter) => { - summarize_min_max_null_counts( - &mut min_accs, - &mut max_accs, - &mut null_counts_array, - idx + 1, - num_rows, - &stats_converter, - row_groups_metadata, - ) - .ok(); - } - Err(e) => { - debug!("Failed to create statistics converter: {}", e); - null_counts_array[idx + 1] = Precision::Exact(num_rows); - } - } - }); + table_schema + .fields() + .iter() + .enumerate() + .for_each(|(idx, field)| { + match StatisticsConverter::try_new( + field.name(), + &file_schema, + file_metadata.schema_descr(), + ) { + Ok(stats_converter) => { + summarize_min_max_null_counts( + &mut min_accs, + &mut max_accs, + &mut null_counts_array, + idx, + num_rows, + &stats_converter, + row_groups_metadata, + ) + .ok(); + } + Err(e) => { + debug!("Failed to create statistics converter: {}", e); + null_counts_array[idx] = Precision::Exact(num_rows); + } + } + }); - statistics.column_statistics = if has_statistics { get_col_stats( &table_schema, null_counts_array, diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs index 7046e61d75d4..3be060ce6180 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs @@ -884,58 +884,6 @@ impl<'a> StatisticsConverter<'a> { Ok(Some(builder.finish())) } - /// Returns a [`UInt64Array`] with total byte sizes for each row group - /// - /// # Return Value - /// - /// The returned array has no nulls, and has one value for each row group. - /// Each value is the total byte size of the row group. 
-    ///
-    /// # Example
-    /// ```no_run
-    /// # use arrow::datatypes::Schema;
-    /// # use arrow_array::ArrayRef;
-    /// # use parquet::file::metadata::ParquetMetaData;
-    /// # use datafusion::datasource::physical_plan::parquet::StatisticsConverter;
-    /// # fn get_parquet_metadata() -> ParquetMetaData { unimplemented!() }
-    /// # fn get_arrow_schema() -> Schema { unimplemented!() }
-    /// // Given the metadata for a parquet file and the arrow schema
-    /// let metadata: ParquetMetaData = get_parquet_metadata();
-    /// let arrow_schema: Schema = get_arrow_schema();
-    /// let parquet_schema = metadata.file_metadata().schema_descr();
-    /// // create a converter
-    /// let converter = StatisticsConverter::try_new("foo", &arrow_schema, parquet_schema)
-    ///   .unwrap();
-    /// // get the total byte size for each row group
-    /// let total_byte_sizes = converter.row_group_row_total_bytes(metadata
-    ///   .row_groups()
-    ///   .iter()
-    /// );
-    /// ```
-    pub fn row_group_row_total_bytes(
-        &self,
-        metadatas: I,
-    ) -> Result<Option<UInt64Array>>
-    where
-        I: IntoIterator<Item = &'a RowGroupMetaData>,
-    {
-        let Some(_) = self.parquet_index else {
-            return Ok(None);
-        };
-
-        let mut builder = UInt64Array::builder(10);
-        for metadata in metadatas.into_iter() {
-            let row_count = metadata.total_byte_size();
-            let row_count: u64 = row_count.try_into().map_err(|e| {
-                internal_datafusion_err!(
-                    "Parquet row count {row_count} too large to convert to u64: {e}"
-                )
-            })?;
-            builder.append_value(row_count);
-        }
-        Ok(Some(builder.finish()))
-    }
-
     /// Create a new `StatisticsConverter` to extract statistics for a column
     ///
     /// Note if there is no corresponding column in the parquet file, the returned