-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Optimizer Sanity Checker, improve sortedness equivalence properties #11196
Add Optimizer Sanity Checker, improve sortedness equivalence properties #11196
Conversation
Only includes sort reqs, docs will be added.
Co-authored-by: Mehmet Ozan Kabak <[email protected]>
Co-authored-by: Mehmet Ozan Kabak <[email protected]>
Optimizer Sanity Checker
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I reviewed this PR carefully and it looks good to me. Thank you @yfy- and @mustafasrepo for collaborating on this.
Hopefully this rule will make us discover plan bugs much earlier, prevent regressions and increase developer discipline on operator and rule implementations.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @mustafasrepo (and @yfy- ) - I found this PR very well written, documented and tested.
I had a few small comment suggestions as I read this PR, but overall I thought it looked quite nice
I also added a few notes to the description / title of the PR to make it clear this also improves some of the sortedness matching calculations in addtion to adding a new sanity check
@@ -173,6 +174,12 @@ impl EquivalenceProperties { | |||
self.oeq_class.clear(); | |||
} | |||
|
|||
/// Removes constant expressions that may change across partitions. | |||
/// This method should be used when different partitions are fused. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What does a "fused" partition mean? That is not a term I have run into previously
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What I mean is data from multiple partitions ends up in another partition (operators that does this CoalescePartitionsExec
, SortPreservingMergeExec
, InterleaveExec
, RepartitionExec
). I changed to term to "merge" in 747b69b. However, If you have other suggestions which communicates the intent in this context better (maybe more common vocabulary in literature). I can update with that term.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
merge sounds good -- thank you
// First, project existing constants. For example, assume that `a + b` | ||
// is known to be constant. If the projection were `a as a_new`, `b as b_new`, | ||
// then we would project constant `a + b` as `a_new + b_new`. | ||
let mut projected_constants = self | ||
.constants | ||
.iter() | ||
.flat_map(|expr| self.eq_group.project_expr(mapping, expr)) | ||
.flat_map(|const_expr| { | ||
self.eq_group |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if you could add an API like ConstExpr::map
that applied a function to the contained Expr. That way this could look like
.flat_map(|expr| expr.map(|expr| self.eq_group.project_expr(mapping, expr))))
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed in 747b69b.
12)--------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=2 | ||
13)----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 | ||
14)------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], has_header=true | ||
03)----CoalesceBatchesExec: target_batch_size=2 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are these results due to changing the setting
set datafusion.optimizer.prefer_existing_sort = true;
Or to the changes in the calculations for the sortedness calculations?
The plan seems better to me (there is no sort)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, previous plan was wrong (Another case this new rule helped us discover), since one of the children of the SortMergeJoin
doesn't have ordering: [a ASC]
. After fix, generated plan was quite different. the intent of this test was (as far as I can tell) to see order by at the top of the SortMergeJoin
can be pushed down through SortMergeJoin
. Hence, I changed the setting to preserve this behaviour.
The result change (where sort is removed) comes from the setting change indeed. However, as I said the plan has changed with the setting set datafusion.optimizer.prefer_existing_sort = false;
also. Hence, I decided to change flag to preserve the intent of the test.
Co-authored-by: Andrew Lamb <[email protected]>
Thanks @mustafasrepo ! |
FYI this check is now failing on one of the sql planning benchmarks: #11322 |
…es (apache#11196) * Initial optimizer sanity checker. Only includes sort reqs, docs will be added. * Add distro and pipeline friendly checks * Also check the plans we create are correct. * Add distribution test cases using global limit exec. * Add test for multiple children using SortMergeJoinExec. * Move PipelineChecker to SanityCheckPlan * Fix some tests and add docs * Add some test docs and fix clippy diagnostics. * Fix some failing tests * Replace PipelineChecker with SanityChecker in .slt files. * Initial commit * Slt tests pass * Resolve linter errors * Minor changes * Minor changes * Minor changes * Minor changes * Sort PreservingMerge clear per partition * Minor changes * Update output_requirements.rs * Address reviews * Update datafusion/core/src/physical_optimizer/optimizer.rs Co-authored-by: Mehmet Ozan Kabak <[email protected]> * Update datafusion/core/src/physical_optimizer/sanity_checker.rs Co-authored-by: Mehmet Ozan Kabak <[email protected]> * Address reviews * Minor changes * Apply suggestions from code review Co-authored-by: Andrew Lamb <[email protected]> * Update comment * Add map implementation --------- Co-authored-by: Erman Yafay <[email protected]> Co-authored-by: berkaysynnada <[email protected]> Co-authored-by: Mehmet Ozan Kabak <[email protected]> Co-authored-by: Andrew Lamb <[email protected]>
…es (apache#11196) * Initial optimizer sanity checker. Only includes sort reqs, docs will be added. * Add distro and pipeline friendly checks * Also check the plans we create are correct. * Add distribution test cases using global limit exec. * Add test for multiple children using SortMergeJoinExec. * Move PipelineChecker to SanityCheckPlan * Fix some tests and add docs * Add some test docs and fix clippy diagnostics. * Fix some failing tests * Replace PipelineChecker with SanityChecker in .slt files. * Initial commit * Slt tests pass * Resolve linter errors * Minor changes * Minor changes * Minor changes * Minor changes * Sort PreservingMerge clear per partition * Minor changes * Update output_requirements.rs * Address reviews * Update datafusion/core/src/physical_optimizer/optimizer.rs Co-authored-by: Mehmet Ozan Kabak <[email protected]> * Update datafusion/core/src/physical_optimizer/sanity_checker.rs Co-authored-by: Mehmet Ozan Kabak <[email protected]> * Address reviews * Minor changes * Apply suggestions from code review Co-authored-by: Andrew Lamb <[email protected]> * Update comment * Add map implementation --------- Co-authored-by: Erman Yafay <[email protected]> Co-authored-by: berkaysynnada <[email protected]> Co-authored-by: Mehmet Ozan Kabak <[email protected]> Co-authored-by: Andrew Lamb <[email protected]>
We found an issue (that I think is exposed by this change, not caused by it) while upgrading InfluxDB 3.0: #11492 |
…es (apache#11196) * Initial optimizer sanity checker. Only includes sort reqs, docs will be added. * Add distro and pipeline friendly checks * Also check the plans we create are correct. * Add distribution test cases using global limit exec. * Add test for multiple children using SortMergeJoinExec. * Move PipelineChecker to SanityCheckPlan * Fix some tests and add docs * Add some test docs and fix clippy diagnostics. * Fix some failing tests * Replace PipelineChecker with SanityChecker in .slt files. * Initial commit * Slt tests pass * Resolve linter errors * Minor changes * Minor changes * Minor changes * Minor changes * Sort PreservingMerge clear per partition * Minor changes * Update output_requirements.rs * Address reviews * Update datafusion/core/src/physical_optimizer/optimizer.rs Co-authored-by: Mehmet Ozan Kabak <[email protected]> * Update datafusion/core/src/physical_optimizer/sanity_checker.rs Co-authored-by: Mehmet Ozan Kabak <[email protected]> * Address reviews * Minor changes * Apply suggestions from code review Co-authored-by: Andrew Lamb <[email protected]> * Update comment * Add map implementation --------- Co-authored-by: Erman Yafay <[email protected]> Co-authored-by: berkaysynnada <[email protected]> Co-authored-by: Mehmet Ozan Kabak <[email protected]> Co-authored-by: Andrew Lamb <[email protected]>
* fix(11397): do not surface errors for closed channels, and instead let the task join errors be surfaced * fix(11397): terminate early on channel send failure Add Optimizer Sanity Checker, improve sortedness equivalence properties (apache#11196) * Initial optimizer sanity checker. Only includes sort reqs, docs will be added. * Add distro and pipeline friendly checks * Also check the plans we create are correct. * Add distribution test cases using global limit exec. * Add test for multiple children using SortMergeJoinExec. * Move PipelineChecker to SanityCheckPlan * Fix some tests and add docs * Add some test docs and fix clippy diagnostics. * Fix some failing tests * Replace PipelineChecker with SanityChecker in .slt files. * Initial commit * Slt tests pass * Resolve linter errors * Minor changes * Minor changes * Minor changes * Minor changes * Sort PreservingMerge clear per partition * Minor changes * Update output_requirements.rs * Address reviews * Update datafusion/core/src/physical_optimizer/optimizer.rs Co-authored-by: Mehmet Ozan Kabak <[email protected]> * Update datafusion/core/src/physical_optimizer/sanity_checker.rs Co-authored-by: Mehmet Ozan Kabak <[email protected]> * Address reviews * Minor changes * Apply suggestions from code review Co-authored-by: Andrew Lamb <[email protected]> * Update comment * Add map implementation --------- Co-authored-by: Erman Yafay <[email protected]> Co-authored-by: berkaysynnada <[email protected]> Co-authored-by: Mehmet Ozan Kabak <[email protected]> Co-authored-by: Andrew Lamb <[email protected]>
This check was being triggered in our downstream tests, but I think it actually found a real bug: #11675 |
I believe this check also found another bug / limitation: #12414 |
Rationale for this change
Most of the contribution in this PR comes from @yfy- . Thanks @yfy- for working on this issue.
We extend the existing
PipelineChecker
with additional checks of order and distribution requirements. In a physical plan each plan should be pipeline friendly (previously checked byPipelineChecker
) and each child plan's output order and partitioning should satisfy the parent's respective requirements. We combine these checks into theSanityCheckPlan
proposed with this PR.With the
SanityCheckPlan
, as the optimizer steps change and grow, we ensure that the order and distribution requirements of the final phsyical plan are always satisfied so that it can yield correct results.What changes are included in this PR?
SanityCheckPlan
step as the last step of the physical plan optimizations.SanityCheckPlan
contains the formerPipelineChecker
, that's whyPipelineChecker
is deleted.ConstExpr
API to manage tracking constant expressionsAre these changes tested?
We use the existing test cases of the former
PipelineChecker
. In addition following cases are added:Using
BoundedWindowAggExec
2 cases: First with the child order requirement is satisfied and another one that is not satisifed.
Using
GlobalLimitExec
2 cases: First with the child distribution requirement is satisfied and another one that is not satisifed.
Using
LocalLimitExec
We check when there are no requirements at all our check passes.
Using
SortMergeJoinExec
3 cases: First case with both children satisify both requirements. Second case, where the second child does not satisfy the order requirement. Finally, a case where the second child does not satisfy distribution requirements.