Commit

Add Optimizer Sanity Checker, improve sortedness equivalence properties (apache#11196)

* Initial optimizer sanity checker.

Only includes sort reqs, docs will be added.

* Add distro and pipeline friendly checks

* Also check the plans we create are correct.

* Add distribution test cases using global limit exec.

* Add test for multiple children using SortMergeJoinExec.

* Move PipelineChecker to SanityCheckPlan

* Fix some tests and add docs

* Add some test docs and fix clippy diagnostics.

* Fix some failing tests

* Replace PipelineChecker with SanityChecker in .slt files.

* Initial commit

* Slt tests pass

* Resolve linter errors

* Minor changes

* Minor changes

* Minor changes

* Minor changes

* Sort PreservingMerge clear per partition

* Minor changes

* Update output_requirements.rs

* Address reviews

* Update datafusion/core/src/physical_optimizer/optimizer.rs

Co-authored-by: Mehmet Ozan Kabak <[email protected]>

* Update datafusion/core/src/physical_optimizer/sanity_checker.rs

Co-authored-by: Mehmet Ozan Kabak <[email protected]>

* Address reviews

* Minor changes

* Apply suggestions from code review

Co-authored-by: Andrew Lamb <[email protected]>

* Update comment

* Add map implementation

---------

Co-authored-by: Erman Yafay <[email protected]>
Co-authored-by: berkaysynnada <[email protected]>
Co-authored-by: Mehmet Ozan Kabak <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
5 people authored and comphead committed Jul 8, 2024
1 parent cec318a commit cc01c23
Showing 23 changed files with 1,009 additions and 459 deletions.
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_optimizer/mod.rs
@@ -30,10 +30,10 @@ pub mod join_selection;
pub mod limited_distinct_aggregation;
pub mod optimizer;
pub mod output_requirements;
-pub mod pipeline_checker;
pub mod projection_pushdown;
pub mod pruning;
pub mod replace_with_order_preserving_variants;
+pub mod sanity_checker;
mod sort_pushdown;
pub mod topk_aggregation;
pub mod update_aggr_exprs;
16 changes: 10 additions & 6 deletions datafusion/core/src/physical_optimizer/optimizer.rs
@@ -30,7 +30,7 @@ use crate::physical_optimizer::enforce_sorting::EnforceSorting;
use crate::physical_optimizer::join_selection::JoinSelection;
use crate::physical_optimizer::limited_distinct_aggregation::LimitedDistinctAggregation;
use crate::physical_optimizer::output_requirements::OutputRequirements;
-use crate::physical_optimizer::pipeline_checker::PipelineChecker;
+use crate::physical_optimizer::sanity_checker::SanityCheckPlan;
use crate::physical_optimizer::topk_aggregation::TopKAggregation;
use crate::{error::Result, physical_plan::ExecutionPlan};

@@ -124,11 +124,15 @@ impl PhysicalOptimizer {
// are not present, the load of executors such as join or union will be
// reduced by narrowing their input tables.
Arc::new(ProjectionPushdown::new()),
-// The PipelineChecker rule will reject non-runnable query plans that use
-// pipeline-breaking operators on infinite input(s). The rule generates a
-// diagnostic error message when this happens. It makes no changes to the
-// given query plan; i.e. it only acts as a final gatekeeping rule.
-Arc::new(PipelineChecker::new()),
+// The SanityCheckPlan rule checks whether the order and
+// distribution requirements of each node in the plan
+// is satisfied. It will also reject non-runnable query
+// plans that use pipeline-breaking operators on infinite
+// input(s). The rule generates a diagnostic error
+// message for invalid plans. It makes no changes to the
+// given query plan; i.e. it only acts as a final
+// gatekeeping rule.
+Arc::new(SanityCheckPlan::new()),
];

Self::with_rules(rules)
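
The comment above describes three checks performed by the new rule: input ordering, input distribution, and pipeline-breaking operators on infinite input. A minimal sketch of that idea follows, assuming a simplified plan tree. Every type and function in it (`Node`, `Requirement`, `Distribution`, `leaf`, `sanity_check`) is hypothetical and written for illustration; none of it is DataFusion's API or the code in `sanity_checker.rs`.

```rust
// Toy model for illustration only. None of these types are DataFusion's API.

#[derive(Debug, Clone, PartialEq)]
enum Distribution {
    Unspecified,
    SinglePartition,
}

/// What a parent demands from one of its children (hypothetical).
#[derive(Debug, Clone)]
struct Requirement {
    ordering: Vec<String>,
    distribution: Distribution,
}

/// A simplified physical plan node (hypothetical).
#[derive(Debug, Clone)]
struct Node {
    name: String,
    output_ordering: Vec<String>,
    output_distribution: Distribution,
    /// True if the node produces an infinite stream.
    unbounded: bool,
    /// True if the node must consume all of its input before emitting output.
    pipeline_breaking: bool,
    /// Requirement `i` applies to child `i`.
    input_requirements: Vec<Requirement>,
    children: Vec<Node>,
}

/// Builds a childless node with no input requirements.
fn leaf(name: &str, ordering: &[&str], distribution: Distribution, unbounded: bool) -> Node {
    Node {
        name: name.to_string(),
        output_ordering: ordering.iter().map(|c| c.to_string()).collect(),
        output_distribution: distribution,
        unbounded,
        pipeline_breaking: false,
        input_requirements: vec![],
        children: vec![],
    }
}

/// A required ordering is met when it is a prefix of the provided ordering.
fn ordering_satisfied(provided: &[String], required: &[String]) -> bool {
    required.len() <= provided.len()
        && provided.iter().zip(required).all(|(p, r)| p == r)
}

fn distribution_satisfied(provided: &Distribution, required: &Distribution) -> bool {
    *required == Distribution::Unspecified || provided == required
}

/// Walks the plan and returns a diagnostic for the first violation found.
/// The plan itself is never modified.
fn sanity_check(node: &Node) -> Result<(), String> {
    for (i, child) in node.children.iter().enumerate() {
        if node.pipeline_breaking && child.unbounded {
            return Err(format!(
                "{} is pipeline-breaking but its input {} is unbounded",
                node.name, child.name
            ));
        }
        if let Some(req) = node.input_requirements.get(i) {
            if !ordering_satisfied(&child.output_ordering, &req.ordering) {
                return Err(format!(
                    "{} does not satisfy the ordering required by {}",
                    child.name, node.name
                ));
            }
            if !distribution_satisfied(&child.output_distribution, &req.distribution) {
                return Err(format!(
                    "{} does not satisfy the distribution required by {}",
                    child.name, node.name
                ));
            }
        }
        sanity_check(child)?;
    }
    Ok(())
}
```

In this sketch the checker takes the plan by shared reference and returns only `Ok(())` or an error string, which mirrors the "gatekeeping" role the comment describes: it never rewrites the plan, it only accepts or rejects it.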
@@ -248,7 +248,9 @@ fn require_top_ordering_helper(
if children.len() != 1 {
Ok((plan, false))
} else if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
-let req_ordering = sort_exec.properties().output_ordering().unwrap_or(&[]);
+// In case of constant columns, output ordering of SortExec would give an empty set.
+// Therefore; we check the sort expression field of the SortExec to assign the requirements.
+let req_ordering = sort_exec.expr();
let req_dist = sort_exec.required_input_distribution()[0].clone();
let reqs = PhysicalSortRequirement::from_sort_exprs(req_ordering);
Ok((
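
The change in this hunk can be illustrated with a toy model. The sketch below assumes that the reported output ordering drops sort keys on columns known to be constant, which is one way to read the comment in the diff. Both functions are hypothetical and operate on plain column names; they are not DataFusion's `SortExec` or equivalence-properties API.

```rust
// Toy model for illustration only; not DataFusion's API.

/// Hypothetical stand-in for the old behavior: derive the requirement from the
/// reported output ordering, which (by assumption) omits constant columns.
fn requirement_from_output_ordering(sort_exprs: &[&str], constants: &[&str]) -> Vec<String> {
    sort_exprs
        .iter()
        .copied()
        .filter(|c| !constants.contains(c))
        .map(String::from)
        .collect()
}

/// Hypothetical stand-in for the new behavior: derive the requirement directly
/// from the sort expressions, regardless of which columns are constant.
fn requirement_from_sort_exprs(sort_exprs: &[&str]) -> Vec<String> {
    sort_exprs.iter().copied().map(String::from).collect()
}
```

Under this model, sorting by a single constant column `a` yields an empty requirement with the first function, so the top-level requirement would be lost, while the second function still records `a`.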
334 changes: 0 additions & 334 deletions datafusion/core/src/physical_optimizer/pipeline_checker.rs

This file was deleted.