feat: Optimize SortPreservingMergeExec to avoid merging non-overlapping partitions #13296

Draft · wants to merge 8 commits into main
31 changes: 31 additions & 0 deletions datafusion/common/src/stats.rs
@@ -363,6 +363,25 @@ impl Statistics {
self.total_byte_size = Precision::Absent;
Ok(self)
}

/// Merge these statistics with `other`, producing statistics for the combined data.
/// Summing `num_rows`, `total_byte_size`, and the null & distinct counts yields
/// conservatively large estimates, so those values are converted to inexact.
pub fn merge(&self, other: &Self) -> Self {
Self {
num_rows: self.num_rows.add(&other.num_rows).to_inexact(),
total_byte_size: self
.total_byte_size
.add(&other.total_byte_size)
.to_inexact(),
column_statistics: self
.column_statistics
.iter()
.zip(&other.column_statistics)
.map(|(a, b)| a.merge(b))
.collect::<Vec<_>>(),
}
}
}

/// Creates an estimate of the number of rows in the output using the given
@@ -472,6 +491,18 @@ impl ColumnStatistics {
self.distinct_count = self.distinct_count.to_inexact();
self
}

/// Merge these column statistics with `other`.
/// Summing the null & distinct counts yields conservatively large estimates,
/// so those values are converted to inexact.
pub fn merge(&self, other: &Self) -> Self {
Self {
null_count: self.null_count.add(&other.null_count).to_inexact(),
max_value: self.max_value.max(&other.max_value),
min_value: self.min_value.min(&other.min_value),
distinct_count: self.distinct_count.add(&other.distinct_count).to_inexact(),
}
}
}

#[cfg(test)]
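To make the merge semantics concrete, here is a minimal sketch of the new `ColumnStatistics::merge` in use, assuming this PR is applied: min/max take the union of the two ranges and can stay exact, while the summed counts are demoted to inexact because the inputs may overlap (two files can share distinct values, for example).

```rust
use datafusion_common::stats::Precision;
use datafusion_common::{ColumnStatistics, ScalarValue};

fn main() {
    // Per-file statistics for the same column in two different files.
    let a = ColumnStatistics {
        null_count: Precision::Exact(0),
        max_value: Precision::Exact(ScalarValue::Int64(Some(40))),
        min_value: Precision::Exact(ScalarValue::Int64(Some(10))),
        distinct_count: Precision::Exact(4),
    };
    let b = ColumnStatistics {
        null_count: Precision::Exact(2),
        max_value: Precision::Exact(ScalarValue::Int64(Some(90))),
        min_value: Precision::Exact(ScalarValue::Int64(Some(50))),
        distinct_count: Precision::Exact(5),
    };

    let merged = a.merge(&b);
    // The union of [10, 40] and [50, 90] is [10, 90], and it remains exact.
    assert_eq!(merged.min_value, Precision::Exact(ScalarValue::Int64(Some(10))));
    assert_eq!(merged.max_value, Precision::Exact(ScalarValue::Int64(Some(90))));
    // Sums are conservative upper bounds, so they become inexact.
    assert_eq!(merged.null_count, Precision::Inexact(2));
    assert_eq!(merged.distinct_count, Precision::Inexact(9));
}
```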
59 changes: 29 additions & 30 deletions datafusion/core/src/datasource/physical_plan/file_scan_config.rs
@@ -23,7 +23,7 @@ use std::{
sync::Arc, vec,
};

use super::{get_projected_output_ordering, statistics::MinMaxStatistics};
use super::{get_projected_output_ordering, min_max_statistics_from_files};
use crate::datasource::{listing::PartitionedFile, object_store::ObjectStoreUrl};
use crate::{error::Result, scalar::ScalarValue};

@@ -260,6 +260,32 @@ impl FileScanConfig {
(projected_schema, table_stats, projected_output_ordering)
}

pub(crate) fn projected_statistics_by_file_group(&self) -> Vec<Statistics> {
self.file_groups
.iter()
.map(|group| {
let mut stats = group
.iter()
.map(|f| {
f.statistics
.clone()
.unwrap_or(Statistics::new_unknown(&self.file_schema))
})
.reduce(|old, new| old.merge(&new))
.unwrap_or(Statistics::new_unknown(&self.file_schema));

for _ in 0..self.table_partition_cols.len() {
// TODO provide accurate stat for partition column (#1186)
stats
.column_statistics
.push(ColumnStatistics::new_unknown())
}

stats.project(self.projection.as_ref())
})
.collect()
}

#[cfg_attr(not(feature = "avro"), allow(unused))] // Only used by avro
pub(crate) fn projected_file_column_names(&self) -> Option<Vec<String>> {
self.projection.as_ref().map(|p| {
@@ -310,22 +336,12 @@ impl FileScanConfig {
sort_order: &LexOrdering,
) -> Result<Vec<Vec<PartitionedFile>>> {
let flattened_files = file_groups.iter().flatten().collect::<Vec<_>>();
// First Fit:
// * Choose the first file group that a file can be placed into.
// * If it fits into no existing file groups, create a new one.
//
// By sorting files by min values and then applying first-fit bin packing,
// we can produce the smallest number of file groups such that
// files within a group are in order and non-overlapping.
//
// Source: Applied Combinatorics (Keller and Trotter), Chapter 6.8
// https://www.appliedcombinatorics.org/book/s_posets_dilworth-intord.html
Comment on lines -313 to -322 (Contributor Author):

I moved this and the relevant code into a new method, `MinMaxStatistics::first_fit`.


if flattened_files.is_empty() {
return Ok(vec![]);
}

let statistics = MinMaxStatistics::new_from_files(
let statistics = min_max_statistics_from_files(
sort_order,
table_schema,
None,
@@ -335,24 +351,7 @@
e.context("construct min/max statistics for split_groups_by_statistics")
})?;

let indices_sorted_by_min = statistics.min_values_sorted();
let mut file_groups_indices: Vec<Vec<usize>> = vec![];

for (idx, min) in indices_sorted_by_min {
let file_group_to_insert = file_groups_indices.iter_mut().find(|group| {
// If our file is non-overlapping and comes _after_ the last file,
// it fits in this file group.
min > statistics.max(
*group
.last()
.expect("groups should be nonempty at construction"),
)
});
match file_group_to_insert {
Some(group) => group.push(idx),
None => file_groups_indices.push(vec![idx]),
}
}
let file_groups_indices = statistics.first_fit();

// Assemble indices back into groups of PartitionedFiles
Ok(file_groups_indices
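Per the author's comment above, the first-fit loop removed from `split_groups_by_statistics` now lives in `MinMaxStatistics::first_fit`, whose body is not shown in this diff. Here is a standalone sketch of the algorithm the removed comment describes; the function name and the `(min, max)` range representation are illustrative, not the actual implementation:

```rust
/// First-fit grouping over per-file [min, max] ranges under the sort key
/// (Applied Combinatorics, Keller & Trotter, ch. 6.8): visiting files in
/// ascending order of min value and placing each into the first group whose
/// last file ends before it begins yields the fewest groups such that files
/// within a group are ordered and non-overlapping.
fn first_fit(ranges: &[(i64, i64)]) -> Vec<Vec<usize>> {
    // Visit files in ascending order of min value.
    let mut indices: Vec<usize> = (0..ranges.len()).collect();
    indices.sort_by_key(|&i| ranges[i].0);

    let mut groups: Vec<Vec<usize>> = vec![];
    for i in indices {
        let min = ranges[i].0;
        // Choose the first group whose last file ends strictly before this
        // file begins; if none fits, start a new group.
        match groups
            .iter_mut()
            .find(|g| min > ranges[*g.last().expect("groups are nonempty")].1)
        {
            Some(group) => group.push(i),
            None => groups.push(vec![i]),
        }
    }
    groups
}

fn main() {
    // [0,10] and [11,20] chain into one group; [5,15] overlaps both,
    // so it starts a second group.
    let groups = first_fit(&[(0, 10), (5, 15), (11, 20)]);
    assert_eq!(groups, vec![vec![0, 2], vec![1]]);
}
```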
41 changes: 38 additions & 3 deletions datafusion/core/src/datasource/physical_plan/mod.rs
@@ -26,7 +26,6 @@ mod file_stream;
mod json;
#[cfg(feature = "parquet")]
pub mod parquet;
mod statistics;

pub(crate) use self::csv::plan_to_csv;
pub(crate) use self::json::plan_to_json;
@@ -36,7 +35,9 @@ pub use self::parquet::{ParquetExec, ParquetFileMetrics, ParquetFileReaderFactor
pub use arrow_file::ArrowExec;
pub use avro::AvroExec;
pub use csv::{CsvConfig, CsvExec, CsvExecBuilder, CsvOpener};
use datafusion_common::{stats::Precision, ColumnStatistics, DataFusionError};
use datafusion_expr::dml::InsertOp;
use datafusion_physical_plan::statistics::MinMaxStatistics;
pub use file_groups::FileGroupPartitioner;
pub use file_scan_config::{
wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileScanConfig,
@@ -366,10 +367,10 @@ fn get_projected_output_ordering(
return false;
}

let statistics = match statistics::MinMaxStatistics::new_from_files(
let statistics = match min_max_statistics_from_files(
&new_ordering,
projected_schema,
base_config.projection.as_deref(),
base_config.projection.as_ref(),
group,
) {
Ok(statistics) => statistics,
@@ -395,6 +396,40 @@
all_orderings
}

/// Construct `MinMaxStatistics` from a list of files
fn min_max_statistics_from_files<'a>(
projected_sort_order: &LexOrdering, // Sort order with respect to projected schema
projected_schema: &SchemaRef, // Projected schema
projection: Option<&Vec<usize>>, // Indices of projection in full table schema (None = all columns)
files: impl IntoIterator<Item = &'a PartitionedFile>,
) -> Result<MinMaxStatistics> {
let projected_statistics = files
.into_iter()
.map(|file| {
let mut statistics = file.statistics.clone()?;
for partition in &file.partition_values {
statistics.column_statistics.push(ColumnStatistics {
null_count: Precision::Exact(0),
max_value: Precision::Exact(partition.clone()),
min_value: Precision::Exact(partition.clone()),
distinct_count: Precision::Exact(1),
});
}

Some(statistics.project(projection))
})
.collect::<Option<Vec<_>>>()
.ok_or_else(|| {
DataFusionError::Plan("Parquet file missing statistics".to_string())
})?;

MinMaxStatistics::new_from_statistics(
projected_sort_order,
projected_schema,
&projected_statistics,
)
}

/// Represents the possible outcomes of a range calculation.
///
/// This enum is used to encapsulate the result of calculating the range of
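The loop over `file.partition_values` in `min_max_statistics_from_files` relies on a simple fact: every row in a file shares that file's Hive-style partition value, so the partition column's statistics are exactly known. A minimal illustration using the same `datafusion_common` types (the `date` value is made up):

```rust
use datafusion_common::stats::Precision;
use datafusion_common::{ColumnStatistics, ScalarValue};

fn main() {
    // A file under `date=2024-01-01/` contributes this partition value.
    let partition = ScalarValue::from("2024-01-01");
    // Min == max == the value itself, no nulls, exactly one distinct value.
    let stats = ColumnStatistics {
        null_count: Precision::Exact(0),
        max_value: Precision::Exact(partition.clone()),
        min_value: Precision::Exact(partition),
        distinct_count: Precision::Exact(1),
    };
    assert_eq!(stats.min_value, stats.max_value);
}
```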
13 changes: 13 additions & 0 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -264,6 +264,7 @@ pub struct ParquetExec {
/// Base configuration for this scan
base_config: FileScanConfig,
projected_statistics: Statistics,
projected_statistics_by_file_group: Vec<Statistics>,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
/// Optional predicate for row filtering during parquet scan
@@ -446,6 +447,9 @@ impl ParquetExecBuilder {
let (projected_schema, projected_statistics, projected_output_ordering) =
base_config.project();

let projected_statistics_by_file_group =
base_config.projected_statistics_by_file_group();

let cache = ParquetExec::compute_properties(
projected_schema,
&projected_output_ordering,
@@ -454,6 +458,7 @@
ParquetExec {
base_config,
projected_statistics,
projected_statistics_by_file_group,
metrics,
predicate,
pruning_predicate,
@@ -506,6 +511,7 @@ impl ParquetExec {
let Self {
base_config,
projected_statistics: _,
projected_statistics_by_file_group: _,
metrics: _,
predicate,
pruning_predicate: _,
@@ -843,6 +849,10 @@ impl ExecutionPlan for ParquetExec {
Ok(stats)
}

fn statistics_by_partition(&self) -> Result<Vec<Statistics>> {
Ok(self.projected_statistics_by_file_group.clone())
}

fn fetch(&self) -> Option<usize> {
self.base_config.limit
}
@@ -853,6 +863,9 @@
Some(Arc::new(Self {
base_config: new_config,
projected_statistics: self.projected_statistics.clone(),
projected_statistics_by_file_group: self
.projected_statistics_by_file_group
.clone(),
metrics: self.metrics.clone(),
predicate: self.predicate.clone(),
pruning_predicate: self.pruning_predicate.clone(),
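Since `ParquetExec` produces one output partition per file group, its `statistics_by_partition` override simply returns the per-group statistics computed in the builder. Hypothetical usage, mirroring the partition-count expression from the trait default below (`exec` stands for any constructed `ParquetExec`; this helper is not part of the PR):

```rust
use datafusion::datasource::physical_plan::ParquetExec;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_common::Result;

// Sanity check: one Statistics entry per output partition, i.e. per file group.
fn partition_stats_len_matches(exec: &ParquetExec) -> Result<bool> {
    let per_partition = exec.statistics_by_partition()?;
    Ok(per_partition.len() == exec.properties().partitioning.partition_count())
}
```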
7 changes: 7 additions & 0 deletions datafusion/physical-plan/src/execution_plan.rs
@@ -397,6 +397,13 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
Ok(Statistics::new_unknown(&self.schema()))
}

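/// Returns statistics for each output partition of this plan. The default
/// implementation repeats the plan-level statistics from `statistics()`
/// once per partition.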
fn statistics_by_partition(&self) -> Result<Vec<Statistics>> {
Ok(vec![
self.statistics()?;
self.properties().partitioning.partition_count()
])
}

Comment on lines +400 to +406 (Contributor Author):

As stated in the PR description, this is what a proposed API would look like for statistics by partition, though it is certainly not final.

/// Returns `true` if a limit can be safely pushed down through this
/// `ExecutionPlan` node.
///
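The proposed API exists to serve the optimization in the PR title: when per-partition statistics prove the inputs disjoint under the sort key, `SortPreservingMergeExec` can concatenate streams instead of merging them. A hedged sketch of such a check follows; the helper and its exactness requirement are illustrative, not this PR's actual implementation:

```rust
use datafusion_common::stats::Precision;
use datafusion_common::{ScalarValue, Statistics};

/// Return true only if the partitions provably do not overlap on the sort
/// column: with every min/max exactly known, sort the ranges by min and
/// require each range to start strictly after the previous one ends.
fn partitions_disjoint(partitions: &[Statistics], sort_col: usize) -> bool {
    let mut bounds: Vec<(ScalarValue, ScalarValue)> = Vec::new();
    for stats in partitions {
        let col = &stats.column_statistics[sort_col];
        match (&col.min_value, &col.max_value) {
            (Precision::Exact(min), Precision::Exact(max)) => {
                bounds.push((min.clone(), max.clone()));
            }
            // An absent or inexact bound means disjointness cannot be proven.
            _ => return false,
        }
    }
    bounds.sort_by(|a, b| a.0.partial_cmp(&b.0).expect("comparable sort keys"));
    // Adjacent ranges must satisfy prev.max < next.min.
    bounds.windows(2).all(|w| w[0].1 < w[1].0)
}
```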
1 change: 1 addition & 0 deletions datafusion/physical-plan/src/lib.rs
@@ -72,6 +72,7 @@ pub mod recursive_query;
pub mod repartition;
pub mod sorts;
pub mod spill;
pub mod statistics;
pub mod stream;
pub mod streaming;
pub mod tree_node;