Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SanityCheckPlan should compare UnionExec inputs to requirements for output (parent). #12414

Closed
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions datafusion/physical-expr/src/equivalence/class.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.

use std::hash::{Hash, Hasher};
use std::ops::RangeFull;
use std::sync::Arc;

use super::{add_offset_to_expr, collapse_lex_req, ProjectionMapping};
Expand All @@ -27,6 +29,7 @@ use crate::{

use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::JoinType;
use indexmap::IndexSet;

#[derive(Debug, Clone)]
/// A structure representing a expression known to be constant in a physical execution plan.
Expand Down Expand Up @@ -124,6 +127,35 @@ pub fn const_exprs_contains(
.any(|const_expr| const_expr.expr.eq(expr))
}

impl Eq for ConstExpr {}

impl PartialEq for ConstExpr {
fn eq(&self, other: &Self) -> bool {
self.expr.eq(other.expr())
}
}

impl Hash for ConstExpr {
fn hash<H: Hasher>(&self, state: &mut H) {
self.expr.hash(state);
}
}

/// Concats two slices of `const_exprs, removing duplicates and
/// maintaining the order.
///
/// Equality based upon the expression. `across_partitions` will
/// always be false as we do not validate the same constant value
/// on both sides.
pub fn concat_const_exprs(lhs: &[ConstExpr], rhs: &[ConstExpr]) -> Vec<ConstExpr> {
IndexSet::<&ConstExpr>::from_iter(lhs.iter().chain(rhs.iter()))
.drain(RangeFull)
.map(|constant_expr| {
ConstExpr::new(Arc::clone(&constant_expr.expr)).with_across_partitions(false)
})
.collect()
}

/// An `EquivalenceClass` is a set of [`Arc<dyn PhysicalExpr>`]s that are known
/// to have the same value for all tuples in a relation. These are generated by
/// equality predicates (e.g. `a = b`), typically equi-join conditions and
Expand Down
16 changes: 3 additions & 13 deletions datafusion/physical-expr/src/equivalence/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use std::hash::{Hash, Hasher};
use std::sync::Arc;

use super::class::concat_const_exprs;
use super::ordering::collapse_lex_ordering;
use crate::equivalence::class::const_exprs_contains;
use crate::equivalence::{
Expand Down Expand Up @@ -1539,19 +1540,8 @@ fn calculate_union_binary(
}

// First, calculate valid constants for the union. A quantity is constant
// after the union if it is constant in both sides.
let constants = lhs
.constants()
.iter()
.filter(|const_expr| const_exprs_contains(rhs.constants(), const_expr.expr()))
.map(|const_expr| {
// TODO: When both sides' constants are valid across partitions,
// the union's constant should also be valid if values are
// the same. However, we do not have the capability to
// check this yet.
ConstExpr::new(Arc::clone(const_expr.expr())).with_across_partitions(false)
})
.collect();
// after the union if it is constant on one of the sides.
let constants = concat_const_exprs(lhs.constants(), rhs.constants());

// Next, calculate valid orderings for the union by searching for prefixes
// in both sides.
Expand Down
4 changes: 2 additions & 2 deletions datafusion/sqllogictest/test_files/order.slt
Original file line number Diff line number Diff line change
Expand Up @@ -1220,10 +1220,10 @@ physical_plan
01)ProjectionExec: expr=[b@0 as b, c@1 as c, a@2 as a, a0@3 as a0]
02)--SortPreservingMergeExec: [d@4 ASC NULLS LAST,c@1 ASC NULLS LAST,a@2 ASC NULLS LAST,a0@3 ASC NULLS LAST,b@0 ASC NULLS LAST], fetch=2
03)----UnionExec
04)------SortExec: TopK(fetch=2), expr=[d@4 ASC NULLS LAST,c@1 ASC NULLS LAST,a@2 ASC NULLS LAST,b@0 ASC NULLS LAST], preserve_partitioning=[false]
04)------SortExec: TopK(fetch=2), expr=[d@4 ASC NULLS LAST,c@1 ASC NULLS LAST,b@0 ASC NULLS LAST], preserve_partitioning=[false]
05)--------ProjectionExec: expr=[b@1 as b, c@2 as c, a@0 as a, NULL as a0, d@3 as d]
06)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_ordering=[c@2 ASC NULLS LAST], has_header=true
07)------SortExec: TopK(fetch=2), expr=[d@4 ASC NULLS LAST,c@1 ASC NULLS LAST,a0@3 ASC NULLS LAST,b@0 ASC NULLS LAST], preserve_partitioning=[false]
Copy link
Contributor Author

@wiedld wiedld Sep 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can actually add constants, as there is a flag, "across_partitions," that indicates whether the value is constant across all partitions or only in its corresponding partition

Made the change per suggestion (demonstration only, not final commits), and I'm not sure this is the proper fix. If I add the constants on the union's equivalence properties, there are other ramifications because:

In the example above, the sort orders [a@2 ASC NULLS LAST] and [a0@3 ASC NULLS LAST] are removed on non-constant projections. The EnforceSorting optimization adds (and pushes down) the SortExecs, but the change itself really occurs based upon the EquivalenceProperties's definition of a (non-constant) sort order. Since the UnionExec listed certain constants -- they are removed from the sort order.

I started hacking around this in the EnforceSorting, but it feels like the suggested change (adding to constants) may be actually changing the definition of what the constants are? 🤔 Am I heading in the right direction here?

07)------SortExec: TopK(fetch=2), expr=[d@4 ASC NULLS LAST,c@1 ASC NULLS LAST,b@0 ASC NULLS LAST], preserve_partitioning=[false]
08)--------ProjectionExec: expr=[b@1 as b, c@2 as c, NULL as a, a0@0 as a0, d@3 as d]
09)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, b, c, d], output_ordering=[c@2 ASC NULLS LAST], has_header=true

Expand Down
Loading