From c138b24ab3e7c8c721657ad591676ba17c61d412 Mon Sep 17 00:00:00 2001 From: suremarc <8771538+suremarc@users.noreply.github.com> Date: Thu, 7 Nov 2024 17:30:42 +0000 Subject: [PATCH 1/8] initial attempt at implementation --- .../physical_plan/file_scan_config.rs | 33 +---- .../core/src/datasource/physical_plan/mod.rs | 41 ++++++- .../physical-plan/src/execution_plan.rs | 7 ++ datafusion/physical-plan/src/lib.rs | 1 + .../src/sorts/sort_preserving_merge.rs | 44 +++++-- .../src}/statistics.rs | 113 ++++++++++-------- 6 files changed, 149 insertions(+), 90 deletions(-) rename datafusion/{core/src/datasource/physical_plan => physical-plan/src}/statistics.rs (74%) diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs index fd14be08c7a3..591c8799b3d7 100644 --- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs +++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs @@ -23,7 +23,7 @@ use std::{ sync::Arc, vec, }; -use super::{get_projected_output_ordering, statistics::MinMaxStatistics}; +use super::{get_projected_output_ordering, min_max_statistics_from_files}; use crate::datasource::{listing::PartitionedFile, object_store::ObjectStoreUrl}; use crate::{error::Result, scalar::ScalarValue}; @@ -310,22 +310,12 @@ impl FileScanConfig { sort_order: &LexOrdering, ) -> Result>> { let flattened_files = file_groups.iter().flatten().collect::>(); - // First Fit: - // * Choose the first file group that a file can be placed into. - // * If it fits into no existing file groups, create a new one. - // - // By sorting files by min values and then applying first-fit bin packing, - // we can produce the smallest number of file groups such that - // files within a group are in order and non-overlapping. - // - // Source: Applied Combinatorics (Keller and Trotter), Chapter 6.8 - // https://www.appliedcombinatorics.org/book/s_posets_dilworth-intord.html if flattened_files.is_empty() { return Ok(vec![]); } - let statistics = MinMaxStatistics::new_from_files( + let statistics = min_max_statistics_from_files( sort_order, table_schema, None, @@ -335,24 +325,7 @@ impl FileScanConfig { e.context("construct min/max statistics for split_groups_by_statistics") })?; - let indices_sorted_by_min = statistics.min_values_sorted(); - let mut file_groups_indices: Vec> = vec![]; - - for (idx, min) in indices_sorted_by_min { - let file_group_to_insert = file_groups_indices.iter_mut().find(|group| { - // If our file is non-overlapping and comes _after_ the last file, - // it fits in this file group. - min > statistics.max( - *group - .last() - .expect("groups should be nonempty at construction"), - ) - }); - match file_group_to_insert { - Some(group) => group.push(idx), - None => file_groups_indices.push(vec![idx]), - } - } + let file_groups_indices = statistics.first_fit(); // Assemble indices back into groups of PartitionedFiles Ok(file_groups_indices diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs index 2b50458bb581..5192b2c287c4 100644 --- a/datafusion/core/src/datasource/physical_plan/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/mod.rs @@ -26,7 +26,6 @@ mod file_stream; mod json; #[cfg(feature = "parquet")] pub mod parquet; -mod statistics; pub(crate) use self::csv::plan_to_csv; pub(crate) use self::json::plan_to_json; @@ -36,7 +35,9 @@ pub use self::parquet::{ParquetExec, ParquetFileMetrics, ParquetFileReaderFactor pub use arrow_file::ArrowExec; pub use avro::AvroExec; pub use csv::{CsvConfig, CsvExec, CsvExecBuilder, CsvOpener}; +use datafusion_common::{stats::Precision, ColumnStatistics, DataFusionError}; use datafusion_expr::dml::InsertOp; +use datafusion_physical_plan::statistics::MinMaxStatistics; pub use file_groups::FileGroupPartitioner; pub use file_scan_config::{ wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileScanConfig, @@ -366,10 +367,10 @@ fn get_projected_output_ordering( return false; } - let statistics = match statistics::MinMaxStatistics::new_from_files( + let statistics = match min_max_statistics_from_files( &new_ordering, projected_schema, - base_config.projection.as_deref(), + base_config.projection.as_ref(), group, ) { Ok(statistics) => statistics, @@ -395,6 +396,40 @@ fn get_projected_output_ordering( all_orderings } +/// Construct MinMaxStatistics from a list of files +fn min_max_statistics_from_files<'a>( + projected_sort_order: &LexOrdering, // Sort order with respect to projected schema + projected_schema: &SchemaRef, // Projected schema + projection: Option<&Vec>, // Indices of projection in full table schema (None = all columns) + files: impl IntoIterator, +) -> Result { + let projected_statistics = files + .into_iter() + .map(|file| { + let mut statistics = file.statistics.clone()?; + for partition in &file.partition_values { + statistics.column_statistics.push(ColumnStatistics { + null_count: Precision::Exact(0), + max_value: Precision::Exact(partition.clone()), + min_value: Precision::Exact(partition.clone()), + distinct_count: Precision::Exact(1), + }); + } + + Some(statistics.project(projection)) + }) + .collect::>>() + .ok_or_else(|| { + DataFusionError::Plan("Parquet file missing statistics".to_string()) + })?; + + MinMaxStatistics::new_from_statistics( + projected_sort_order, + projected_schema, + &projected_statistics, + ) +} + /// Represents the possible outcomes of a range calculation. /// /// This enum is used to encapsulate the result of calculating the range of diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 7220e7594ea6..97412c760848 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -397,6 +397,13 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { Ok(Statistics::new_unknown(&self.schema())) } + fn statistics_by_partition(&self) -> Result> { + Ok(vec![ + self.statistics()?; + self.properties().partitioning.partition_count() + ]) + } + /// Returns `true` if a limit can be safely pushed down through this /// `ExecutionPlan` node. /// diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs index 845a74eaea48..84e6d06b4b33 100644 --- a/datafusion/physical-plan/src/lib.rs +++ b/datafusion/physical-plan/src/lib.rs @@ -72,6 +72,7 @@ pub mod recursive_query; pub mod repartition; pub mod sorts; pub mod spill; +pub mod statistics; pub mod stream; pub mod streaming; pub mod tree_node; diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index 9a89db9a5893..42008dcc676d 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -24,6 +24,8 @@ use crate::common::spawn_buffered; use crate::limit::LimitStream; use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use crate::sorts::streaming_merge::StreamingMergeBuilder; +use crate::statistics::MinMaxStatistics; +use crate::stream::RecordBatchStreamAdapter; use crate::{ DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties, Partitioning, PlanProperties, SendableRecordBatchStream, Statistics, @@ -34,6 +36,7 @@ use datafusion_execution::memory_pool::MemoryConsumer; use datafusion_execution::TaskContext; use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement}; +use futures::StreamExt; use log::{debug, trace}; /// Sort preserving merge execution plan @@ -249,16 +252,42 @@ impl ExecutionPlan for SortPreservingMergeExec { MemoryConsumer::new(format!("SortPreservingMergeExec[{partition}]")) .register(&context.runtime_env().memory_pool); - match input_partitions { + let statistics = MinMaxStatistics::new_from_statistics( + &self.expr, + &self.schema(), + &self.input.statistics_by_partition()?, + )?; + + // Organize the input partitions into chains, + // where elements of each chain are input partitions that are + // non-overlapping, and each chain is ordered by their min/max statistics. + // Then concatenate each chain into a single stream. + let mut streams = statistics + .first_fit() + .into_iter() + .map(|chain| { + let streams = chain + .into_iter() + .map(|i| self.input.execute(i, Arc::clone(&context))) + .collect::>>()?; + + // Concatenate the chain into a single stream + Ok(Box::pin(RecordBatchStreamAdapter::new( + self.input.schema(), + futures::stream::iter(streams).flatten(), + )) as SendableRecordBatchStream) + }) + .collect::>>()?; + + match streams.len() { 0 => internal_err!( "SortPreservingMergeExec requires at least one input partition" ), 1 => match self.fetch { Some(fetch) => { - let stream = self.input.execute(0, context)?; debug!("Done getting stream for SortPreservingMergeExec::execute with 1 input with {fetch}"); Ok(Box::pin(LimitStream::new( - stream, + streams.remove(0), 0, Some(fetch), BaselineMetrics::new(&self.metrics, partition), @@ -271,12 +300,9 @@ impl ExecutionPlan for SortPreservingMergeExec { } }, _ => { - let receivers = (0..input_partitions) - .map(|partition| { - let stream = - self.input.execute(partition, Arc::clone(&context))?; - Ok(spawn_buffered(stream, 1)) - }) + let receivers = streams + .into_iter() + .map(|stream| Ok(spawn_buffered(stream, 1))) .collect::>()?; debug!("Done setting up sender-receiver for SortPreservingMergeExec::execute"); diff --git a/datafusion/core/src/datasource/physical_plan/statistics.rs b/datafusion/physical-plan/src/statistics.rs similarity index 74% rename from datafusion/core/src/datasource/physical_plan/statistics.rs rename to datafusion/physical-plan/src/statistics.rs index 488098e7861c..6ebcbe7c7c38 100644 --- a/datafusion/core/src/datasource/physical_plan/statistics.rs +++ b/datafusion/physical-plan/src/statistics.rs @@ -26,22 +26,23 @@ use std::sync::Arc; -use crate::datasource::listing::PartitionedFile; - use arrow::{ compute::SortColumn, row::{Row, Rows}, }; use arrow_array::RecordBatch; use arrow_schema::SchemaRef; -use datafusion_common::{DataFusionError, Result}; +use datafusion_common::{DataFusionError, Result, Statistics}; use datafusion_physical_expr::{expressions::Column, PhysicalSortExpr}; use datafusion_physical_expr_common::sort_expr::LexOrdering; -/// A normalized representation of file min/max statistics that allows for efficient sorting & comparison. +/// A normalized representation of min/max statistics that allows for efficient sorting & comparison. /// The min/max values are ordered by [`Self::sort_order`]. /// Furthermore, any columns that are reversed in the sort order have their min/max values swapped. -pub(crate) struct MinMaxStatistics { +/// +/// This can be used for optimizations involving reordering files and partitions in physical plans +/// when their data is non-overlapping and ordered. +pub struct MinMaxStatistics { min_by_sort_order: Rows, max_by_sort_order: Rows, sort_order: LexOrdering, @@ -65,64 +66,47 @@ impl MinMaxStatistics { self.max_by_sort_order.row(idx) } - pub fn new_from_files<'a>( - projected_sort_order: &LexOrdering, // Sort order with respect to projected schema - projected_schema: &SchemaRef, // Projected schema - projection: Option<&[usize]>, // Indices of projection in full table schema (None = all columns) - files: impl IntoIterator, + pub fn new_from_statistics<'a>( + sort_order: &LexOrdering, + schema: &SchemaRef, + statistics: impl IntoIterator, ) -> Result { use datafusion_common::ScalarValue; - let statistics_and_partition_values = files - .into_iter() - .map(|file| { - file.statistics - .as_ref() - .zip(Some(file.partition_values.as_slice())) - }) - .collect::>>() - .ok_or_else(|| { - DataFusionError::Plan("Parquet file missing statistics".to_string()) - })?; + let statistics = statistics.into_iter().collect::>(); // Helper function to get min/max statistics for a given column of projected_schema let get_min_max = |i: usize| -> Result<(Vec, Vec)> { - Ok(statistics_and_partition_values + Ok(statistics .iter() - .map(|(s, pv)| { - if i < s.column_statistics.len() { - s.column_statistics[i] - .min_value - .get_value() - .cloned() - .zip(s.column_statistics[i].max_value.get_value().cloned()) - .ok_or_else(|| { - DataFusionError::Plan("statistics not found".to_string()) - }) - } else { - let partition_value = &pv[i - s.column_statistics.len()]; - Ok((partition_value.clone(), partition_value.clone())) - } + .map(|s| { + s.column_statistics[i] + .min_value + .get_value() + .cloned() + .zip(s.column_statistics[i].max_value.get_value().cloned()) + .ok_or_else(|| { + DataFusionError::Plan("statistics not found".to_string()) + }) }) .collect::>>()? .into_iter() .unzip()) }; - let sort_columns = sort_columns_from_physical_sort_exprs(projected_sort_order) - .ok_or(DataFusionError::Plan( - "sort expression must be on column".to_string(), - ))?; + let sort_columns = sort_columns_from_physical_sort_exprs(sort_order).ok_or( + DataFusionError::Plan("sort expression must be on column".to_string()), + )?; // Project the schema & sort order down to just the relevant columns let min_max_schema = Arc::new( - projected_schema + schema .project(&(sort_columns.iter().map(|c| c.index()).collect::>()))?, ); let min_max_sort_order = LexOrdering { inner: sort_columns .iter() - .zip(projected_sort_order.iter()) + .zip(sort_order.iter()) .enumerate() .map(|(i, (col, sort))| PhysicalSortExpr { expr: Arc::new(Column::new(col.name(), i)), @@ -134,12 +118,7 @@ impl MinMaxStatistics { let (min_values, max_values): (Vec<_>, Vec<_>) = sort_columns .iter() .map(|c| { - // Reverse the projection to get the index of the column in the full statistics - // The file statistics contains _every_ column , but the sort column's index() - // refers to the index in projected_schema - let i = projection.map(|p| p[c.index()]).unwrap_or(c.index()); - - let (min, max) = get_min_max(i).map_err(|e| { + let (min, max) = get_min_max(c.index()).map_err(|e| { e.context(format!("get min/max for column: '{}'", c.name())) })?; Ok(( @@ -277,6 +256,44 @@ impl MinMaxStatistics { .zip(self.min_by_sort_order.iter().skip(1)) .all(|(max, next_min)| max < next_min) } + + /// Computes a bin-packing of the min/max rows in these statistics + /// into chains, such that elements in a chain are non-overlapping and ordered + /// amongst one another. + /// This bin-packing is optimal in the sense that it has the fewest number of chains. + pub fn first_fit(&self) -> Vec> { + // First Fit: + // * Choose the first chain that an element can be placed into. + // * If it fits into no existing chain, create a new one. + // + // By sorting elements by min values and then applying first-fit bin packing, + // we can produce the smallest number of chains such that + // elements within a chain are in order and non-overlapping. + // + // Source: Applied Combinatorics (Keller and Trotter), Chapter 6.8 + // https://www.appliedcombinatorics.org/book/s_posets_dilworth-intord.html + + let elements_sorted_by_min = self.min_values_sorted(); + let mut chains: Vec> = vec![]; + + for (idx, min) in elements_sorted_by_min { + let chain_to_insert = chains.iter_mut().find(|chain| { + // If our element is non-overlapping and comes _after_ the last element of the chain, + // it can be added to this chain. + min > self.max( + *chain + .last() + .expect("groups should be nonempty at construction"), + ) + }); + match chain_to_insert { + Some(chain) => chain.push(idx), + None => chains.push(vec![idx]), // make a new chain + } + } + + chains + } } fn sort_columns_from_physical_sort_exprs( From b469b71ec0481bd36e684e4949165517f287a804 Mon Sep 17 00:00:00 2001 From: suremarc <8771538+suremarc@users.noreply.github.com> Date: Thu, 7 Nov 2024 17:58:35 +0000 Subject: [PATCH 2/8] fall back to full merge on errors --- .../src/sorts/sort_preserving_merge.rs | 22 +++++++++++-------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index 42008dcc676d..08540ed9db7d 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -252,18 +252,23 @@ impl ExecutionPlan for SortPreservingMergeExec { MemoryConsumer::new(format!("SortPreservingMergeExec[{partition}]")) .register(&context.runtime_env().memory_pool); - let statistics = MinMaxStatistics::new_from_statistics( + // Organize the input partitions into chains, + // where elements of each chain are input partitions that are + // non-overlapping, and each chain is ordered by their min/max statistics. + let stream_packing = match MinMaxStatistics::new_from_statistics( &self.expr, &self.schema(), &self.input.statistics_by_partition()?, - )?; + ) { + Ok(statistics) => statistics.first_fit(), + Err(e) => { + log::debug!("error analyzing statistics for plan: {e}\nfalling back to full sort-merge"); + (0..input_partitions).map(|i| vec![i]).collect() + } + }; - // Organize the input partitions into chains, - // where elements of each chain are input partitions that are - // non-overlapping, and each chain is ordered by their min/max statistics. - // Then concatenate each chain into a single stream. - let mut streams = statistics - .first_fit() + // Concatenate each chain into a single stream. + let mut streams = stream_packing .into_iter() .map(|chain| { let streams = chain @@ -271,7 +276,6 @@ impl ExecutionPlan for SortPreservingMergeExec { .map(|i| self.input.execute(i, Arc::clone(&context))) .collect::>>()?; - // Concatenate the chain into a single stream Ok(Box::pin(RecordBatchStreamAdapter::new( self.input.schema(), futures::stream::iter(streams).flatten(), From 31d371695478839b3112b788ec1966142a6fc0ff Mon Sep 17 00:00:00 2001 From: suremarc <8771538+suremarc@users.noreply.github.com> Date: Thu, 7 Nov 2024 18:11:05 +0000 Subject: [PATCH 3/8] make MinMaxStatistics only care about sorting column statistics --- datafusion/physical-plan/src/statistics.rs | 66 ++++++++++++---------- 1 file changed, 36 insertions(+), 30 deletions(-) diff --git a/datafusion/physical-plan/src/statistics.rs b/datafusion/physical-plan/src/statistics.rs index 6ebcbe7c7c38..13a2d35516a4 100644 --- a/datafusion/physical-plan/src/statistics.rs +++ b/datafusion/physical-plan/src/statistics.rs @@ -73,11 +73,37 @@ impl MinMaxStatistics { ) -> Result { use datafusion_common::ScalarValue; - let statistics = statistics.into_iter().collect::>(); + let sort_columns = sort_columns_from_physical_sort_exprs(sort_order).ok_or( + DataFusionError::Plan("sort expression must be on column".to_string()), + )?; + + let projection = sort_columns.iter().map(|c| c.index()).collect::>(); + + // Project the schema & sort order down to just the relevant columns + + let projected_schema = Arc::new(schema.project(&projection)?); + + let projected_sort_order = LexOrdering { + inner: sort_columns + .iter() + .zip(sort_order.iter()) + .enumerate() + .map(|(i, (col, sort))| PhysicalSortExpr { + expr: Arc::new(Column::new(col.name(), i)), + options: sort.options, + }) + .collect::>(), + }; - // Helper function to get min/max statistics for a given column of projected_schema + let projected_statistics = statistics + .into_iter() + .cloned() + .map(|s| s.project(Some(&projection))) + .collect::>(); + + // Helper function to get min/max statistics let get_min_max = |i: usize| -> Result<(Vec, Vec)> { - Ok(statistics + Ok(projected_statistics .iter() .map(|s| { s.column_statistics[i] @@ -94,31 +120,11 @@ impl MinMaxStatistics { .unzip()) }; - let sort_columns = sort_columns_from_physical_sort_exprs(sort_order).ok_or( - DataFusionError::Plan("sort expression must be on column".to_string()), - )?; - - // Project the schema & sort order down to just the relevant columns - let min_max_schema = Arc::new( - schema - .project(&(sort_columns.iter().map(|c| c.index()).collect::>()))?, - ); - let min_max_sort_order = LexOrdering { - inner: sort_columns - .iter() - .zip(sort_order.iter()) - .enumerate() - .map(|(i, (col, sort))| PhysicalSortExpr { - expr: Arc::new(Column::new(col.name(), i)), - options: sort.options, - }) - .collect::>(), - }; - let (min_values, max_values): (Vec<_>, Vec<_>) = sort_columns .iter() - .map(|c| { - let (min, max) = get_min_max(c.index()).map_err(|e| { + .enumerate() + .map(|(i, c)| { + let (min, max) = get_min_max(i).map_err(|e| { e.context(format!("get min/max for column: '{}'", c.name())) })?; Ok(( @@ -132,14 +138,14 @@ impl MinMaxStatistics { .unzip(); Self::new( - &min_max_sort_order, - &min_max_schema, - RecordBatch::try_new(Arc::clone(&min_max_schema), min_values).map_err( + &projected_sort_order, + &projected_schema, + RecordBatch::try_new(Arc::clone(&projected_schema), min_values).map_err( |e| { DataFusionError::ArrowError(e, Some("\ncreate min batch".to_string())) }, )?, - RecordBatch::try_new(Arc::clone(&min_max_schema), max_values).map_err( + RecordBatch::try_new(Arc::clone(&projected_schema), max_values).map_err( |e| { DataFusionError::ArrowError(e, Some("\ncreate max batch".to_string())) }, From 2f0e74f155887a919ec0b6d95f1bdaa810455814 Mon Sep 17 00:00:00 2001 From: suremarc <8771538+suremarc@users.noreply.github.com> Date: Mon, 11 Nov 2024 21:57:56 +0000 Subject: [PATCH 4/8] fixes + statistics merging --- datafusion/common/src/stats.rs | 31 +++++++ .../physical_plan/file_scan_config.rs | 25 ++++++ .../datasource/physical_plan/parquet/mod.rs | 13 +++ .../src/sorts/sort_preserving_merge.rs | 77 +++++++++++----- datafusion/physical-plan/src/union.rs | 11 +++ .../optimize_sort_preserving_merge.slt | 89 +++++++++++++++++++ 6 files changed, 225 insertions(+), 21 deletions(-) create mode 100644 datafusion/sqllogictest/test_files/optimize_sort_preserving_merge.slt diff --git a/datafusion/common/src/stats.rs b/datafusion/common/src/stats.rs index d2ce965c5c49..aebad2688ba4 100644 --- a/datafusion/common/src/stats.rs +++ b/datafusion/common/src/stats.rs @@ -363,6 +363,25 @@ impl Statistics { self.total_byte_size = Precision::Absent; Ok(self) } + + /// Attempt to merge these statistics. + /// Merging provides conservative estimates for null & distinct counts as well as num_rows, + /// so those will be converted to inexact values. + pub fn merge(&self, other: &Self) -> Self { + Self { + num_rows: self.num_rows.add(&other.num_rows).to_inexact(), + total_byte_size: self + .total_byte_size + .add(&other.total_byte_size) + .to_inexact(), + column_statistics: self + .column_statistics + .iter() + .zip(&other.column_statistics) + .map(|(a, b)| a.merge(b)) + .collect::>(), + } + } } /// Creates an estimate of the number of rows in the output using the given @@ -472,6 +491,18 @@ impl ColumnStatistics { self.distinct_count = self.distinct_count.to_inexact(); self } + + /// Attempt to merge these statistics. + /// Merging provides conservative estimates for null & distinct counts, + /// so those will be converted to inexact values. + pub fn merge(&self, other: &Self) -> Self { + Self { + null_count: self.null_count.add(&other.null_count).to_inexact(), + max_value: self.max_value.max(&other.max_value), + min_value: self.min_value.min(&other.min_value), + distinct_count: self.distinct_count.add(&other.distinct_count).to_inexact(), + } + } } #[cfg(test)] diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs index 591c8799b3d7..3da27d413786 100644 --- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs +++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs @@ -260,6 +260,31 @@ impl FileScanConfig { (projected_schema, table_stats, projected_output_ordering) } + pub(crate) fn projected_statistics_by_file_group(&self) -> Vec { + self.file_groups + .iter() + .map(|group| { + let mut stats = group + .iter() + .map(|f| { + f.statistics + .clone() + .unwrap_or(Statistics::new_unknown(&self.file_schema)) + }) + .reduce(|old, new| old.merge(&new)) + .unwrap_or(Statistics::new_unknown(&self.file_schema)); + + for _ in 0..self.table_partition_cols.len() { + stats + .column_statistics + .push(ColumnStatistics::new_unknown()) + } + + stats.project(self.projection.as_ref()) + }) + .collect() + } + #[cfg_attr(not(feature = "avro"), allow(unused))] // Only used by avro pub(crate) fn projected_file_column_names(&self) -> Option> { self.projection.as_ref().map(|p| { diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs index 980f796a53b2..719bbf4e372d 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs @@ -264,6 +264,7 @@ pub struct ParquetExec { /// Base configuration for this scan base_config: FileScanConfig, projected_statistics: Statistics, + projected_statistics_by_file_group: Vec, /// Execution metrics metrics: ExecutionPlanMetricsSet, /// Optional predicate for row filtering during parquet scan @@ -446,6 +447,9 @@ impl ParquetExecBuilder { let (projected_schema, projected_statistics, projected_output_ordering) = base_config.project(); + let projected_statistics_by_file_group = + base_config.projected_statistics_by_file_group(); + let cache = ParquetExec::compute_properties( projected_schema, &projected_output_ordering, @@ -454,6 +458,7 @@ impl ParquetExecBuilder { ParquetExec { base_config, projected_statistics, + projected_statistics_by_file_group, metrics, predicate, pruning_predicate, @@ -506,6 +511,7 @@ impl ParquetExec { let Self { base_config, projected_statistics: _, + projected_statistics_by_file_group: _, metrics: _, predicate, pruning_predicate: _, @@ -843,6 +849,10 @@ impl ExecutionPlan for ParquetExec { Ok(stats) } + fn statistics_by_partition(&self) -> Result> { + Ok(self.projected_statistics_by_file_group.clone()) + } + fn fetch(&self) -> Option { self.base_config.limit } @@ -853,6 +863,9 @@ impl ExecutionPlan for ParquetExec { Some(Arc::new(Self { base_config: new_config, projected_statistics: self.projected_statistics.clone(), + projected_statistics_by_file_group: self + .projected_statistics_by_file_group + .clone(), metrics: self.metrics.clone(), predicate: self.predicate.clone(), pruning_predicate: self.pruning_predicate.clone(), diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index 08540ed9db7d..37c263a133aa 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -37,6 +37,7 @@ use datafusion_execution::TaskContext; use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement}; use futures::StreamExt; +use itertools::Itertools; use log::{debug, trace}; /// Sort preserving merge execution plan @@ -85,12 +86,35 @@ pub struct SortPreservingMergeExec { cache: PlanProperties, /// Configuration parameter to enable round-robin selection of tied winners of loser tree. enable_round_robin_repartition: bool, + /// Grouping of partitions, such that partitions in a group will be executed sequentially. + /// If None, then every partition gets its own group. + partition_groups: Option>>, } impl SortPreservingMergeExec { /// Create a new sort execution plan pub fn new(expr: LexOrdering, input: Arc) -> Self { let cache = Self::compute_properties(&input, expr.clone()); + + // Organize the input partitions into chains, + // where elements of each chain are input partitions that are + // non-overlapping, and each chain is ordered internally by their min/max statistics. + let partition_groups = input + .statistics_by_partition() + .and_then(|stats| { + MinMaxStatistics::new_from_statistics(&expr, &input.schema(), &stats) + }) + .map(|min_max_stats| min_max_stats.first_fit()) + .inspect_err(|e| { + log::debug!( + "error analyzing statistics: {e}\n falling back to full sort-merge" + ) + }) + .ok() + .filter(|groups| { + groups.len() < input.properties().partitioning.partition_count() + }); + Self { input, expr, @@ -98,6 +122,7 @@ impl SortPreservingMergeExec { fetch: None, cache, enable_round_robin_repartition: true, + partition_groups, } } @@ -160,6 +185,20 @@ impl DisplayAs for SortPreservingMergeExec { write!(f, ", fetch={fetch}")?; }; + if let Some(ref partition_groups) = self.partition_groups { + write!( + f, + ", partition_groups=[{}]", + partition_groups + .iter() + .map(|group| group + .iter() + .map(|index| index.to_string()) + .join(",")) + .join(",") + )?; + } + Ok(()) } } @@ -193,6 +232,7 @@ impl ExecutionPlan for SortPreservingMergeExec { fetch: limit, cache: self.cache.clone(), enable_round_robin_repartition: true, + partition_groups: self.partition_groups.clone(), })) } @@ -252,30 +292,26 @@ impl ExecutionPlan for SortPreservingMergeExec { MemoryConsumer::new(format!("SortPreservingMergeExec[{partition}]")) .register(&context.runtime_env().memory_pool); - // Organize the input partitions into chains, - // where elements of each chain are input partitions that are - // non-overlapping, and each chain is ordered by their min/max statistics. - let stream_packing = match MinMaxStatistics::new_from_statistics( - &self.expr, - &self.schema(), - &self.input.statistics_by_partition()?, - ) { - Ok(statistics) => statistics.first_fit(), - Err(e) => { - log::debug!("error analyzing statistics for plan: {e}\nfalling back to full sort-merge"); - (0..input_partitions).map(|i| vec![i]).collect() - } - }; + let partition_groups = self + .partition_groups + .clone() + .unwrap_or((0..input_partitions).map(|i| vec![i]).collect()); // Concatenate each chain into a single stream. - let mut streams = stream_packing + let mut output_partitions = partition_groups .into_iter() .map(|chain| { - let streams = chain + let mut streams = chain .into_iter() .map(|i| self.input.execute(i, Arc::clone(&context))) .collect::>>()?; + // If there's only 1 input partition in this group, + // no need to concatenate anything. + if streams.len() == 1 { + return Ok(streams.remove(0)); + } + Ok(Box::pin(RecordBatchStreamAdapter::new( self.input.schema(), futures::stream::iter(streams).flatten(), @@ -283,7 +319,7 @@ impl ExecutionPlan for SortPreservingMergeExec { }) .collect::>>()?; - match streams.len() { + match output_partitions.len() { 0 => internal_err!( "SortPreservingMergeExec requires at least one input partition" ), @@ -291,20 +327,19 @@ impl ExecutionPlan for SortPreservingMergeExec { Some(fetch) => { debug!("Done getting stream for SortPreservingMergeExec::execute with 1 input with {fetch}"); Ok(Box::pin(LimitStream::new( - streams.remove(0), + output_partitions.remove(0), 0, Some(fetch), BaselineMetrics::new(&self.metrics, partition), ))) } None => { - let stream = self.input.execute(0, context); debug!("Done getting stream for SortPreservingMergeExec::execute with 1 input without fetch"); - stream + Ok(output_partitions.remove(0)) } }, _ => { - let receivers = streams + let receivers = output_partitions .into_iter() .map(|stream| Ok(spawn_buffered(stream, 1))) .collect::>()?; diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs index bd36753880eb..411049e4cdaf 100644 --- a/datafusion/physical-plan/src/union.rs +++ b/datafusion/physical-plan/src/union.rs @@ -257,6 +257,17 @@ impl ExecutionPlan for UnionExec { .unwrap_or_else(|| Statistics::new_unknown(&self.schema()))) } + fn statistics_by_partition(&self) -> Result> { + Ok(self + .inputs + .iter() + .map(|child| child.statistics_by_partition()) + .collect::>>()? + .into_iter() + .flatten() + .collect()) + } + fn benefits_from_input_partitioning(&self) -> Vec { vec![false; self.children().len()] } diff --git a/datafusion/sqllogictest/test_files/optimize_sort_preserving_merge.slt b/datafusion/sqllogictest/test_files/optimize_sort_preserving_merge.slt new file mode 100644 index 000000000000..bb2458356021 --- /dev/null +++ b/datafusion/sqllogictest/test_files/optimize_sort_preserving_merge.slt @@ -0,0 +1,89 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +########## +## Optimize Sort Preserving Merge +########## + +# Collect statistics +statement ok +set datafusion.execution.collect_statistics = true; + +statement ok +set datafusion.optimizer.prefer_existing_sort = true; + +# Partition 1 has only columns a and b +statement ok +COPY ( + SELECT column1 as a, column2 as b + FROM ( VALUES ('foo', 1), ('qux', 3) ) + ) TO 'test_files/scratch/optimize_sort_preserving_merge/parquet_table/partition=1/1_1.parquet' +STORED AS PARQUET; + +# Add another file to this partition +statement ok +COPY ( + SELECT column1 as a, column2 as b + FROM ( VALUES ('foobar', 2), ('quux', 4) ) + ) TO 'test_files/scratch/optimize_sort_preserving_merge/parquet_table/partition=1/1_2.parquet' +STORED AS PARQUET; + +# Partition 2 has only a +statement ok +COPY ( + SELECT column1 as a + FROM ( VALUES ('bar'), ('baz') ) + ) TO 'test_files/scratch/optimize_sort_preserving_merge/parquet_table/partition=2/2_2.parquet' +STORED AS PARQUET; + +statement ok +CREATE EXTERNAL TABLE t(a varchar NOT NULL, b int, c float, partition int) STORED AS PARQUET +PARTITIONED BY (partition) +WITH ORDER (a) +LOCATION 'test_files/scratch/optimize_sort_preserving_merge/parquet_table/'; + +query TT +EXPLAIN +select a from t WHERE partition = 1 +UNION all +select a from t WHERE partition = 2 +ORDER BY a; +---- +logical_plan +01)Sort: t.a ASC NULLS LAST +02)--Union +03)----TableScan: t projection=[a], full_filters=[t.partition = Int32(1)] +04)----TableScan: t projection=[a], full_filters=[t.partition = Int32(2)] +physical_plan +01)SortPreservingMergeExec: [a@0 ASC NULLS LAST], partition_groups=[2,0,1] +02)--UnionExec +03)----ParquetExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/optimize_sort_preserving_merge/parquet_table/partition=1/1_1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/optimize_sort_preserving_merge/parquet_table/partition=1/1_2.parquet]]}, projection=[a], output_ordering=[a@0 ASC NULLS LAST] +04)----ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/optimize_sort_preserving_merge/parquet_table/partition=2/2_2.parquet]]}, projection=[a], output_ordering=[a@0 ASC NULLS LAST] + + +query T +select a from t WHERE partition = 1 +UNION all +select a from t WHERE partition = 2 +ORDER BY a; +---- +bar +baz +foo +foobar +quux +qux From 2678db37a71d11696c64f59f6e70e4ed0725291b Mon Sep 17 00:00:00 2001 From: suremarc <8771538+suremarc@users.noreply.github.com> Date: Mon, 11 Nov 2024 21:59:07 +0000 Subject: [PATCH 5/8] change language --- datafusion/common/src/stats.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/common/src/stats.rs b/datafusion/common/src/stats.rs index aebad2688ba4..85e6a234074d 100644 --- a/datafusion/common/src/stats.rs +++ b/datafusion/common/src/stats.rs @@ -365,7 +365,7 @@ impl Statistics { } /// Attempt to merge these statistics. - /// Merging provides conservative estimates for null & distinct counts as well as num_rows, + /// Merging provides conservatively large estimates for null & distinct counts as well as num_rows, /// so those will be converted to inexact values. pub fn merge(&self, other: &Self) -> Self { Self { @@ -493,7 +493,7 @@ impl ColumnStatistics { } /// Attempt to merge these statistics. - /// Merging provides conservative estimates for null & distinct counts, + /// Merging provides conservatively large estimates for null & distinct counts, /// so those will be converted to inexact values. pub fn merge(&self, other: &Self) -> Self { Self { From 6102047e4fd3e06891d79e31067da0bcd15fbb3b Mon Sep 17 00:00:00 2001 From: suremarc <8771538+suremarc@users.noreply.github.com> Date: Mon, 11 Nov 2024 22:00:44 +0000 Subject: [PATCH 6/8] todo comment --- datafusion/core/src/datasource/physical_plan/file_scan_config.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs index 3da27d413786..bfa2acf1be7e 100644 --- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs +++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs @@ -275,6 +275,7 @@ impl FileScanConfig { .unwrap_or(Statistics::new_unknown(&self.file_schema)); for _ in 0..self.table_partition_cols.len() { + // TODO provide accurate stat for partition column (#1186) stats .column_statistics .push(ColumnStatistics::new_unknown()) From 0662effa601e27777e6af87190d7fdf328cb960c Mon Sep 17 00:00:00 2001 From: suremarc <8771538+suremarc@users.noreply.github.com> Date: Mon, 11 Nov 2024 22:12:14 +0000 Subject: [PATCH 7/8] fix display impl --- .../physical-plan/src/sorts/sort_preserving_merge.rs | 8 ++++---- .../test_files/optimize_sort_preserving_merge.slt | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index 37c263a133aa..b9bf67e46a6a 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -191,10 +191,10 @@ impl DisplayAs for SortPreservingMergeExec { ", partition_groups=[{}]", partition_groups .iter() - .map(|group| group - .iter() - .map(|index| index.to_string()) - .join(",")) + .map(|group| format!( + "[{}]", + group.iter().map(|index| index.to_string()).join(",") + )) .join(",") )?; } diff --git a/datafusion/sqllogictest/test_files/optimize_sort_preserving_merge.slt b/datafusion/sqllogictest/test_files/optimize_sort_preserving_merge.slt index bb2458356021..19f6efe1458a 100644 --- a/datafusion/sqllogictest/test_files/optimize_sort_preserving_merge.slt +++ b/datafusion/sqllogictest/test_files/optimize_sort_preserving_merge.slt @@ -69,7 +69,7 @@ logical_plan 03)----TableScan: t projection=[a], full_filters=[t.partition = Int32(1)] 04)----TableScan: t projection=[a], full_filters=[t.partition = Int32(2)] physical_plan -01)SortPreservingMergeExec: [a@0 ASC NULLS LAST], partition_groups=[2,0,1] +01)SortPreservingMergeExec: [a@0 ASC NULLS LAST], partition_groups=[[2,0],[1]] 02)--UnionExec 03)----ParquetExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/optimize_sort_preserving_merge/parquet_table/partition=1/1_1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/optimize_sort_preserving_merge/parquet_table/partition=1/1_2.parquet]]}, projection=[a], output_ordering=[a@0 ASC NULLS LAST] 04)----ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/optimize_sort_preserving_merge/parquet_table/partition=2/2_2.parquet]]}, projection=[a], output_ordering=[a@0 ASC NULLS LAST] From 1c93a0de182d5cb8ac8dbc6567975793cc818ac3 Mon Sep 17 00:00:00 2001 From: suremarc <8771538+suremarc@users.noreply.github.com> Date: Mon, 11 Nov 2024 22:27:20 +0000 Subject: [PATCH 8/8] rename output_partitions to streams_to_merge because they're not output partitions --- .../physical-plan/src/sorts/sort_preserving_merge.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index b9bf67e46a6a..ebaaf2f6c0ca 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -298,7 +298,7 @@ impl ExecutionPlan for SortPreservingMergeExec { .unwrap_or((0..input_partitions).map(|i| vec![i]).collect()); // Concatenate each chain into a single stream. - let mut output_partitions = partition_groups + let mut streams_to_merge = partition_groups .into_iter() .map(|chain| { let mut streams = chain @@ -319,7 +319,7 @@ impl ExecutionPlan for SortPreservingMergeExec { }) .collect::>>()?; - match output_partitions.len() { + match streams_to_merge.len() { 0 => internal_err!( "SortPreservingMergeExec requires at least one input partition" ), @@ -327,7 +327,7 @@ impl ExecutionPlan for SortPreservingMergeExec { Some(fetch) => { debug!("Done getting stream for SortPreservingMergeExec::execute with 1 input with {fetch}"); Ok(Box::pin(LimitStream::new( - output_partitions.remove(0), + streams_to_merge.remove(0), 0, Some(fetch), BaselineMetrics::new(&self.metrics, partition), @@ -335,11 +335,11 @@ impl ExecutionPlan for SortPreservingMergeExec { } None => { debug!("Done getting stream for SortPreservingMergeExec::execute with 1 input without fetch"); - Ok(output_partitions.remove(0)) + Ok(streams_to_merge.remove(0)) } }, _ => { - let receivers = output_partitions + let receivers = streams_to_merge .into_iter() .map(|stream| Ok(spawn_buffered(stream, 1))) .collect::>()?;