From cc01c2330bab1a8abb92a5a54a7d204a6cac5f17 Mon Sep 17 00:00:00 2001 From: Mustafa Akur <106137913+mustafasrepo@users.noreply.github.com> Date: Wed, 3 Jul 2024 13:06:14 +0300 Subject: [PATCH] Add Optimizer Sanity Checker, improve sortedness equivalence properties (#11196) * Initial optimizer sanity checker. Only includes sort reqs, docs will be added. * Add distro and pipeline friendly checks * Also check the plans we create are correct. * Add distribution test cases using global limit exec. * Add test for multiple children using SortMergeJoinExec. * Move PipelineChecker to SanityCheckPlan * Fix some tests and add docs * Add some test docs and fix clippy diagnostics. * Fix some failing tests * Replace PipelineChecker with SanityChecker in .slt files. * Initial commit * Slt tests pass * Resolve linter errors * Minor changes * Minor changes * Minor changes * Minor changes * Sort PreservingMerge clear per partition * Minor changes * Update output_requirements.rs * Address reviews * Update datafusion/core/src/physical_optimizer/optimizer.rs Co-authored-by: Mehmet Ozan Kabak * Update datafusion/core/src/physical_optimizer/sanity_checker.rs Co-authored-by: Mehmet Ozan Kabak * Address reviews * Minor changes * Apply suggestions from code review Co-authored-by: Andrew Lamb * Update comment * Add map implementation --------- Co-authored-by: Erman Yafay Co-authored-by: berkaysynnada Co-authored-by: Mehmet Ozan Kabak Co-authored-by: Andrew Lamb --- datafusion/core/src/physical_optimizer/mod.rs | 2 +- .../core/src/physical_optimizer/optimizer.rs | 16 +- .../physical_optimizer/output_requirements.rs | 4 +- .../physical_optimizer/pipeline_checker.rs | 334 --------- .../src/physical_optimizer/sanity_checker.rs | 666 ++++++++++++++++++ .../src/physical_optimizer/sort_pushdown.rs | 31 +- .../sort_preserving_repartition_fuzz.rs | 6 +- datafusion/core/tests/sql/joins.rs | 2 +- .../physical-expr/src/equivalence/class.rs | 67 ++ .../physical-expr/src/equivalence/mod.rs | 6 +- .../physical-expr/src/equivalence/ordering.rs | 6 +- .../src/equivalence/properties.rs | 115 ++- datafusion/physical-expr/src/lib.rs | 2 +- .../physical-plan/src/coalesce_partitions.rs | 2 +- datafusion/physical-plan/src/filter.rs | 20 +- .../physical-plan/src/repartition/mod.rs | 5 + .../src/sorts/sort_preserving_merge.rs | 16 +- datafusion/physical-plan/src/union.rs | 96 ++- datafusion/physical-plan/src/windows/mod.rs | 8 +- .../sqllogictest/test_files/aggregate.slt | 2 +- .../sqllogictest/test_files/explain.slt | 6 +- datafusion/sqllogictest/test_files/joins.slt | 50 +- datafusion/sqllogictest/test_files/window.slt | 6 +- 23 files changed, 1009 insertions(+), 459 deletions(-) delete mode 100644 datafusion/core/src/physical_optimizer/pipeline_checker.rs create mode 100644 datafusion/core/src/physical_optimizer/sanity_checker.rs diff --git a/datafusion/core/src/physical_optimizer/mod.rs b/datafusion/core/src/physical_optimizer/mod.rs index e1bde36bd6fed..9ad05bf496e59 100644 --- a/datafusion/core/src/physical_optimizer/mod.rs +++ b/datafusion/core/src/physical_optimizer/mod.rs @@ -30,10 +30,10 @@ pub mod join_selection; pub mod limited_distinct_aggregation; pub mod optimizer; pub mod output_requirements; -pub mod pipeline_checker; pub mod projection_pushdown; pub mod pruning; pub mod replace_with_order_preserving_variants; +pub mod sanity_checker; mod sort_pushdown; pub mod topk_aggregation; pub mod update_aggr_exprs; diff --git a/datafusion/core/src/physical_optimizer/optimizer.rs b/datafusion/core/src/physical_optimizer/optimizer.rs 
index 6880a5433943e..2d9744ad23dd3 100644 --- a/datafusion/core/src/physical_optimizer/optimizer.rs +++ b/datafusion/core/src/physical_optimizer/optimizer.rs @@ -30,7 +30,7 @@ use crate::physical_optimizer::enforce_sorting::EnforceSorting; use crate::physical_optimizer::join_selection::JoinSelection; use crate::physical_optimizer::limited_distinct_aggregation::LimitedDistinctAggregation; use crate::physical_optimizer::output_requirements::OutputRequirements; -use crate::physical_optimizer::pipeline_checker::PipelineChecker; +use crate::physical_optimizer::sanity_checker::SanityCheckPlan; use crate::physical_optimizer::topk_aggregation::TopKAggregation; use crate::{error::Result, physical_plan::ExecutionPlan}; @@ -124,11 +124,15 @@ impl PhysicalOptimizer { // are not present, the load of executors such as join or union will be // reduced by narrowing their input tables. Arc::new(ProjectionPushdown::new()), - // The PipelineChecker rule will reject non-runnable query plans that use - // pipeline-breaking operators on infinite input(s). The rule generates a - // diagnostic error message when this happens. It makes no changes to the - // given query plan; i.e. it only acts as a final gatekeeping rule. - Arc::new(PipelineChecker::new()), + // The SanityCheckPlan rule checks whether the order and + // distribution requirements of each node in the plan + // are satisfied. It will also reject non-runnable query + // plans that use pipeline-breaking operators on infinite + // input(s). The rule generates a diagnostic error + // message for invalid plans. It makes no changes to the + // given query plan; i.e. it only acts as a final + // gatekeeping rule. + Arc::new(SanityCheckPlan::new()), ]; Self::with_rules(rules) diff --git a/datafusion/core/src/physical_optimizer/output_requirements.rs b/datafusion/core/src/physical_optimizer/output_requirements.rs index 67b38dba90ca0..671bb437d5fa2 100644 --- a/datafusion/core/src/physical_optimizer/output_requirements.rs +++ b/datafusion/core/src/physical_optimizer/output_requirements.rs @@ -248,7 +248,9 @@ fn require_top_ordering_helper( if children.len() != 1 { Ok((plan, false)) } else if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() { - let req_ordering = sort_exec.properties().output_ordering().unwrap_or(&[]); + // In the case of constant columns, the output ordering of SortExec would be an empty set. + // Therefore, we check the sort expression field of the SortExec to assign the requirements. + let req_ordering = sort_exec.expr(); let req_dist = sort_exec.required_input_distribution()[0].clone(); let reqs = PhysicalSortRequirement::from_sort_exprs(req_ordering); Ok(( diff --git a/datafusion/core/src/physical_optimizer/pipeline_checker.rs b/datafusion/core/src/physical_optimizer/pipeline_checker.rs deleted file mode 100644 index 5c6a0ab8ea7fa..0000000000000 --- a/datafusion/core/src/physical_optimizer/pipeline_checker.rs +++ /dev/null @@ -1,334 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License.
You may obtain a copy of the License at -// -//http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! The [PipelineChecker] rule ensures that a given plan can accommodate its -//! infinite sources, if there are any. It will reject non-runnable query plans -//! that use pipeline-breaking operators on infinite input(s). - -use std::sync::Arc; - -use crate::config::ConfigOptions; -use crate::error::Result; -use crate::physical_optimizer::PhysicalOptimizerRule; -use crate::physical_plan::{ExecutionPlan, ExecutionPlanProperties}; - -use datafusion_common::config::OptimizerOptions; -use datafusion_common::plan_err; -use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; -use datafusion_physical_expr::intervals::utils::{check_support, is_datatype_supported}; -use datafusion_physical_plan::joins::SymmetricHashJoinExec; - -/// The PipelineChecker rule rejects non-runnable query plans that use -/// pipeline-breaking operators on infinite input(s). -#[derive(Default)] -pub struct PipelineChecker {} - -impl PipelineChecker { - #[allow(missing_docs)] - pub fn new() -> Self { - Self {} - } -} - -impl PhysicalOptimizerRule for PipelineChecker { - fn optimize( - &self, - plan: Arc, - config: &ConfigOptions, - ) -> Result> { - plan.transform_up(|p| check_finiteness_requirements(p, &config.optimizer)) - .data() - } - - fn name(&self) -> &str { - "PipelineChecker" - } - - fn schema_check(&self) -> bool { - true - } -} - -/// This function propagates finiteness information and rejects any plan with -/// pipeline-breaking operators acting on infinite inputs. -pub fn check_finiteness_requirements( - input: Arc, - optimizer_options: &OptimizerOptions, -) -> Result>> { - if let Some(exec) = input.as_any().downcast_ref::() { - if !(optimizer_options.allow_symmetric_joins_without_pruning - || (exec.check_if_order_information_available()? && is_prunable(exec))) - { - return plan_err!("Join operation cannot operate on a non-prunable stream without enabling \ - the 'allow_symmetric_joins_without_pruning' configuration flag"); - } - } - if !input.execution_mode().pipeline_friendly() { - plan_err!( - "Cannot execute pipeline breaking queries, operator: {:?}", - input - ) - } else { - Ok(Transformed::no(input)) - } -} - -/// This function returns whether a given symmetric hash join is amenable to -/// data pruning. For this to be possible, it needs to have a filter where -/// all involved [`PhysicalExpr`]s, [`Operator`]s and data types support -/// interval calculations. 
-/// -/// [`PhysicalExpr`]: crate::physical_plan::PhysicalExpr -/// [`Operator`]: datafusion_expr::Operator -fn is_prunable(join: &SymmetricHashJoinExec) -> bool { - join.filter().map_or(false, |filter| { - check_support(filter.expression(), &join.schema()) - && filter - .schema() - .fields() - .iter() - .all(|f| is_datatype_supported(f.data_type())) - }) -} - -#[cfg(test)] -mod sql_tests { - use super::*; - use crate::physical_optimizer::test_utils::{ - BinaryTestCase, QueryCase, SourceType, UnaryTestCase, - }; - - #[tokio::test] - async fn test_hash_left_join_swap() -> Result<()> { - let test1 = BinaryTestCase { - source_types: (SourceType::Unbounded, SourceType::Bounded), - expect_fail: false, - }; - - let test2 = BinaryTestCase { - source_types: (SourceType::Bounded, SourceType::Unbounded), - expect_fail: true, - }; - let test3 = BinaryTestCase { - source_types: (SourceType::Bounded, SourceType::Bounded), - expect_fail: false, - }; - let case = QueryCase { - sql: "SELECT t2.c1 FROM left as t1 LEFT JOIN right as t2 ON t1.c1 = t2.c1" - .to_string(), - cases: vec![Arc::new(test1), Arc::new(test2), Arc::new(test3)], - error_operator: "operator: HashJoinExec".to_string(), - }; - - case.run().await?; - Ok(()) - } - - #[tokio::test] - async fn test_hash_right_join_swap() -> Result<()> { - let test1 = BinaryTestCase { - source_types: (SourceType::Unbounded, SourceType::Bounded), - expect_fail: true, - }; - let test2 = BinaryTestCase { - source_types: (SourceType::Bounded, SourceType::Unbounded), - expect_fail: false, - }; - let test3 = BinaryTestCase { - source_types: (SourceType::Bounded, SourceType::Bounded), - expect_fail: false, - }; - let case = QueryCase { - sql: "SELECT t2.c1 FROM left as t1 RIGHT JOIN right as t2 ON t1.c1 = t2.c1" - .to_string(), - cases: vec![Arc::new(test1), Arc::new(test2), Arc::new(test3)], - error_operator: "operator: HashJoinExec".to_string(), - }; - - case.run().await?; - Ok(()) - } - - #[tokio::test] - async fn test_hash_inner_join_swap() -> Result<()> { - let test1 = BinaryTestCase { - source_types: (SourceType::Unbounded, SourceType::Bounded), - expect_fail: false, - }; - let test2 = BinaryTestCase { - source_types: (SourceType::Bounded, SourceType::Unbounded), - expect_fail: false, - }; - let test3 = BinaryTestCase { - source_types: (SourceType::Bounded, SourceType::Bounded), - expect_fail: false, - }; - let case = QueryCase { - sql: "SELECT t2.c1 FROM left as t1 JOIN right as t2 ON t1.c1 = t2.c1" - .to_string(), - cases: vec![Arc::new(test1), Arc::new(test2), Arc::new(test3)], - error_operator: "Join Error".to_string(), - }; - - case.run().await?; - Ok(()) - } - - #[tokio::test] - async fn test_hash_full_outer_join_swap() -> Result<()> { - let test1 = BinaryTestCase { - source_types: (SourceType::Unbounded, SourceType::Bounded), - expect_fail: true, - }; - let test2 = BinaryTestCase { - source_types: (SourceType::Bounded, SourceType::Unbounded), - expect_fail: true, - }; - let test3 = BinaryTestCase { - source_types: (SourceType::Bounded, SourceType::Bounded), - expect_fail: false, - }; - let case = QueryCase { - sql: "SELECT t2.c1 FROM left as t1 FULL JOIN right as t2 ON t1.c1 = t2.c1" - .to_string(), - cases: vec![Arc::new(test1), Arc::new(test2), Arc::new(test3)], - error_operator: "operator: HashJoinExec".to_string(), - }; - - case.run().await?; - Ok(()) - } - - #[tokio::test] - async fn test_aggregate() -> Result<()> { - let test1 = UnaryTestCase { - source_type: SourceType::Bounded, - expect_fail: false, - }; - let test2 = UnaryTestCase { - 
source_type: SourceType::Unbounded, - expect_fail: true, - }; - let case = QueryCase { - sql: "SELECT c1, MIN(c4) FROM test GROUP BY c1".to_string(), - cases: vec![Arc::new(test1), Arc::new(test2)], - error_operator: "operator: AggregateExec".to_string(), - }; - - case.run().await?; - Ok(()) - } - - #[tokio::test] - async fn test_window_agg_hash_partition() -> Result<()> { - let test1 = UnaryTestCase { - source_type: SourceType::Bounded, - expect_fail: false, - }; - let test2 = UnaryTestCase { - source_type: SourceType::Unbounded, - expect_fail: true, - }; - let case = QueryCase { - sql: "SELECT - c9, - SUM(c9) OVER(PARTITION BY c1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING) as sum1 - FROM test - LIMIT 5".to_string(), - cases: vec![Arc::new(test1), Arc::new(test2)], - error_operator: "operator: SortExec".to_string() - }; - - case.run().await?; - Ok(()) - } - - #[tokio::test] - async fn test_window_agg_single_partition() -> Result<()> { - let test1 = UnaryTestCase { - source_type: SourceType::Bounded, - expect_fail: false, - }; - let test2 = UnaryTestCase { - source_type: SourceType::Unbounded, - expect_fail: true, - }; - let case = QueryCase { - sql: "SELECT - c9, - SUM(c9) OVER(ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING) as sum1 - FROM test".to_string(), - cases: vec![Arc::new(test1), Arc::new(test2)], - error_operator: "operator: SortExec".to_string() - }; - case.run().await?; - Ok(()) - } - - #[tokio::test] - async fn test_hash_cross_join() -> Result<()> { - let test1 = BinaryTestCase { - source_types: (SourceType::Unbounded, SourceType::Bounded), - expect_fail: true, - }; - let test2 = BinaryTestCase { - source_types: (SourceType::Unbounded, SourceType::Unbounded), - expect_fail: true, - }; - let test3 = BinaryTestCase { - source_types: (SourceType::Bounded, SourceType::Unbounded), - expect_fail: true, - }; - let test4 = BinaryTestCase { - source_types: (SourceType::Bounded, SourceType::Bounded), - expect_fail: false, - }; - let case = QueryCase { - sql: "SELECT t2.c1 FROM left as t1 CROSS JOIN right as t2".to_string(), - cases: vec![ - Arc::new(test1), - Arc::new(test2), - Arc::new(test3), - Arc::new(test4), - ], - error_operator: "operator: CrossJoinExec".to_string(), - }; - - case.run().await?; - Ok(()) - } - - #[tokio::test] - async fn test_analyzer() -> Result<()> { - let test1 = UnaryTestCase { - source_type: SourceType::Bounded, - expect_fail: false, - }; - let test2 = UnaryTestCase { - source_type: SourceType::Unbounded, - expect_fail: false, - }; - let case = QueryCase { - sql: "EXPLAIN ANALYZE SELECT * FROM test".to_string(), - cases: vec![Arc::new(test1), Arc::new(test2)], - error_operator: "Analyze Error".to_string(), - }; - - case.run().await?; - Ok(()) - } -} diff --git a/datafusion/core/src/physical_optimizer/sanity_checker.rs b/datafusion/core/src/physical_optimizer/sanity_checker.rs new file mode 100644 index 0000000000000..083b42f7400bc --- /dev/null +++ b/datafusion/core/src/physical_optimizer/sanity_checker.rs @@ -0,0 +1,666 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! The [SanityCheckPlan] rule ensures that a given plan can +//! accommodate its infinite sources, if there are any. It will reject +//! non-runnable query plans that use pipeline-breaking operators on +//! infinite input(s). In addition, it will check if all order and +//! distribution requirements of a plan are satisfied by its children. + +use std::sync::Arc; + +use crate::error::Result; +use crate::physical_optimizer::PhysicalOptimizerRule; +use crate::physical_plan::ExecutionPlan; + +use datafusion_common::config::{ConfigOptions, OptimizerOptions}; +use datafusion_common::plan_err; +use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; +use datafusion_physical_expr::intervals::utils::{check_support, is_datatype_supported}; +use datafusion_physical_plan::joins::SymmetricHashJoinExec; +use datafusion_physical_plan::{get_plan_string, ExecutionPlanProperties}; + +use itertools::izip; + +/// The SanityCheckPlan rule rejects the following query plans: +/// 1. Invalid plans containing nodes whose order and/or distribution requirements +/// are not satisfied by their children. +/// 2. Plans that use pipeline-breaking operators on infinite input(s); +/// it is impossible to execute such queries (they will never generate output nor finish). +#[derive(Default)] +pub struct SanityCheckPlan {} + +impl SanityCheckPlan { + #[allow(missing_docs)] + pub fn new() -> Self { + Self {} + } +} + +impl PhysicalOptimizerRule for SanityCheckPlan { + fn optimize( + &self, + plan: Arc<dyn ExecutionPlan>, + config: &ConfigOptions, + ) -> Result<Arc<dyn ExecutionPlan>> { + plan.transform_up(|p| check_plan_sanity(p, &config.optimizer)) + .data() + } + + fn name(&self) -> &str { + "SanityCheckPlan" + } + + fn schema_check(&self) -> bool { + true + } +} + +/// This function propagates finiteness information and rejects any plan with +/// pipeline-breaking operators acting on infinite inputs. +pub fn check_finiteness_requirements( + input: Arc<dyn ExecutionPlan>, + optimizer_options: &OptimizerOptions, +) -> Result<Transformed<Arc<dyn ExecutionPlan>>> { + if let Some(exec) = input.as_any().downcast_ref::<SymmetricHashJoinExec>() { + if !(optimizer_options.allow_symmetric_joins_without_pruning + || (exec.check_if_order_information_available()? && is_prunable(exec))) + { + return plan_err!("Join operation cannot operate on a non-prunable stream without enabling \ + the 'allow_symmetric_joins_without_pruning' configuration flag"); + } + } + if !input.execution_mode().pipeline_friendly() { + plan_err!( + "Cannot execute pipeline breaking queries, operator: {:?}", + input + ) + } else { + Ok(Transformed::no(input)) + } +} +
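Editor's note, for orientation (a minimal sketch, not part of the patch): the rule is driven like any other `PhysicalOptimizerRule`; here `plan` stands for an arbitrary `Arc<dyn ExecutionPlan>` built elsewhere, inside a function that returns a `Result`.

    use datafusion_common::config::ConfigOptions;

    // `optimize` hands the plan back unchanged when every node's order and
    // distribution requirements hold and no pipeline-breaking operator runs
    // on an infinite input; otherwise it fails with a plan error such as
    // "Cannot execute pipeline breaking queries, operator: ...".
    let checker = SanityCheckPlan::new();
    let checked_plan = checker.optimize(plan, &ConfigOptions::default())?;

+/// This function returns whether a given symmetric hash join is amenable to +/// data pruning. For this to be possible, it needs to have a filter where +/// all involved [`PhysicalExpr`]s, [`Operator`]s and data types support +/// interval calculations.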
+/// +/// [`PhysicalExpr`]: crate::physical_plan::PhysicalExpr +/// [`Operator`]: datafusion_expr::Operator +fn is_prunable(join: &SymmetricHashJoinExec) -> bool { + join.filter().map_or(false, |filter| { + check_support(filter.expression(), &join.schema()) + && filter + .schema() + .fields() + .iter() + .all(|f| is_datatype_supported(f.data_type())) + }) +} + +/// Ensures that the plan is pipeline friendly and the order and +/// distribution requirements from its children are satisfied. +pub fn check_plan_sanity( + plan: Arc, + optimizer_options: &OptimizerOptions, +) -> Result>> { + check_finiteness_requirements(plan.clone(), optimizer_options)?; + + for (child, child_sort_req, child_dist_req) in izip!( + plan.children().iter(), + plan.required_input_ordering().iter(), + plan.required_input_distribution().iter() + ) { + let child_eq_props = child.equivalence_properties(); + if let Some(child_sort_req) = child_sort_req { + if !child_eq_props.ordering_satisfy_requirement(child_sort_req) { + let child_plan_str = get_plan_string(child); + return plan_err!( + "Child: {:?} does not satisfy parent order requirements: {:?}", + child_plan_str, + child_sort_req + ); + } + } + + if !child + .output_partitioning() + .satisfy(child_dist_req, child_eq_props) + { + let child_plan_str = get_plan_string(child); + return plan_err!( + "Child: {:?} does not satisfy parent distribution requirements: {:?}", + child_plan_str, + child_dist_req + ); + } + } + + Ok(Transformed::no(plan)) +} + +#[cfg(test)] +mod tests { + use super::*; + + use crate::physical_optimizer::test_utils::{ + bounded_window_exec, global_limit_exec, local_limit_exec, memory_exec, + repartition_exec, sort_exec, sort_expr_options, sort_merge_join_exec, + BinaryTestCase, QueryCase, SourceType, UnaryTestCase, + }; + + use arrow::compute::SortOptions; + use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; + use datafusion_common::Result; + use datafusion_expr::JoinType; + use datafusion_physical_expr::expressions::col; + use datafusion_physical_expr::Partitioning; + use datafusion_physical_plan::displayable; + use datafusion_physical_plan::repartition::RepartitionExec; + + fn create_test_schema() -> SchemaRef { + Arc::new(Schema::new(vec![Field::new("c9", DataType::Int32, true)])) + } + + fn create_test_schema2() -> SchemaRef { + Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, true), + Field::new("b", DataType::Int32, true), + ])) + } + + /// Check if sanity checker should accept or reject plans. + fn assert_sanity_check(plan: &Arc, is_sane: bool) { + let sanity_checker = SanityCheckPlan::new(); + let opts = ConfigOptions::default(); + assert_eq!( + sanity_checker.optimize(plan.clone(), &opts).is_ok(), + is_sane + ); + } + + /// Check if the plan we created is as expected by comparing the plan + /// formatted as a string. 
+ fn assert_plan(plan: &dyn ExecutionPlan, expected_lines: Vec<&str>) { + let plan_str = displayable(plan).indent(true).to_string(); + let actual_lines: Vec<&str> = plan_str.trim().lines().collect(); + assert_eq!(actual_lines, expected_lines); + } + + #[tokio::test] + async fn test_hash_left_join_swap() -> Result<()> { + let test1 = BinaryTestCase { + source_types: (SourceType::Unbounded, SourceType::Bounded), + expect_fail: false, + }; + + let test2 = BinaryTestCase { + source_types: (SourceType::Bounded, SourceType::Unbounded), + expect_fail: true, + }; + let test3 = BinaryTestCase { + source_types: (SourceType::Bounded, SourceType::Bounded), + expect_fail: false, + }; + let case = QueryCase { + sql: "SELECT t2.c1 FROM left as t1 LEFT JOIN right as t2 ON t1.c1 = t2.c1" + .to_string(), + cases: vec![Arc::new(test1), Arc::new(test2), Arc::new(test3)], + error_operator: "operator: HashJoinExec".to_string(), + }; + + case.run().await?; + Ok(()) + } + + #[tokio::test] + async fn test_hash_right_join_swap() -> Result<()> { + let test1 = BinaryTestCase { + source_types: (SourceType::Unbounded, SourceType::Bounded), + expect_fail: true, + }; + let test2 = BinaryTestCase { + source_types: (SourceType::Bounded, SourceType::Unbounded), + expect_fail: false, + }; + let test3 = BinaryTestCase { + source_types: (SourceType::Bounded, SourceType::Bounded), + expect_fail: false, + }; + let case = QueryCase { + sql: "SELECT t2.c1 FROM left as t1 RIGHT JOIN right as t2 ON t1.c1 = t2.c1" + .to_string(), + cases: vec![Arc::new(test1), Arc::new(test2), Arc::new(test3)], + error_operator: "operator: HashJoinExec".to_string(), + }; + + case.run().await?; + Ok(()) + } + + #[tokio::test] + async fn test_hash_inner_join_swap() -> Result<()> { + let test1 = BinaryTestCase { + source_types: (SourceType::Unbounded, SourceType::Bounded), + expect_fail: false, + }; + let test2 = BinaryTestCase { + source_types: (SourceType::Bounded, SourceType::Unbounded), + expect_fail: false, + }; + let test3 = BinaryTestCase { + source_types: (SourceType::Bounded, SourceType::Bounded), + expect_fail: false, + }; + let case = QueryCase { + sql: "SELECT t2.c1 FROM left as t1 JOIN right as t2 ON t1.c1 = t2.c1" + .to_string(), + cases: vec![Arc::new(test1), Arc::new(test2), Arc::new(test3)], + error_operator: "Join Error".to_string(), + }; + + case.run().await?; + Ok(()) + } + + #[tokio::test] + async fn test_hash_full_outer_join_swap() -> Result<()> { + let test1 = BinaryTestCase { + source_types: (SourceType::Unbounded, SourceType::Bounded), + expect_fail: true, + }; + let test2 = BinaryTestCase { + source_types: (SourceType::Bounded, SourceType::Unbounded), + expect_fail: true, + }; + let test3 = BinaryTestCase { + source_types: (SourceType::Bounded, SourceType::Bounded), + expect_fail: false, + }; + let case = QueryCase { + sql: "SELECT t2.c1 FROM left as t1 FULL JOIN right as t2 ON t1.c1 = t2.c1" + .to_string(), + cases: vec![Arc::new(test1), Arc::new(test2), Arc::new(test3)], + error_operator: "operator: HashJoinExec".to_string(), + }; + + case.run().await?; + Ok(()) + } + + #[tokio::test] + async fn test_aggregate() -> Result<()> { + let test1 = UnaryTestCase { + source_type: SourceType::Bounded, + expect_fail: false, + }; + let test2 = UnaryTestCase { + source_type: SourceType::Unbounded, + expect_fail: true, + }; + let case = QueryCase { + sql: "SELECT c1, MIN(c4) FROM test GROUP BY c1".to_string(), + cases: vec![Arc::new(test1), Arc::new(test2)], + error_operator: "operator: AggregateExec".to_string(), + }; + + 
case.run().await?; + Ok(()) + } + + #[tokio::test] + async fn test_window_agg_hash_partition() -> Result<()> { + let test1 = UnaryTestCase { + source_type: SourceType::Bounded, + expect_fail: false, + }; + let test2 = UnaryTestCase { + source_type: SourceType::Unbounded, + expect_fail: true, + }; + let case = QueryCase { + sql: "SELECT + c9, + SUM(c9) OVER(PARTITION BY c1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING) as sum1 + FROM test + LIMIT 5".to_string(), + cases: vec![Arc::new(test1), Arc::new(test2)], + error_operator: "operator: SortExec".to_string() + }; + + case.run().await?; + Ok(()) + } + + #[tokio::test] + async fn test_window_agg_single_partition() -> Result<()> { + let test1 = UnaryTestCase { + source_type: SourceType::Bounded, + expect_fail: false, + }; + let test2 = UnaryTestCase { + source_type: SourceType::Unbounded, + expect_fail: true, + }; + let case = QueryCase { + sql: "SELECT + c9, + SUM(c9) OVER(ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING) as sum1 + FROM test".to_string(), + cases: vec![Arc::new(test1), Arc::new(test2)], + error_operator: "operator: SortExec".to_string() + }; + case.run().await?; + Ok(()) + } + + #[tokio::test] + async fn test_hash_cross_join() -> Result<()> { + let test1 = BinaryTestCase { + source_types: (SourceType::Unbounded, SourceType::Bounded), + expect_fail: true, + }; + let test2 = BinaryTestCase { + source_types: (SourceType::Unbounded, SourceType::Unbounded), + expect_fail: true, + }; + let test3 = BinaryTestCase { + source_types: (SourceType::Bounded, SourceType::Unbounded), + expect_fail: true, + }; + let test4 = BinaryTestCase { + source_types: (SourceType::Bounded, SourceType::Bounded), + expect_fail: false, + }; + let case = QueryCase { + sql: "SELECT t2.c1 FROM left as t1 CROSS JOIN right as t2".to_string(), + cases: vec![ + Arc::new(test1), + Arc::new(test2), + Arc::new(test3), + Arc::new(test4), + ], + error_operator: "operator: CrossJoinExec".to_string(), + }; + + case.run().await?; + Ok(()) + } + + #[tokio::test] + async fn test_analyzer() -> Result<()> { + let test1 = UnaryTestCase { + source_type: SourceType::Bounded, + expect_fail: false, + }; + let test2 = UnaryTestCase { + source_type: SourceType::Unbounded, + expect_fail: false, + }; + let case = QueryCase { + sql: "EXPLAIN ANALYZE SELECT * FROM test".to_string(), + cases: vec![Arc::new(test1), Arc::new(test2)], + error_operator: "Analyze Error".to_string(), + }; + + case.run().await?; + Ok(()) + } + + #[tokio::test] + /// Tests that plan is valid when the sort requirements are satisfied. 
+ async fn test_bounded_window_agg_sort_requirement() -> Result<()> { + let schema = create_test_schema(); + let source = memory_exec(&schema); + let sort_exprs = vec![sort_expr_options( + "c9", + &source.schema(), + SortOptions { + descending: false, + nulls_first: false, + }, + )]; + let sort = sort_exec(sort_exprs.clone(), source); + let bw = bounded_window_exec("c9", sort_exprs, sort); + assert_plan(bw.as_ref(), vec![ + "BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]", + " SortExec: expr=[c9@0 ASC NULLS LAST], preserve_partitioning=[false]", + " MemoryExec: partitions=1, partition_sizes=[0]" + ]); + assert_sanity_check(&bw, true); + Ok(()) + } + + #[tokio::test] + /// Tests that plan is invalid when the sort requirements are not satisfied. + async fn test_bounded_window_agg_no_sort_requirement() -> Result<()> { + let schema = create_test_schema(); + let source = memory_exec(&schema); + let sort_exprs = vec![sort_expr_options( + "c9", + &source.schema(), + SortOptions { + descending: false, + nulls_first: false, + }, + )]; + let bw = bounded_window_exec("c9", sort_exprs, source); + assert_plan(bw.as_ref(), vec![ + "BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]", + " MemoryExec: partitions=1, partition_sizes=[0]" + ]); + // Order requirement of the `BoundedWindowAggExec` is not satisfied. We expect to receive an error during the sanity check. + assert_sanity_check(&bw, false); + Ok(()) + } + + #[tokio::test] + /// A valid plan when a single partition requirement + /// is satisfied. + async fn test_global_limit_single_partition() -> Result<()> { + let schema = create_test_schema(); + let source = memory_exec(&schema); + let limit = global_limit_exec(source); + + assert_plan( + limit.as_ref(), + vec![ + "GlobalLimitExec: skip=0, fetch=100", + " MemoryExec: partitions=1, partition_sizes=[0]", + ], + ); + assert_sanity_check(&limit, true); + Ok(()) + } + + #[tokio::test] + /// An invalid plan when a single partition requirement + /// is not satisfied. + async fn test_global_limit_multi_partition() -> Result<()> { + let schema = create_test_schema(); + let source = memory_exec(&schema); + let limit = global_limit_exec(repartition_exec(source)); + + assert_plan( + limit.as_ref(), + vec![ + "GlobalLimitExec: skip=0, fetch=100", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " MemoryExec: partitions=1, partition_sizes=[0]", + ], + ); + // Distribution requirement of the `GlobalLimitExec` is not satisfied. We expect to receive an error during the sanity check. + assert_sanity_check(&limit, false); + Ok(()) + } + + #[tokio::test] + /// A plan with no requirements should pass the sanity check. + async fn test_local_limit() -> Result<()> { + let schema = create_test_schema(); + let source = memory_exec(&schema); + let limit = local_limit_exec(source); + + assert_plan( + limit.as_ref(), + vec![ + "LocalLimitExec: fetch=100", + " MemoryExec: partitions=1, partition_sizes=[0]", + ], + ); + assert_sanity_check(&limit, true); + Ok(()) + }
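Editor's note: the invalid multi-partition plan above would typically be made valid by collapsing partitions ahead of the limit. A hedged sketch reusing the test helpers from this file (`CoalescePartitionsExec` is the stock DataFusion operator for merging partitions; its use here is illustrative, not part of the patch):

    use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;

    // Merging all partitions back into one satisfies the single-partition
    // requirement of `GlobalLimitExec`, so the sanity check passes:
    let source = memory_exec(&create_test_schema());
    let coalesced = Arc::new(CoalescePartitionsExec::new(repartition_exec(source)));
    assert_sanity_check(&global_limit_exec(coalesced), true);

+ + #[tokio::test] + /// A valid plan whose multiple children satisfy both order and distribution requirements.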
+ async fn test_sort_merge_join_satisfied() -> Result<()> { + let schema1 = create_test_schema(); + let schema2 = create_test_schema2(); + let source1 = memory_exec(&schema1); + let source2 = memory_exec(&schema2); + let sort_opts = SortOptions::default(); + let sort_exprs1 = vec![sort_expr_options("c9", &source1.schema(), sort_opts)]; + let sort_exprs2 = vec![sort_expr_options("a", &source2.schema(), sort_opts)]; + let left = sort_exec(sort_exprs1, source1); + let right = sort_exec(sort_exprs2, source2); + let left_jcol = col("c9", &left.schema()).unwrap(); + let right_jcol = col("a", &right.schema()).unwrap(); + let left = Arc::new(RepartitionExec::try_new( + left, + Partitioning::Hash(vec![left_jcol.clone()], 10), + )?); + + let right = Arc::new(RepartitionExec::try_new( + right, + Partitioning::Hash(vec![right_jcol.clone()], 10), + )?); + + let join_on = vec![(left_jcol as _, right_jcol as _)]; + let join_ty = JoinType::Inner; + let smj = sort_merge_join_exec(left, right, &join_on, &join_ty); + + assert_plan( + smj.as_ref(), + vec![ + "SortMergeJoin: join_type=Inner, on=[(c9@0, a@0)]", + " RepartitionExec: partitioning=Hash([c9@0], 10), input_partitions=1", + " SortExec: expr=[c9@0 ASC], preserve_partitioning=[false]", + " MemoryExec: partitions=1, partition_sizes=[0]", + " RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1", + " SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", + " MemoryExec: partitions=1, partition_sizes=[0]", + ], + ); + assert_sanity_check(&smj, true); + Ok(()) + } + + #[tokio::test] + /// Invalid case when the order is not satisfied by the 2nd + /// child. + async fn test_sort_merge_join_order_missing() -> Result<()> { + let schema1 = create_test_schema(); + let schema2 = create_test_schema2(); + let source1 = memory_exec(&schema1); + let right = memory_exec(&schema2); + let sort_exprs1 = vec![sort_expr_options( + "c9", + &source1.schema(), + SortOptions::default(), + )]; + let left = sort_exec(sort_exprs1, source1); + // Missing sort of the right child here.. + let left_jcol = col("c9", &left.schema()).unwrap(); + let right_jcol = col("a", &right.schema()).unwrap(); + let left = Arc::new(RepartitionExec::try_new( + left, + Partitioning::Hash(vec![left_jcol.clone()], 10), + )?); + + let right = Arc::new(RepartitionExec::try_new( + right, + Partitioning::Hash(vec![right_jcol.clone()], 10), + )?); + + let join_on = vec![(left_jcol as _, right_jcol as _)]; + let join_ty = JoinType::Inner; + let smj = sort_merge_join_exec(left, right, &join_on, &join_ty); + + assert_plan( + smj.as_ref(), + vec![ + "SortMergeJoin: join_type=Inner, on=[(c9@0, a@0)]", + " RepartitionExec: partitioning=Hash([c9@0], 10), input_partitions=1", + " SortExec: expr=[c9@0 ASC], preserve_partitioning=[false]", + " MemoryExec: partitions=1, partition_sizes=[0]", + " RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1", + " MemoryExec: partitions=1, partition_sizes=[0]", + ], + ); + // Order requirement for the `SortMergeJoin` is not satisfied for right child. We expect to receive error during sanity check. + assert_sanity_check(&smj, false); + Ok(()) + } + + #[tokio::test] + /// Invalid case when the distribution is not satisfied by the 2nd + /// child. 
+ async fn test_sort_merge_join_dist_missing() -> Result<()> { + let schema1 = create_test_schema(); + let schema2 = create_test_schema2(); + let source1 = memory_exec(&schema1); + let source2 = memory_exec(&schema2); + let sort_opts = SortOptions::default(); + let sort_exprs1 = vec![sort_expr_options("c9", &source1.schema(), sort_opts)]; + let sort_exprs2 = vec![sort_expr_options("a", &source2.schema(), sort_opts)]; + let left = sort_exec(sort_exprs1, source1); + let right = sort_exec(sort_exprs2, source2); + let right = Arc::new(RepartitionExec::try_new( + right, + Partitioning::RoundRobinBatch(10), + )?); + let left_jcol = col("c9", &left.schema()).unwrap(); + let right_jcol = col("a", &right.schema()).unwrap(); + let left = Arc::new(RepartitionExec::try_new( + left, + Partitioning::Hash(vec![left_jcol.clone()], 10), + )?); + + // Missing hash partitioning on right child. + + let join_on = vec![(left_jcol as _, right_jcol as _)]; + let join_ty = JoinType::Inner; + let smj = sort_merge_join_exec(left, right, &join_on, &join_ty); + + assert_plan( + smj.as_ref(), + vec![ + "SortMergeJoin: join_type=Inner, on=[(c9@0, a@0)]", + " RepartitionExec: partitioning=Hash([c9@0], 10), input_partitions=1", + " SortExec: expr=[c9@0 ASC], preserve_partitioning=[false]", + " MemoryExec: partitions=1, partition_sizes=[0]", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", + " MemoryExec: partitions=1, partition_sizes=[0]", + ], + ); + // Distribution requirement for the `SortMergeJoin` is not satisfied for right child (has round-robin partitioning). We expect to receive error during sanity check. + assert_sanity_check(&smj, false); + Ok(()) + } +} diff --git a/datafusion/core/src/physical_optimizer/sort_pushdown.rs b/datafusion/core/src/physical_optimizer/sort_pushdown.rs index 83531da3ca8ff..36ac4b22d5942 100644 --- a/datafusion/core/src/physical_optimizer/sort_pushdown.rs +++ b/datafusion/core/src/physical_optimizer/sort_pushdown.rs @@ -245,11 +245,38 @@ fn try_pushdown_requirements_to_join( sort_expr: Vec, push_side: JoinSide, ) -> Result>>>> { + let left_eq_properties = smj.left().equivalence_properties(); + let right_eq_properties = smj.right().equivalence_properties(); + let mut smj_required_orderings = smj.required_input_ordering(); + let right_requirement = smj_required_orderings.swap_remove(1); + let left_requirement = smj_required_orderings.swap_remove(0); let left_ordering = smj.left().output_ordering().unwrap_or(&[]); let right_ordering = smj.right().output_ordering().unwrap_or(&[]); let (new_left_ordering, new_right_ordering) = match push_side { - JoinSide::Left => (sort_expr.as_slice(), right_ordering), - JoinSide::Right => (left_ordering, sort_expr.as_slice()), + JoinSide::Left => { + let left_eq_properties = + left_eq_properties.clone().with_reorder(sort_expr.clone()); + if left_eq_properties + .ordering_satisfy_requirement(&left_requirement.unwrap_or_default()) + { + // After re-ordering requirement is still satisfied + (sort_expr.as_slice(), right_ordering) + } else { + return Ok(None); + } + } + JoinSide::Right => { + let right_eq_properties = + right_eq_properties.clone().with_reorder(sort_expr.clone()); + if right_eq_properties + .ordering_satisfy_requirement(&right_requirement.unwrap_or_default()) + { + // After re-ordering requirement is still satisfied + (left_ordering, sort_expr.as_slice()) + } else { + return Ok(None); + } + } }; let join_type = smj.join_type(); let probe_side = 
SortMergeJoinExec::probe_side(&join_type); diff --git a/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs b/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs index 21ef8a7c2110f..f00d17a06ffc9 100644 --- a/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs @@ -41,7 +41,7 @@ mod sp_repartition_fuzz_tests { use datafusion_physical_expr::{ equivalence::{EquivalenceClass, EquivalenceProperties}, expressions::{col, Column}, - PhysicalExpr, PhysicalSortExpr, + ConstExpr, PhysicalExpr, PhysicalSortExpr, }; use test_utils::add_empty_batches; @@ -80,7 +80,7 @@ mod sp_repartition_fuzz_tests { // Define a and f are aliases eq_properties.add_equal_conditions(col_a, col_f)?; // Column e has constant value. - eq_properties = eq_properties.add_constants([col_e.clone()]); + eq_properties = eq_properties.add_constants([ConstExpr::new(col_e.clone())]); // Randomly order columns for sorting let mut rng = StdRng::seed_from_u64(seed); @@ -149,7 +149,7 @@ mod sp_repartition_fuzz_tests { // Fill constant columns for constant in eq_properties.constants() { - let col = constant.as_any().downcast_ref::<Column>().unwrap(); + let col = constant.expr().as_any().downcast_ref::<Column>().unwrap(); + let (idx, _field) = schema.column_with_name(col.name()).unwrap(); let arr = Arc::new(UInt64Array::from_iter_values(vec![0; n_elem])) as ArrayRef; diff --git a/datafusion/core/tests/sql/joins.rs b/datafusion/core/tests/sql/joins.rs index fad9b94b01120..addabc8a36127 100644 --- a/datafusion/core/tests/sql/joins.rs +++ b/datafusion/core/tests/sql/joins.rs @@ -230,7 +230,7 @@ async fn join_change_in_planner_without_sort_not_allowed() -> Result<()> { match df.create_physical_plan().await { Ok(_) => panic!("Expecting error."), Err(e) => { - assert_eq!(e.strip_backtrace(), "PipelineChecker\ncaused by\nError during planning: Join operation cannot operate on a non-prunable stream without enabling the 'allow_symmetric_joins_without_pruning' configuration flag") + assert_eq!(e.strip_backtrace(), "SanityCheckPlan\ncaused by\nError during planning: Join operation cannot operate on a non-prunable stream without enabling the 'allow_symmetric_joins_without_pruning' configuration flag") } } Ok(()) diff --git a/datafusion/physical-expr/src/equivalence/class.rs b/datafusion/physical-expr/src/equivalence/class.rs index b4d12e963611e..6c12acb934bee 100644 --- a/datafusion/physical-expr/src/equivalence/class.rs +++ b/datafusion/physical-expr/src/equivalence/class.rs @@ -28,6 +28,73 @@ use crate::{ use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::JoinType;
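Editor's note: before the definition, a brief usage sketch of the `ConstExpr` API added just below (`col_e` stands in for any column expression; the sketch is illustrative, not part of the patch):

    // Default: constant within each partition, but possibly with a
    // different value from one partition to the next:
    let per_partition = ConstExpr::new(col_e.clone());
    assert!(!per_partition.across_partitions());

    // Opt in when the value is known to be identical in every partition,
    // e.g. a constant implied by a global `e = 5` filter:
    let global = ConstExpr::new(col_e).with_across_partitions(true);
    assert!(global.across_partitions());

+#[derive(Debug, Clone)] +/// A structure representing an expression known to be constant in a physical execution plan. +/// +/// The `ConstExpr` struct encapsulates an expression that is constant during the execution +/// of a query. For example, if a predicate like `A = 5` is applied earlier in the plan, `A` would +/// be known to be constant. +/// +/// # Fields +/// +/// - `expr`: Constant expression for a node in the physical plan. +/// +/// - `across_partitions`: A boolean flag indicating whether the constant expression is +/// valid across partitions. If set to `true`, the constant expression has the same value for all partitions. +/// If set to `false`, the constant expression may have different values for different partitions.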
+pub struct ConstExpr { + expr: Arc<dyn PhysicalExpr>, + across_partitions: bool, +} + +impl ConstExpr { + pub fn new(expr: Arc<dyn PhysicalExpr>) -> Self { + Self { + expr, + // By default, assume constant expressions are not the same across partitions. + across_partitions: false, + } + } + + pub fn with_across_partitions(mut self, across_partitions: bool) -> Self { + self.across_partitions = across_partitions; + self + } + + pub fn across_partitions(&self) -> bool { + self.across_partitions + } + + pub fn expr(&self) -> &Arc<dyn PhysicalExpr> { + &self.expr + } + + pub fn owned_expr(self) -> Arc<dyn PhysicalExpr> { + self.expr + } + + pub fn map<F>(&self, f: F) -> Option<Self> + where + F: Fn(&Arc<dyn PhysicalExpr>) -> Option<Arc<dyn PhysicalExpr>>, + { + let maybe_expr = f(&self.expr); + maybe_expr.map(|expr| Self { + expr, + across_partitions: self.across_partitions, + }) + } +} + +/// Checks whether `expr` is among the `const_exprs`. +pub fn const_exprs_contains( + const_exprs: &[ConstExpr], + expr: &Arc<dyn PhysicalExpr>, +) -> bool { + const_exprs + .iter() + .any(|const_expr| const_expr.expr.eq(expr)) +} + /// An `EquivalenceClass` is a set of [`Arc<dyn PhysicalExpr>`]s that are known /// to have the same value for all tuples in a relation. These are generated by /// equality predicates (e.g. `a = b`), typically equi-join conditions and diff --git a/datafusion/physical-expr/src/equivalence/mod.rs b/datafusion/physical-expr/src/equivalence/mod.rs index 7faf2caae01c9..5eb8a19e3d672 100644 --- a/datafusion/physical-expr/src/equivalence/mod.rs +++ b/datafusion/physical-expr/src/equivalence/mod.rs @@ -27,7 +27,7 @@ mod ordering; mod projection; mod properties; -pub use class::{EquivalenceClass, EquivalenceGroup}; +pub use class::{ConstExpr, EquivalenceClass, EquivalenceGroup}; pub use ordering::OrderingEquivalenceClass; pub use projection::ProjectionMapping; pub use properties::{join_equivalence_properties, EquivalenceProperties}; @@ -205,7 +205,7 @@ mod tests { // Define a and f are aliases eq_properties.add_equal_conditions(col_a, col_f)?; // Column e has constant value.
- eq_properties = eq_properties.add_constants([col_e.clone()]); + eq_properties = eq_properties.add_constants([ConstExpr::new(col_e.clone())]); // Randomly order columns for sorting let mut rng = StdRng::seed_from_u64(seed); @@ -482,7 +482,7 @@ mod tests { // Fill constant columns for constant in &eq_properties.constants { - let col = constant.as_any().downcast_ref::().unwrap(); + let col = constant.expr().as_any().downcast_ref::().unwrap(); let (idx, _field) = schema.column_with_name(col.name()).unwrap(); let arr = Arc::new(Float64Array::from_iter_values(vec![0 as f64; n_elem])) as ArrayRef; diff --git a/datafusion/physical-expr/src/equivalence/ordering.rs b/datafusion/physical-expr/src/equivalence/ordering.rs index 7857d9df726e9..ac9d64e486ac6 100644 --- a/datafusion/physical-expr/src/equivalence/ordering.rs +++ b/datafusion/physical-expr/src/equivalence/ordering.rs @@ -234,7 +234,7 @@ mod tests { }; use crate::expressions::{col, BinaryExpr, Column}; use crate::utils::tests::TestScalarUDF; - use crate::{PhysicalExpr, PhysicalSortExpr}; + use crate::{ConstExpr, PhysicalExpr, PhysicalSortExpr}; use arrow::datatypes::{DataType, Field, Schema}; use arrow_schema::SortOptions; @@ -554,7 +554,9 @@ mod tests { let eq_group = EquivalenceGroup::new(eq_group); eq_properties.add_equivalence_group(eq_group); - let constants = constants.into_iter().cloned(); + let constants = constants + .into_iter() + .map(|expr| ConstExpr::new(expr.clone()).with_across_partitions(true)); eq_properties = eq_properties.add_constants(constants); let reqs = convert_to_sort_exprs(&reqs); diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs index 7bf389ecfdf32..e3a2d1c753ca4 100644 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -19,12 +19,13 @@ use std::hash::{Hash, Hasher}; use std::sync::Arc; use super::ordering::collapse_lex_ordering; +use crate::equivalence::class::const_exprs_contains; use crate::equivalence::{ collapse_lex_req, EquivalenceGroup, OrderingEquivalenceClass, ProjectionMapping, }; use crate::expressions::Literal; use crate::{ - physical_exprs_contains, LexOrdering, LexOrderingRef, LexRequirement, + physical_exprs_contains, ConstExpr, LexOrdering, LexOrderingRef, LexRequirement, LexRequirementRef, PhysicalExpr, PhysicalExprRef, PhysicalSortExpr, PhysicalSortRequirement, }; @@ -92,7 +93,7 @@ pub struct EquivalenceProperties { /// Expressions whose values are constant throughout the table. /// TODO: We do not need to track constants separately, they can be tracked /// inside `eq_groups` as `Literal` expressions. - pub constants: Vec>, + pub constants: Vec, /// Schema associated with this object. schema: SchemaRef, } @@ -134,7 +135,7 @@ impl EquivalenceProperties { } /// Returns a reference to the constant expressions - pub fn constants(&self) -> &[Arc] { + pub fn constants(&self) -> &[ConstExpr] { &self.constants } @@ -144,7 +145,7 @@ impl EquivalenceProperties { let mut output_ordering = self.oeq_class().output_ordering().unwrap_or_default(); // Prune out constant expressions output_ordering - .retain(|sort_expr| !physical_exprs_contains(constants, &sort_expr.expr)); + .retain(|sort_expr| !const_exprs_contains(constants, &sort_expr.expr)); (!output_ordering.is_empty()).then_some(output_ordering) } @@ -173,6 +174,12 @@ impl EquivalenceProperties { self.oeq_class.clear(); } + /// Removes constant expressions that may change across partitions. 
+ /// This method should be used when data from different partitions are merged. + pub fn clear_per_partition_constants(&mut self) { + self.constants.retain(|item| item.across_partitions()); + } + /// Extends this `EquivalenceProperties` by adding the orderings inside the /// ordering equivalence class `other`. pub fn add_ordering_equivalence_class(&mut self, other: OrderingEquivalenceClass) { @@ -204,13 +211,15 @@ impl EquivalenceProperties { // Discover new constants in light of new the equality: if self.is_expr_constant(left) { // Left expression is constant, add right as constant - if !physical_exprs_contains(&self.constants, right) { - self.constants.push(right.clone()); + if !const_exprs_contains(&self.constants, right) { + self.constants + .push(ConstExpr::new(right.clone()).with_across_partitions(true)); } } else if self.is_expr_constant(right) { // Right expression is constant, add left as constant - if !physical_exprs_contains(&self.constants, left) { - self.constants.push(left.clone()); + if !const_exprs_contains(&self.constants, left) { + self.constants + .push(ConstExpr::new(left.clone()).with_across_partitions(true)); } } @@ -270,11 +279,29 @@ impl EquivalenceProperties { /// Track/register physical expressions with constant values. pub fn add_constants( mut self, - constants: impl IntoIterator>, + constants: impl IntoIterator, ) -> Self { - for expr in self.eq_group.normalize_exprs(constants) { - if !physical_exprs_contains(&self.constants, &expr) { - self.constants.push(expr); + let (const_exprs, across_partition_flags): ( + Vec>, + Vec, + ) = constants + .into_iter() + .map(|const_expr| { + let across_partitions = const_expr.across_partitions(); + let expr = const_expr.owned_expr(); + (expr, across_partitions) + }) + .unzip(); + for (expr, across_partitions) in self + .eq_group + .normalize_exprs(const_exprs) + .into_iter() + .zip(across_partition_flags) + { + if !const_exprs_contains(&self.constants, &expr) { + let const_expr = + ConstExpr::new(expr).with_across_partitions(across_partitions); + self.constants.push(const_expr); } } self @@ -326,7 +353,13 @@ impl EquivalenceProperties { sort_reqs: LexRequirementRef, ) -> LexRequirement { let normalized_sort_reqs = self.eq_group.normalize_sort_requirements(sort_reqs); - let constants_normalized = self.eq_group.normalize_exprs(self.constants.clone()); + let mut constant_exprs = vec![]; + constant_exprs.extend( + self.constants + .iter() + .map(|const_expr| const_expr.expr().clone()), + ); + let constants_normalized = self.eq_group.normalize_exprs(constant_exprs); // Prune redundant sections in the requirement: collapse_lex_req( normalized_sort_reqs @@ -370,8 +403,8 @@ impl EquivalenceProperties { // From the analysis above, we know that `[a ASC]` is satisfied. Then, // we add column `a` as constant to the algorithm state. This enables us // to deduce that `(b + c) ASC` is satisfied, given `a` is constant. - eq_properties = - eq_properties.add_constants(std::iter::once(normalized_req.expr)); + eq_properties = eq_properties + .add_constants(std::iter::once(ConstExpr::new(normalized_req.expr))); } true } @@ -781,24 +814,25 @@ impl EquivalenceProperties { /// # Returns /// /// Returns a `Vec>` containing the projected constants. - fn projected_constants( - &self, - mapping: &ProjectionMapping, - ) -> Vec> { + fn projected_constants(&self, mapping: &ProjectionMapping) -> Vec { // First, project existing constants. For example, assume that `a + b` // is known to be constant. 
If the projection were `a as a_new`, `b as b_new`, // then we would project constant `a + b` as `a_new + b_new`. let mut projected_constants = self .constants .iter() - .flat_map(|expr| self.eq_group.project_expr(mapping, expr)) + .flat_map(|const_expr| { + const_expr.map(|expr| self.eq_group.project_expr(mapping, expr)) + }) .collect::>(); // Add projection expressions that are known to be constant: for (source, target) in mapping.iter() { if self.is_expr_constant(source) - && !physical_exprs_contains(&projected_constants, target) + && !const_exprs_contains(&projected_constants, target) { - projected_constants.push(target.clone()); + // Expression evaluates to single value + projected_constants + .push(ConstExpr::new(target.clone()).with_across_partitions(true)); } } projected_constants @@ -891,8 +925,8 @@ impl EquivalenceProperties { // Note that these expressions are not properly "constants". This is just // an implementation strategy confined to this function. for (PhysicalSortExpr { expr, .. }, idx) in &ordered_exprs { - eq_properties = - eq_properties.add_constants(std::iter::once(expr.clone())); + eq_properties = eq_properties + .add_constants(std::iter::once(ConstExpr::new(expr.clone()))); search_indices.shift_remove(idx); } // Add new ordered section to the state. @@ -917,7 +951,11 @@ impl EquivalenceProperties { // As an example, assume that we know columns `a` and `b` are constant. // Then, `a`, `b` and `a + b` will all return `true` whereas `c` will // return `false`. - let normalized_constants = self.eq_group.normalize_exprs(self.constants.to_vec()); + let const_exprs = self + .constants + .iter() + .map(|const_expr| const_expr.expr().clone()); + let normalized_constants = self.eq_group.normalize_exprs(const_exprs); let normalized_expr = self.eq_group.normalize_expr(expr.clone()); is_constant_recurse(&normalized_constants, &normalized_expr) } @@ -1307,8 +1345,16 @@ pub fn join_equivalence_properties( on, )); - let left_oeq_class = left.oeq_class; - let mut right_oeq_class = right.oeq_class; + let EquivalenceProperties { + constants: left_constants, + oeq_class: left_oeq_class, + .. + } = left; + let EquivalenceProperties { + constants: right_constants, + oeq_class: mut right_oeq_class, + .. + } = right; match maintains_input_order { [true, false] => { // In this special case, right side ordering can be prefixed with @@ -1361,6 +1407,15 @@ pub fn join_equivalence_properties( [true, true] => unreachable!("Cannot maintain ordering of both sides"), _ => unreachable!("Join operators can not have more than two children"), } + match join_type { + JoinType::LeftAnti | JoinType::LeftSemi => { + result = result.add_constants(left_constants); + } + JoinType::RightAnti | JoinType::RightSemi => { + result = result.add_constants(right_constants); + } + _ => {} + } result } @@ -2088,7 +2143,7 @@ mod tests { let col_h = &col("h", &test_schema)?; // Add column h as constant - eq_properties = eq_properties.add_constants(vec![col_h.clone()]); + eq_properties = eq_properties.add_constants(vec![ConstExpr::new(col_h.clone())]); let test_cases = vec![ // TEST CASE 1 @@ -2386,7 +2441,9 @@ mod tests { ]; for case in cases { - let mut properties = base_properties.clone().add_constants(case.constants); + let mut properties = base_properties + .clone() + .add_constants(case.constants.into_iter().map(ConstExpr::new)); for [left, right] in &case.equal_conditions { properties.add_equal_conditions(left, right)? 
} diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs index b764e81a95d13..06c73636773eb 100644 --- a/datafusion/physical-expr/src/lib.rs +++ b/datafusion/physical-expr/src/lib.rs @@ -46,7 +46,7 @@ pub use analysis::{analyze, AnalysisContext, ExprBoundaries}; pub use datafusion_physical_expr_common::aggregate::{ AggregateExpr, AggregatePhysicalExpressions, }; -pub use equivalence::EquivalenceProperties; +pub use equivalence::{ConstExpr, EquivalenceProperties}; pub use partitioning::{Distribution, Partitioning}; pub use physical_expr::{ physical_exprs_bag_equal, physical_exprs_contains, physical_exprs_equal, diff --git a/datafusion/physical-plan/src/coalesce_partitions.rs b/datafusion/physical-plan/src/coalesce_partitions.rs index ce67cba2cd0e0..93f449f2d39b8 100644 --- a/datafusion/physical-plan/src/coalesce_partitions.rs +++ b/datafusion/physical-plan/src/coalesce_partitions.rs @@ -65,7 +65,7 @@ impl CoalescePartitionsExec { // Coalescing partitions loses existing orderings: let mut eq_properties = input.equivalence_properties().clone(); eq_properties.clear_orderings(); - + eq_properties.clear_per_partition_constants(); PlanProperties::new( eq_properties, // Equivalence Properties Partitioning::UnknownPartitioning(1), // Output Partitioning diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 6153dbacfbff0..c141958c11718 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -44,7 +44,7 @@ use datafusion_physical_expr::expressions::BinaryExpr; use datafusion_physical_expr::intervals::utils::check_support; use datafusion_physical_expr::utils::collect_columns; use datafusion_physical_expr::{ - analyze, split_conjunction, AnalysisContext, ExprBoundaries, PhysicalExpr, + analyze, split_conjunction, AnalysisContext, ConstExpr, ExprBoundaries, PhysicalExpr, }; use futures::stream::{Stream, StreamExt}; @@ -162,7 +162,7 @@ impl FilterExec { fn extend_constants( input: &Arc, predicate: &Arc, - ) -> Vec> { + ) -> Vec { let mut res_constants = Vec::new(); let input_eqs = input.equivalence_properties(); @@ -170,10 +170,17 @@ impl FilterExec { for conjunction in conjunctions { if let Some(binary) = conjunction.as_any().downcast_ref::() { if binary.op() == &Operator::Eq { + // Filter evaluates to single value for all partitions if input_eqs.is_expr_constant(binary.left()) { - res_constants.push(binary.right().clone()) + res_constants.push( + ConstExpr::new(binary.right().clone()) + .with_across_partitions(true), + ) } else if input_eqs.is_expr_constant(binary.right()) { - res_constants.push(binary.left().clone()) + res_constants.push( + ConstExpr::new(binary.left().clone()) + .with_across_partitions(true), + ) } } } @@ -199,7 +206,10 @@ impl FilterExec { let constants = collect_columns(predicate) .into_iter() .filter(|column| stats.column_statistics[column.index()].is_singleton()) - .map(|column| Arc::new(column) as _); + .map(|column| { + let expr = Arc::new(column) as _; + ConstExpr::new(expr).with_across_partitions(true) + }); // this is for statistics eq_properties = eq_properties.add_constants(constants); // this is for logical constant (for example: a = '1', then a could be marked as a constant) diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 65f7d5070a5d5..d9e16c98eee89 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -701,6 +701,11 @@ 
diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs
index 65f7d5070a5d5..d9e16c98eee89 100644
--- a/datafusion/physical-plan/src/repartition/mod.rs
+++ b/datafusion/physical-plan/src/repartition/mod.rs
@@ -701,6 +701,11 @@ impl RepartitionExec {
         if !Self::maintains_input_order_helper(input, preserve_order)[0] {
             eq_properties.clear_orderings();
         }
+        // When there is more than one input partition, the partitions will be
+        // fused at the output. Therefore, remove per-partition constants.
+        if input.output_partitioning().partition_count() > 1 {
+            eq_properties.clear_per_partition_constants();
+        }
         eq_properties
     }
 
diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
index 8a349bd22abf8..e364aca3791c6 100644
--- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
+++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
@@ -80,7 +80,7 @@ pub struct SortPreservingMergeExec {
 impl SortPreservingMergeExec {
     /// Create a new sort execution plan
     pub fn new(expr: Vec<PhysicalSortExpr>, input: Arc<dyn ExecutionPlan>) -> Self {
-        let cache = Self::compute_properties(&input);
+        let cache = Self::compute_properties(&input, expr.clone());
         Self {
             input,
             expr,
@@ -111,11 +111,17 @@ impl SortPreservingMergeExec {
     }
 
     /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
-    fn compute_properties(input: &Arc<dyn ExecutionPlan>) -> PlanProperties {
+    fn compute_properties(
+        input: &Arc<dyn ExecutionPlan>,
+        ordering: Vec<PhysicalSortExpr>,
+    ) -> PlanProperties {
+        let mut eq_properties = input.equivalence_properties().clone();
+        eq_properties.clear_per_partition_constants();
+        eq_properties.add_new_orderings(vec![ordering]);
         PlanProperties::new(
-            input.equivalence_properties().clone(), // Equivalence Properties
-            Partitioning::UnknownPartitioning(1),   // Output Partitioning
-            input.execution_mode(),                 // Execution Mode
+            eq_properties,                        // Equivalence Properties
+            Partitioning::UnknownPartitioning(1), // Output Partitioning
+            input.execution_mode(),               // Execution Mode
         )
     }
 }
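`RepartitionExec` (when it has more than one input partition) and `SortPreservingMergeExec` both fuse rows from several input partitions, so a constant that held only within a single partition cannot be trusted afterwards, while a merge can still advertise its sort ordering. A toy model of that bookkeeping, with illustrative names rather than the real `EquivalenceProperties` API:

    // Simplified equivalence-properties record; orderings are column-name lists.
    #[derive(Debug, Default)]
    struct EqPropsSketch {
        orderings: Vec<Vec<String>>,
        per_partition_constants: Vec<String>,
        global_constants: Vec<String>,
    }

    impl EqPropsSketch {
        fn clear_per_partition_constants(&mut self) {
            self.per_partition_constants.clear();
        }

        fn add_new_orderings(&mut self, new: Vec<Vec<String>>) {
            self.orderings.extend(new);
        }
    }

    fn main() {
        let mut eq = EqPropsSketch {
            orderings: vec![],
            per_partition_constants: vec!["partition_key".to_string()],
            global_constants: vec!["literal_1".to_string()],
        };
        // Mirrors compute_properties above: after partitions are fused, only
        // globally valid constants survive, and the merge ordering is added.
        eq.clear_per_partition_constants();
        eq.add_new_orderings(vec![vec!["a".to_string(), "b".to_string()]]);
        assert!(eq.per_partition_constants.is_empty());
        assert_eq!(eq.global_constants.len(), 1);
        println!("{eq:?}");
    }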
diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs
index dc7d270bae257..3f88eb4c3732b 100644
--- a/datafusion/physical-plan/src/union.rs
+++ b/datafusion/physical-plan/src/union.rs
@@ -41,7 +41,7 @@ use arrow::record_batch::RecordBatch;
 use datafusion_common::stats::Precision;
 use datafusion_common::{exec_err, internal_err, Result};
 use datafusion_execution::TaskContext;
-use datafusion_physical_expr::EquivalenceProperties;
+use datafusion_physical_expr::{ConstExpr, EquivalenceProperties};
 use futures::Stream;
 use itertools::Itertools;
 
@@ -118,41 +118,11 @@ impl UnionExec {
         schema: SchemaRef,
     ) -> PlanProperties {
         // Calculate equivalence properties:
-        // TODO: In some cases, we should be able to preserve some equivalence
-        // classes and constants. Add support for such cases.
         let children_eqs = inputs
             .iter()
             .map(|child| child.equivalence_properties())
             .collect::<Vec<_>>();
-        let mut eq_properties = EquivalenceProperties::new(schema);
-        // Use the ordering equivalence class of the first child as the seed:
-        let mut meets = children_eqs[0]
-            .oeq_class()
-            .iter()
-            .map(|item| item.to_vec())
-            .collect::<Vec<_>>();
-        // Iterate over all the children:
-        for child_eqs in &children_eqs[1..] {
-            // Compute meet orderings of the current meets and the new ordering
-            // equivalence class.
-            let mut idx = 0;
-            while idx < meets.len() {
-                // Find all the meets of `current_meet` with this child's orderings:
-                let valid_meets = child_eqs.oeq_class().iter().filter_map(|ordering| {
-                    child_eqs.get_meet_ordering(ordering, &meets[idx])
-                });
-                // Use the longest of these meets as others are redundant:
-                if let Some(next_meet) = valid_meets.max_by_key(|m| m.len()) {
-                    meets[idx] = next_meet;
-                    idx += 1;
-                } else {
-                    meets.swap_remove(idx);
-                }
-            }
-        }
-        // We know have all the valid orderings after union, remove redundant
-        // entries (implicitly) and return:
-        eq_properties.add_new_orderings(meets);
+        let eq_properties = calculate_union_eq_properties(&children_eqs, schema);
 
         // Calculate output partitioning; i.e. sum output partitions of the inputs.
         let num_partitions = inputs
@@ -167,6 +137,68 @@ impl UnionExec {
         PlanProperties::new(eq_properties, output_partitioning, mode)
     }
 }
+
+/// Calculate `EquivalenceProperties` for `UnionExec` from the `EquivalenceProperties`
+/// of its children.
+fn calculate_union_eq_properties(
+    children_eqs: &[&EquivalenceProperties],
+    schema: SchemaRef,
+) -> EquivalenceProperties {
+    // Calculate equivalence properties:
+    // TODO: In some cases, we should be able to preserve some equivalence
+    // classes and constants. Add support for such cases.
+    let mut eq_properties = EquivalenceProperties::new(schema);
+    // Use the ordering equivalence class of the first child as the seed:
+    let mut meets = children_eqs[0]
+        .oeq_class()
+        .iter()
+        .map(|item| item.to_vec())
+        .collect::<Vec<_>>();
+    // Iterate over all the children:
+    for child_eqs in &children_eqs[1..] {
+        // Compute meet orderings of the current meets and the new ordering
+        // equivalence class.
+        let mut idx = 0;
+        while idx < meets.len() {
+            // Find all the meets of `current_meet` with this child's orderings:
+            let valid_meets = child_eqs.oeq_class().iter().filter_map(|ordering| {
+                child_eqs.get_meet_ordering(ordering, &meets[idx])
+            });
+            // Use the longest of these meets as others are redundant:
+            if let Some(next_meet) = valid_meets.max_by_key(|m| m.len()) {
+                meets[idx] = next_meet;
+                idx += 1;
+            } else {
+                meets.swap_remove(idx);
+            }
+        }
+    }
+    // We now have all the valid orderings after the union; remove redundant
+    // entries (implicitly) and return:
+    eq_properties.add_new_orderings(meets);
+
+    let mut meet_constants = children_eqs[0].constants().to_vec();
+    // Iterate over all the children:
+    for child_eqs in &children_eqs[1..] {
+        let constants = child_eqs.constants();
+        meet_constants = meet_constants
+            .into_iter()
+            .filter_map(|meet_constant| {
+                for const_expr in constants {
+                    if const_expr.expr().eq(meet_constant.expr()) {
+                        // TODO: Check whether constant expressions evaluate to the same value in each partition
+                        let across_partitions = false;
+                        return Some(
+                            ConstExpr::new(meet_constant.owned_expr())
+                                .with_across_partitions(across_partitions),
+                        );
+                    }
+                }
+                None
+            })
+            .collect::<Vec<_>>();
+    }
+    eq_properties.add_constants(meet_constants)
+}
 
 impl DisplayAs for UnionExec {
     fn fmt_as(
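`calculate_union_eq_properties` relies on `get_meet_ordering`, which also accounts for sort options and expression equivalences. Stripped down to plain column names, the core idea is that a union of sorted inputs is only guaranteed sorted by a common prefix of its children's orderings, as in this simplified sketch:

    // Longest common prefix of two orderings, modeled as column-name slices.
    // The real get_meet_ordering also checks ASC/DESC, NULLS FIRST/LAST and
    // expression equivalences; this is a deliberate simplification.
    fn meet_ordering(a: &[&str], b: &[&str]) -> Vec<String> {
        a.iter()
            .zip(b.iter())
            .take_while(|(x, y)| x == y)
            .map(|(x, _)| x.to_string())
            .collect()
    }

    fn main() {
        // Child 1 sorted by [a, b, c], child 2 sorted by [a, b, d]:
        let meet = meet_ordering(&["a", "b", "c"], &["a", "b", "d"]);
        // The union's output can only claim the shared prefix [a, b]:
        assert_eq!(meet, vec!["a", "b"]);
        println!("{meet:?}");
    }

The constants loop works the same way: only expressions that are constant in every child survive, and they are conservatively marked as not constant across partitions until per-partition values are known to agree (the TODO above).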
diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs
index 181c308004346..252c8d12b5194 100644
--- a/datafusion/physical-plan/src/windows/mod.rs
+++ b/datafusion/physical-plan/src/windows/mod.rs
@@ -41,7 +41,8 @@ use datafusion_physical_expr::equivalence::collapse_lex_req;
 use datafusion_physical_expr::{
     reverse_order_bys,
     window::{BuiltInWindowFunctionExpr, SlidingAggregateWindowExpr},
-    AggregateExpr, EquivalenceProperties, LexOrdering, PhysicalSortRequirement,
+    AggregateExpr, ConstExpr, EquivalenceProperties, LexOrdering,
+    PhysicalSortRequirement,
 };
 use itertools::Itertools;
 
@@ -576,7 +577,10 @@
         options: None,
     }));
     // Treat partition-by expressions as constants during analysis of whether the requirements are satisfied.
-    let partition_by_eqs = input_eqs.add_constants(partitionby_exprs.iter().cloned());
+    let const_exprs = partitionby_exprs
+        .iter()
+        .map(|expr| ConstExpr::new(expr.clone()));
+    let partition_by_eqs = input_eqs.add_constants(const_exprs);
     let order_by_reqs = PhysicalSortRequirement::from_sort_exprs(orderby_keys);
     let reverse_order_by_reqs =
         PhysicalSortRequirement::from_sort_exprs(&reverse_order_bys(orderby_keys));
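`get_window_mode` treats partition-by expressions as constants because, within any single partition, each of them takes exactly one value; requirement analysis then only needs to satisfy the remaining ORDER BY columns. A simplified illustration with a hypothetical helper (not DataFusion code):

    // Drop columns from a required ordering that are fixed by PARTITION BY.
    fn remaining_requirement<'a>(
        required: &[&'a str],
        partition_by: &[&str],
    ) -> Vec<&'a str> {
        required
            .iter()
            .copied()
            .filter(|col| !partition_by.contains(col))
            .collect()
    }

    fn main() {
        // For OVER (PARTITION BY d ORDER BY a), `d` is constant inside each
        // partition, so a required ordering [d, a] reduces to just [a]:
        assert_eq!(remaining_requirement(&["d", "a"], &["d"]), vec!["a"]);
    }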
diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt
index 552ad6a2a7563..e891093c81560 100644
--- a/datafusion/sqllogictest/test_files/aggregate.slt
+++ b/datafusion/sqllogictest/test_files/aggregate.slt
@@ -251,7 +251,7 @@ physical_plan
 02)--AggregateExec: mode=FinalPartitioned, gby=[id@0 as id], aggr=[ARRAY_AGG(DISTINCT a.foo), sum(DISTINCT Int64(1))]
 03)----CoalesceBatchesExec: target_batch_size=8192
 04)------RepartitionExec: partitioning=Hash([id@0], 4), input_partitions=5
-05)--------AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[ARRAY_AGG(DISTINCT a.foo), sum(DISTINCT Int64(1))]
+05)--------AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[ARRAY_AGG(DISTINCT a.foo), sum(DISTINCT Int64(1))], ordering_mode=Sorted
 06)----------UnionExec
 07)------------ProjectionExec: expr=[1 as id, 2 as foo]
 08)--------------PlaceholderRowExec
diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt
index b850760b8734a..3a4e8072bbc76 100644
--- a/datafusion/sqllogictest/test_files/explain.slt
+++ b/datafusion/sqllogictest/test_files/explain.slt
@@ -257,7 +257,7 @@ physical_plan after coalesce_batches SAME TEXT AS ABOVE
 physical_plan after OutputRequirements CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], has_header=true
 physical_plan after LimitAggregation SAME TEXT AS ABOVE
 physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
-physical_plan after PipelineChecker SAME TEXT AS ABOVE
+physical_plan after SanityCheckPlan SAME TEXT AS ABOVE
 physical_plan CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], has_header=true
 physical_plan_with_stats CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], has_header=true, statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]]
 physical_plan_with_schema CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], has_header=true, schema=[a:Int32;N, b:Int32;N, c:Int32;N]
@@ -336,7 +336,7 @@ physical_plan after OutputRequirements
 02)--ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
 physical_plan after LimitAggregation SAME TEXT AS ABOVE
 physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
-physical_plan after PipelineChecker SAME TEXT AS ABOVE
+physical_plan after SanityCheckPlan SAME TEXT AS ABOVE
 physical_plan
 01)GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
 02)--ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
@@ -379,7 +379,7 @@ physical_plan after OutputRequirements
 02)--ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10
 physical_plan after LimitAggregation SAME TEXT AS ABOVE
 physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
-physical_plan after PipelineChecker SAME TEXT AS ABOVE
+physical_plan after SanityCheckPlan SAME TEXT AS ABOVE
 physical_plan
 01)GlobalLimitExec: skip=0, fetch=10
 02)--ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10
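The joins.slt changes below toggle `datafusion.optimizer.prefer_existing_sort`, which lets the optimizer pick order-preserving `RepartitionExec` variants instead of inserting fresh sorts. As a sketch, the same flag could be set programmatically; this assumes `SessionConfig::set_bool` and `SessionContext::new_with_config`, DataFusion's usual configuration entry points:

    use datafusion::prelude::*;

    fn main() {
        // Flip the same flag the SLT tests set via `set datafusion.optimizer...`:
        let config = SessionConfig::new()
            .set_bool("datafusion.optimizer.prefer_existing_sort", true);
        // Plans built from this context now prefer order-preserving repartitioning.
        let _ctx = SessionContext::new_with_config(config);
    }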
diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt
index 501ae497745b0..3cbeea0f92221 100644
--- a/datafusion/sqllogictest/test_files/joins.slt
+++ b/datafusion/sqllogictest/test_files/joins.slt
@@ -3165,6 +3165,9 @@ WITH ORDER (a ASC NULLS FIRST, b ASC, c ASC)
 LOCATION '../core/tests/data/window_2.csv'
 OPTIONS ('format.has_header' 'true');
 
+statement ok
+set datafusion.optimizer.prefer_existing_sort = true;
+
 # sort merge join should propagate ordering equivalence of the left side
 # for inner join. Hence final requirement rn1 ASC is already satisfied at
 # the end of SortMergeJoinExec.
@@ -3188,18 +3191,16 @@ logical_plan
 physical_plan
 01)SortPreservingMergeExec: [rn1@5 ASC NULLS LAST]
 02)--SortMergeJoin: join_type=Inner, on=[(a@1, a@1)]
-03)----SortExec: expr=[rn1@5 ASC NULLS LAST], preserve_partitioning=[true]
-04)------CoalesceBatchesExec: target_batch_size=2
-05)--------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=2
-06)----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
-07)------------ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@5 as rn1]
-08)--------------BoundedWindowAggExec: wdw=[ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)), is_causal: false }], mode=[Sorted]
-09)----------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], has_header=true
-10)----SortExec: expr=[a@1 ASC], preserve_partitioning=[true]
-11)------CoalesceBatchesExec: target_batch_size=2
-12)--------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=2
-13)----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
-14)------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], has_header=true
+03)----CoalesceBatchesExec: target_batch_size=2
+04)------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=2, preserve_order=true, sort_exprs=a@1 ASC,b@2 ASC NULLS LAST,c@3 ASC NULLS LAST,rn1@5 ASC NULLS LAST
+05)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+06)----------ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@5 as rn1]
+07)------------BoundedWindowAggExec: wdw=[ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)), is_causal: false }], mode=[Sorted]
+08)--------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], has_header=true
+09)----CoalesceBatchesExec: target_batch_size=2
+10)------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=2, preserve_order=true, sort_exprs=a@1 ASC,b@2 ASC NULLS LAST,c@3 ASC NULLS LAST
+11)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+12)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], has_header=true
 
 # sort merge join should propagate ordering equivalence of the right side
 # for right join. Hence final requirement rn1 ASC is already satisfied at
@@ -3224,18 +3225,19 @@ logical_plan
 physical_plan
 01)SortPreservingMergeExec: [rn1@10 ASC NULLS LAST]
 02)--SortMergeJoin: join_type=Right, on=[(a@1, a@1)]
-03)----SortExec: expr=[a@1 ASC], preserve_partitioning=[true]
-04)------CoalesceBatchesExec: target_batch_size=2
-05)--------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=2
-06)----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
-07)------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], has_header=true
-08)----SortExec: expr=[rn1@5 ASC NULLS LAST], preserve_partitioning=[true]
-09)------CoalesceBatchesExec: target_batch_size=2
-10)--------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=2
-11)----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
-12)------------ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@5 as rn1]
-13)--------------BoundedWindowAggExec: wdw=[ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)), is_causal: false }], mode=[Sorted]
-14)----------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], has_header=true
+03)----CoalesceBatchesExec: target_batch_size=2
+04)------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=2, preserve_order=true, sort_exprs=a@1 ASC,b@2 ASC NULLS LAST,c@3 ASC NULLS LAST
+05)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+06)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], has_header=true
+07)----CoalesceBatchesExec: target_batch_size=2
+08)------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=2, preserve_order=true, sort_exprs=a@1 ASC,b@2 ASC NULLS LAST,c@3 ASC NULLS LAST,rn1@5 ASC NULLS LAST
+09)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+10)----------ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@5 as rn1]
+11)------------BoundedWindowAggExec: wdw=[ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)), is_causal: false }], mode=[Sorted]
+12)--------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], has_header=true
+
+statement ok
+set datafusion.optimizer.prefer_existing_sort = false;
 
 # SortMergeJoin should add ordering equivalences of
 # right table as lexicographical append to the global ordering
diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt
index e6f3e70c1ebda..ba07b4ed0a87c 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -276,7 +276,7 @@ physical_plan
 04)------AggregateExec: mode=FinalPartitioned, gby=[b@0 as b], aggr=[MAX(d.a)]
 05)--------CoalesceBatchesExec: target_batch_size=8192
 06)----------RepartitionExec: partitioning=Hash([b@0], 4), input_partitions=4
-07)------------AggregateExec: mode=Partial, gby=[b@1 as b], aggr=[MAX(d.a)]
+07)------------AggregateExec: mode=Partial, gby=[b@1 as b], aggr=[MAX(d.a)], ordering_mode=Sorted
 08)--------------UnionExec
 09)----------------ProjectionExec: expr=[1 as a, aa as b]
 10)------------------PlaceholderRowExec
@@ -3190,8 +3190,8 @@ SELECT a, d, rn1, rank1 FROM (SELECT a, d,
 # this is a negative test for asserting that ROW_NUMBER is not
 # added to the ordering equivalence when it contains partition by.
 # physical plan should contain SortExec. Since source is unbounded
-# pipeline checker should raise error, when plan contains SortExec.
-statement error DataFusion error: PipelineChecker
+# sanity checker should raise an error when the plan contains SortExec.
+statement error DataFusion error: SanityCheckPlan
 SELECT a, d, rn1 FROM (SELECT a, d,
        ROW_NUMBER() OVER(PARTITION BY d ORDER BY a ASC) as rn1
        FROM annotated_data_infinite2