Update ListingTable to use StatisticsConverter, remove redundant statistics extraction code #10924
Changes from all commits
3e6a537
1ed605f
da01b95
d9785ba
f1bcad4

@@ -17,6 +17,7 @@

//! [`ParquetFormat`]: Parquet [`FileFormat`] abstractions

use arrow_array::{Array, ArrayRef, UInt64Array};
use std::any::Any;
use std::fmt;
use std::fmt::Debug;

@@ -25,16 +26,13 @@ use std::sync::Arc;
use super::write::demux::start_demuxer_task;
use super::write::{create_writer, SharedBuffer};
use super::{FileFormat, FileScanConfig};
use crate::arrow::array::{
    BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array, RecordBatch,
};
use crate::arrow::datatypes::{DataType, Fields, Schema, SchemaRef};
use crate::arrow::array::RecordBatch;
use crate::arrow::datatypes::{Fields, Schema, SchemaRef};
use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::physical_plan::{FileGroupDisplay, FileSinkConfig};
use crate::datasource::schema_adapter::{
    DefaultSchemaAdapterFactory, SchemaAdapterFactory,
};
use crate::datasource::statistics::{create_max_min_accs, get_col_stats};
use crate::error::Result;
use crate::execution::context::SessionState;
use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};

@@ -48,7 +46,8 @@ use datafusion_common::config::TableParquetOptions;
use datafusion_common::file_options::parquet_writer::ParquetWriterOptions;
use datafusion_common::stats::Precision;
use datafusion_common::{
    exec_err, internal_datafusion_err, not_impl_err, DataFusionError,
    exec_err, internal_datafusion_err, not_impl_err, ColumnStatistics, DataFusionError,
    ScalarValue,
};
use datafusion_common_runtime::SpawnedTask;
use datafusion_execution::TaskContext;

@@ -68,14 +67,15 @@ use parquet::arrow::{
use parquet::file::footer::{decode_footer, decode_metadata};
use parquet::file::metadata::ParquetMetaData;
use parquet::file::properties::WriterProperties;
use parquet::file::statistics::Statistics as ParquetStatistics;
use parquet::file::writer::SerializedFileWriter;
use parquet::format::FileMetaData;
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::task::JoinSet;

use crate::datasource::physical_plan::parquet::ParquetExecBuilder;
use crate::datasource::physical_plan::parquet::{
    ParquetExecBuilder, StatisticsConverter,
};
use futures::{StreamExt, TryStreamExt};
use hashbrown::HashMap;
use object_store::path::Path;

@@ -295,86 +295,6 @@ impl FileFormat for ParquetFormat {
    }
}

fn summarize_min_max(
    max_values: &mut [Option<MaxAccumulator>],
    min_values: &mut [Option<MinAccumulator>],
    fields: &Fields,
    i: usize,
    stat: &ParquetStatistics,
) {
    if !stat.has_min_max_set() {
        max_values[i] = None;
        min_values[i] = None;
        return;
    }
    match stat {
        ParquetStatistics::Boolean(s) if DataType::Boolean == *fields[i].data_type() => {
            if let Some(max_value) = &mut max_values[i] {
                max_value
                    .update_batch(&[Arc::new(BooleanArray::from(vec![*s.max()]))])
                    .unwrap_or_else(|_| max_values[i] = None);
            }
            if let Some(min_value) = &mut min_values[i] {
                min_value
                    .update_batch(&[Arc::new(BooleanArray::from(vec![*s.min()]))])
                    .unwrap_or_else(|_| min_values[i] = None);
            }
        }
        ParquetStatistics::Int32(s) if DataType::Int32 == *fields[i].data_type() => {
            if let Some(max_value) = &mut max_values[i] {
                max_value
                    .update_batch(&[Arc::new(Int32Array::from_value(*s.max(), 1))])
                    .unwrap_or_else(|_| max_values[i] = None);
            }
            if let Some(min_value) = &mut min_values[i] {
                min_value
                    .update_batch(&[Arc::new(Int32Array::from_value(*s.min(), 1))])
                    .unwrap_or_else(|_| min_values[i] = None);
            }
        }
        ParquetStatistics::Int64(s) if DataType::Int64 == *fields[i].data_type() => {
            if let Some(max_value) = &mut max_values[i] {
                max_value
                    .update_batch(&[Arc::new(Int64Array::from_value(*s.max(), 1))])
Review comment: Previously, even though update_batch is called, it first creates a single-row array for each row group's min/max value.
                    .unwrap_or_else(|_| max_values[i] = None);
            }
            if let Some(min_value) = &mut min_values[i] {
                min_value
                    .update_batch(&[Arc::new(Int64Array::from_value(*s.min(), 1))])
                    .unwrap_or_else(|_| min_values[i] = None);
            }
        }
        ParquetStatistics::Float(s) if DataType::Float32 == *fields[i].data_type() => {
            if let Some(max_value) = &mut max_values[i] {
                max_value
                    .update_batch(&[Arc::new(Float32Array::from(vec![*s.max()]))])
                    .unwrap_or_else(|_| max_values[i] = None);
            }
            if let Some(min_value) = &mut min_values[i] {
                min_value
                    .update_batch(&[Arc::new(Float32Array::from(vec![*s.min()]))])
                    .unwrap_or_else(|_| min_values[i] = None);
            }
        }
        ParquetStatistics::Double(s) if DataType::Float64 == *fields[i].data_type() => {
            if let Some(max_value) = &mut max_values[i] {
                max_value
                    .update_batch(&[Arc::new(Float64Array::from(vec![*s.max()]))])
                    .unwrap_or_else(|_| max_values[i] = None);
            }
            if let Some(min_value) = &mut min_values[i] {
                min_value
                    .update_batch(&[Arc::new(Float64Array::from(vec![*s.min()]))])
                    .unwrap_or_else(|_| min_values[i] = None);
            }
        }
        _ => {
            max_values[i] = None;
            min_values[i] = None;
        }
    }
}

/// Fetches parquet metadata from the ObjectStore for the given object
///
/// This component is subject to **change** in the near future and is exposed for low-level integrations

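The function's signature is not shown in this hunk. For background, here is a rough, hedged sketch of how parquet metadata is typically read from an `ObjectStore` using the `decode_footer` / `decode_metadata` imports above. The `object_store` types, error conversions, and range arithmetic are assumptions that vary by crate version, and real code also validates file sizes; this is not the PR's implementation.

```rust
// Hedged sketch (not the PR's implementation): read the 8-byte parquet footer,
// then the metadata block it points at, and decode it.
use datafusion_common::Result;
use object_store::{ObjectMeta, ObjectStore};
use parquet::file::footer::{decode_footer, decode_metadata};
use parquet::file::metadata::ParquetMetaData;

async fn fetch_metadata_sketch(
    store: &dyn ObjectStore,
    meta: &ObjectMeta,
) -> Result<ParquetMetaData> {
    // Last 8 bytes of the file: 4-byte metadata length + the "PAR1" magic.
    let footer_start = meta.size - 8;
    let footer = store
        .get_range(&meta.location, footer_start..meta.size)
        .await?;
    let mut footer_bytes = [0u8; 8];
    footer_bytes.copy_from_slice(&footer);

    // Length of the serialized metadata that immediately precedes the footer.
    let metadata_len = decode_footer(&footer_bytes)?;
    let metadata_start = footer_start - metadata_len;
    let metadata_bytes = store
        .get_range(&meta.location, metadata_start..footer_start)
        .await?;

    Ok(decode_metadata(&metadata_bytes)?)
}
```
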
@@ -482,73 +402,139 @@ pub async fn statistics_from_parquet_meta(
        file_metadata.key_value_metadata(),
    )?;

    let num_fields = table_schema.fields().len();
    let fields = table_schema.fields();

    let mut num_rows = 0;
    let mut total_byte_size = 0;
    let mut null_counts = vec![Precision::Exact(0); num_fields];
    let mut has_statistics = false;

    for row_group_meta in metadata.row_groups() {
        num_rows += row_group_meta.num_rows();
        total_byte_size += row_group_meta.total_byte_size();
    }

    let schema_adapter =
        DefaultSchemaAdapterFactory::default().create(table_schema.clone());

    let (mut max_values, mut min_values) = create_max_min_accs(&table_schema);
    // statistics for each of the table's columns (may be different from the
    // file schema)
    let mut column_statistics = vec![];

    for (table_idx, field) in table_schema.fields().iter().enumerate() {
        let Some(file_idx) = schema_adapter.map_column_index(table_idx, &file_schema)
        else {
            // table columns not present in the file are treated as all null
            let null_count = Precision::Exact(num_rows as usize);
            let null_value = ScalarValue::try_from(field.data_type())?;
            let stats = ColumnStatistics::new_unknown()
                .with_null_count(null_count)
                .with_max_value(Precision::Exact(null_value.clone()))
                .with_min_value(Precision::Exact(null_value));
            column_statistics.push(stats);
            continue;
        };

    for row_group_meta in metadata.row_groups() {
        num_rows += row_group_meta.num_rows();
        total_byte_size += row_group_meta.total_byte_size();
        let file_field = file_schema.field(file_idx);
        let Some(converter) = StatisticsConverter::try_new(
Review comment: this code now uses the well-tested StatisticsConverter to extract statistics from the parquet file, with the correct type of array, in a single call (a sketch of this flow follows the end of this function below).
            file_field.name(),
            &file_schema,
            file_metadata.schema_descr(),
        )
        .ok() else {
            // problem extracting statistics, bail out
            column_statistics.push(ColumnStatistics::new_unknown());
            continue;
        };

        let mut column_stats: HashMap<usize, (u64, &ParquetStatistics)> = HashMap::new();
        let null_counts = converter.row_group_null_counts(metadata.row_groups())?;
        let null_count = accumulate_null_counts(&null_counts);

        for (i, column) in row_group_meta.columns().iter().enumerate() {
            if let Some(stat) = column.statistics() {
                has_statistics = true;
                column_stats.insert(i, (stat.null_count(), stat));
            }
        }
        let maxes = converter.row_group_maxes(metadata.row_groups())?;
        let max_value =
            accumulate_column(MaxAccumulator::try_new(field.data_type()).ok(), maxes);

        if has_statistics {
            for (table_idx, null_cnt) in null_counts.iter_mut().enumerate() {
                if let Some(file_idx) =
                    schema_adapter.map_column_index(table_idx, &file_schema)
                {
                    if let Some((null_count, stats)) = column_stats.get(&file_idx) {
                        *null_cnt = null_cnt.add(&Precision::Exact(*null_count as usize));
                        summarize_min_max(
                            &mut max_values,
                            &mut min_values,
                            fields,
                            table_idx,
                            stats,
                        )
                    } else {
                        // If none statistics of current column exists, set the Max/Min Accumulator to None.
                        max_values[table_idx] = None;
                        min_values[table_idx] = None;
                    }
                } else {
                    *null_cnt = null_cnt.add(&Precision::Exact(num_rows as usize));
                }
            }
        }
    }
        let mins = converter.row_group_mins(metadata.row_groups())?;
        let min_value =
            accumulate_column(MinAccumulator::try_new(field.data_type()).ok(), mins);

    let column_stats = if has_statistics {
        get_col_stats(&table_schema, null_counts, &mut max_values, &mut min_values)
    } else {
        Statistics::unknown_column(&table_schema)
    };
        column_statistics.push(ColumnStatistics {
            null_count,
            max_value,
            min_value,
            distinct_count: Precision::Absent,
        });
    }

    let statistics = Statistics {
        num_rows: Precision::Exact(num_rows as usize),
        total_byte_size: Precision::Exact(total_byte_size as usize),
        column_statistics: column_stats,
        column_statistics,
    };

    Ok(statistics)
}
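For reference, a minimal sketch (not part of this diff) of the per-column flow shown above: one StatisticsConverter per table column, three row-group-level arrays, and the two accumulate helpers defined below. The `crate::` paths are the in-crate paths used by this file, and the function name is illustrative; signatures may differ in other DataFusion versions.

```rust
// Sketch only: `column_stats_for_field` is a hypothetical name; imports mirror
// the `use` statements in this diff.
use arrow_schema::{Field, Schema};
use datafusion_common::{stats::Precision, ColumnStatistics, Result};
use parquet::file::metadata::ParquetMetaData;

use crate::datasource::physical_plan::parquet::StatisticsConverter;
use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};

fn column_stats_for_field(
    field: &Field,
    file_schema: &Schema,
    metadata: &ParquetMetaData,
) -> Result<ColumnStatistics> {
    // One converter per column: it maps parquet row-group statistics to an
    // Arrow array of the correct type for that column.
    let converter = StatisticsConverter::try_new(
        field.name(),
        file_schema,
        metadata.file_metadata().schema_descr(),
    )?;

    // Each call returns one entry per row group in a single typed array.
    let null_counts = converter.row_group_null_counts(metadata.row_groups())?;
    let maxes = converter.row_group_maxes(metadata.row_groups())?;
    let mins = converter.row_group_mins(metadata.row_groups())?;

    // Collapse the per-row-group values into one statistic per column using
    // the helpers defined below.
    Ok(ColumnStatistics {
        null_count: accumulate_null_counts(&null_counts),
        max_value: accumulate_column(
            MaxAccumulator::try_new(field.data_type()).ok(),
            maxes,
        ),
        min_value: accumulate_column(
            MinAccumulator::try_new(field.data_type()).ok(),
            mins,
        ),
        distinct_count: Precision::Absent,
    })
}
```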

/// Summarizes the UInt64Array to a single usize
///
/// Nulls in `null_counts` are treated as missing values
///
/// # Returns
///
/// * Precision::Absent if there are any errors or there are no known statistics
/// * Precision::Inexact if there are any null (unknown) null counts
fn accumulate_null_counts(null_counts: &UInt64Array) -> Precision<usize> {
    let total_null_count: usize = null_counts
        .iter()
        .filter_map(|v| v.map(|v| v as usize))
        .sum();

    let num_unknown_null_counts = null_counts.null_count();
    if num_unknown_null_counts == 0 {
        // had all null counts (common case)
        Precision::Exact(total_null_count)
    } else if num_unknown_null_counts == null_counts.len() {
        // didn't know any null counts
        Precision::Absent
    } else {
        // if any of the counts were null, don't know the true null count
        Precision::Inexact(total_null_count)
    }
}
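A hypothetical usage example (not part of the PR) illustrating the three outcomes documented above; it assumes module-local access to the private helper.

```rust
#[cfg(test)]
mod null_count_accumulation_example {
    use super::*;
    use arrow_array::UInt64Array;
    use datafusion_common::stats::Precision;

    #[test]
    fn accumulate_null_counts_outcomes() {
        // Every row group reported a null count: the total is exact.
        let all_known = UInt64Array::from(vec![Some(1), Some(2), Some(0)]);
        assert_eq!(accumulate_null_counts(&all_known), Precision::Exact(3));

        // One row group had no null count statistic: the sum is only a lower bound.
        let partly_known = UInt64Array::from(vec![Some(1), None, Some(2)]);
        assert_eq!(accumulate_null_counts(&partly_known), Precision::Inexact(3));

        // No row group reported a null count: nothing is known.
        let unknown = UInt64Array::from(vec![None, None]);
        assert_eq!(accumulate_null_counts(&unknown), Precision::Absent);
    }
}
```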

/// Summarizes the column to a single value using the accumulator
///
/// Nulls in `column` are treated as missing values (not actual null values in
/// the parquet file)
///
/// # Returns
///
/// * Precision::Absent if there are any errors or there are no known statistics
/// * Precision::Inexact if there are any nulls
fn accumulate_column<A: Accumulator>(
    accumulator: Option<A>,
    column: ArrayRef,
) -> Precision<ScalarValue> {
    // is_nullable returns false if the array is guaranteed not to contain any
    // nulls. If there are nulls in the column, that means some of the row
    // group statistics were not known
    let nullable = column.is_nullable();

    let scalar = accumulator.and_then(|mut acc| {
        acc.update_batch(&[column]).ok()?;
        acc.evaluate().ok()
    });

    let Some(scalar) = scalar else {
        return Precision::Absent;
    };

    // if the scalar itself is null, means we didn't have any known stats
    if scalar.is_null() {
        Precision::Absent
    } else if nullable {
        Precision::Inexact(scalar)
    } else {
        Precision::Exact(scalar)
    }
}
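And a hypothetical example (not part of the PR) for `accumulate_column`, reducing per-row-group Int64 maxes with the `MaxAccumulator` imported by this file.

```rust
#[cfg(test)]
mod accumulate_column_example {
    use super::*;
    use std::sync::Arc;

    use arrow_array::{ArrayRef, Int64Array};
    use arrow_schema::DataType;
    use datafusion_common::{stats::Precision, ScalarValue};

    #[test]
    fn accumulate_int64_maxes() {
        // Max values reported by three row groups, all known (no nulls),
        // so the result is an exact statistic.
        let maxes: ArrayRef = Arc::new(Int64Array::from(vec![10_i64, 25, 17]));
        let acc = MaxAccumulator::try_new(&DataType::Int64).ok();
        assert_eq!(
            accumulate_column(acc, maxes),
            Precision::Exact(ScalarValue::Int64(Some(25)))
        );

        // A null entry means one row group had no max statistic, so the
        // result is only inexact.
        let maxes: ArrayRef =
            Arc::new(Int64Array::from(vec![Some(10_i64), None, Some(17)]));
        let acc = MaxAccumulator::try_new(&DataType::Int64).ok();
        assert_eq!(
            accumulate_column(acc, maxes),
            Precision::Inexact(ScalarValue::Int64(Some(17)))
        );
    }
}
```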

/// Implements [`DataSink`] for writing to a parquet file.
pub struct ParquetSink {
    /// Config options for writing data

@@ -1126,7 +1112,8 @@ mod tests {
    use crate::physical_plan::metrics::MetricValue;
    use crate::prelude::{SessionConfig, SessionContext};
    use arrow::array::{Array, ArrayRef, StringArray};
    use arrow_schema::Field;
    use arrow_array::Int64Array;
    use arrow_schema::{DataType, Field};
    use async_trait::async_trait;
    use datafusion_common::cast::{
        as_binary_array, as_boolean_array, as_float32_array, as_float64_array,

@@ -1449,8 +1436,15 @@
        // column c1
        let c1_stats = &stats.column_statistics[0];
        assert_eq!(c1_stats.null_count, Precision::Exact(1));
        assert_eq!(c1_stats.max_value, Precision::Absent);
        assert_eq!(c1_stats.min_value, Precision::Absent);
        // Note in ASCII lower case is greater than upper case
        assert_eq!(
            c1_stats.max_value,
            Precision::Exact(ScalarValue::from("bar"))
Review comment: strings were previously not handled, but are now properly handled by StatisticsConverter.
        );
        assert_eq!(
            c1_stats.min_value,
            Precision::Exact(ScalarValue::from("Foo"))
        );
        // column c2: missing from the file so the table treats all 3 rows as null
        let c2_stats = &stats.column_statistics[1];
        assert_eq!(c2_stats.null_count, Precision::Exact(3));

Review comment: These functions make creating ColumnStatistics more ergonomic.
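As context, a small sketch (not from the PR) of the builder-style construction this comment appears to refer to: the `with_*` methods used earlier in the diff to build the all-null statistics for a table column that is missing from the file. The helper name is hypothetical.

```rust
// Hypothetical helper (name is illustrative, not from the PR): build the
// "column is entirely null" statistics the same way the diff does for table
// columns that are missing from the file.
use arrow_schema::DataType;
use datafusion_common::{stats::Precision, ColumnStatistics, Result, ScalarValue};

fn all_null_column_stats(num_rows: usize, data_type: &DataType) -> Result<ColumnStatistics> {
    // A typed NULL of the column's data type serves as both min and max.
    let null_value = ScalarValue::try_from(data_type)?;
    Ok(ColumnStatistics::new_unknown()
        .with_null_count(Precision::Exact(num_rows))
        .with_max_value(Precision::Exact(null_value.clone()))
        .with_min_value(Precision::Exact(null_value)))
}
```

For example, `all_null_column_stats(3, &DataType::Utf8)` would describe the c2 column in the test above: three rows, all null, with a typed NULL as both min and max.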