Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SanityCheckPlan should compare UnionExec inputs to requirements for output (parent). #12414

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 106 additions & 27 deletions datafusion/physical-expr/src/equivalence/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1524,6 +1524,102 @@ impl Hash for ExprWrapper {
}
}

/// Take sort orderings for unioned sides of equal length, and return the unioned ordering.
///
/// Example:
/// child1 = order by a0, b, c
/// child2 = order by a, b, c
/// => union's joint order is a0, a, b, c.
fn calculate_joint_ordering(
lhs: &EquivalenceProperties,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't actually fully understand why do you need such a function. I think its logic is not correct.
SCHEMA: a-b-c-a0
CHILD1: (5,15,25,35),(4,15,25,36),(3,15,25,37)
CHILD2: (5,15,25,35),(6,15,25,34),(7,15,25,33)
How could we deduce unioned output is (a0, a, b, c)?

rhs: &EquivalenceProperties,
) -> LexOrdering {
let mut union_ordering = vec![];
for ordering in lhs
.normalized_oeq_class()
.orderings
.iter()
.chain(rhs.normalized_oeq_class().orderings.iter())
{
if union_ordering.is_empty() {
union_ordering = ordering.clone();
continue;
}

if !union_ordering.len().eq(&ordering.len()) {
break;
}

let mut unioned = union_ordering.into_iter().peekable();
let mut curr = ordering.iter().peekable();
let mut new_union = vec![];
loop {
match (curr.next(), unioned.next()) {
(None, None) => break,
(None, Some(u)) => {
new_union.push(u.clone());
continue;
}
(Some(c), None) => {
new_union.push(c.clone());
continue;
}
(Some(c), Some(u)) => {
if c.eq(&u) {
new_union.push(c.clone());
continue;
} else if c.expr.eq(&u.expr) {
// options are different => negates each other
continue;
} else {
new_union.push(u.clone());
new_union.push(c.clone());
continue;
}
}
}
}
union_ordering = new_union;
}
collapse_lex_ordering(union_ordering)
}

/// Take sort orderings for unioned sides return the shorten, novel sort order.
///
/// Example:
/// child1 = order by a, b
/// child2 = order by a1, b1, c1
/// => union's prefixed order is a, b.
fn calculate_prefix_ordering(
lhs: &EquivalenceProperties,
rhs: &EquivalenceProperties,
) -> Vec<LexOrdering> {
// Calculate valid orderings for the union by searching for prefixes
// in both sides.
let mut orderings = vec![];
for mut ordering in lhs.normalized_oeq_class().orderings {
// Progressively shorten the ordering to search for a satisfied prefix:
while !rhs.ordering_satisfy(&ordering) {
ordering.pop();
}
// There is a non-trivial satisfied prefix, add it as a valid ordering:
if !ordering.is_empty() {
orderings.push(ordering);
}
}
for mut ordering in rhs.normalized_oeq_class().orderings {
// Progressively shorten the ordering to search for a satisfied prefix:
while !lhs.ordering_satisfy(&ordering) {
ordering.pop();
}
// There is a non-trivial satisfied prefix, add it as a valid ordering:
if !ordering.is_empty() {
orderings.push(ordering);
}
}
orderings
}

/// Calculates the union (in the sense of `UnionExec`) `EquivalenceProperties`
/// of `lhs` and `rhs` according to the schema of `lhs`.
fn calculate_union_binary(
Expand Down Expand Up @@ -1553,32 +1649,15 @@ fn calculate_union_binary(
})
.collect();

// Next, calculate valid orderings for the union by searching for prefixes
// in both sides.
let mut orderings = vec![];
for mut ordering in lhs.normalized_oeq_class().orderings {
// Progressively shorten the ordering to search for a satisfied prefix:
while !rhs.ordering_satisfy(&ordering) {
ordering.pop();
}
// There is a non-trivial satisfied prefix, add it as a valid ordering:
if !ordering.is_empty() {
orderings.push(ordering);
}
}
for mut ordering in rhs.normalized_oeq_class().orderings {
// Progressively shorten the ordering to search for a satisfied prefix:
while !lhs.ordering_satisfy(&ordering) {
ordering.pop();
}
// There is a non-trivial satisfied prefix, add it as a valid ordering:
if !ordering.is_empty() {
orderings.push(ordering);
}
}
// Create a unioned ordering.
let mut orderings = calculate_prefix_ordering(&lhs, &rhs);
let union_ordering = calculate_joint_ordering(&lhs, &rhs);
orderings.push(union_ordering);

let mut eq_properties = EquivalenceProperties::new(lhs.schema);
eq_properties.constants = constants;
eq_properties.add_new_orderings(orderings);

Ok(eq_properties)
}

Expand Down Expand Up @@ -2645,8 +2724,8 @@ mod tests {
Arc::clone(&schema3),
),
],
// Expected
vec![vec!["a", "b"]],
// Expected: union sort orders
vec![vec!["a", "b", "c"]],
),
// --------- TEST CASE 2 ----------
(
Expand Down Expand Up @@ -2720,8 +2799,8 @@ mod tests {
Arc::clone(&schema3),
),
],
// Expected
vec![],
// Expected: union sort orders
vec![vec!["a", "b", "c"]],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think these test changes are not correct.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree -- these changes make the expected output incorrect

),
// --------- TEST CASE 5 ----------
(
Expand Down
44 changes: 44 additions & 0 deletions datafusion/sqllogictest/test_files/order.slt
Original file line number Diff line number Diff line change
Expand Up @@ -1196,3 +1196,47 @@ physical_plan
02)--RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
03)----SortExec: TopK(fetch=1), expr=[a@0 ASC NULLS LAST], preserve_partitioning=[false]
04)------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], has_header=true


# Test: inputs into union with different orderings
query TT
explain select * from (select b, c, a, NULL::int as a0 from ordered_table order by a, c) t1
union all
select * from (select b, c, NULL::int as a, a0 from ordered_table order by a0, c) t2
order by d, c, a, a0, b
limit 2;
----
logical_plan
01)Projection: t1.b, t1.c, t1.a, t1.a0
02)--Sort: t1.d ASC NULLS LAST, t1.c ASC NULLS LAST, t1.a ASC NULLS LAST, t1.a0 ASC NULLS LAST, t1.b ASC NULLS LAST, fetch=2
03)----Union
04)------SubqueryAlias: t1
05)--------Projection: ordered_table.b, ordered_table.c, ordered_table.a, Int32(NULL) AS a0, ordered_table.d
06)----------TableScan: ordered_table projection=[a, b, c, d]
07)------SubqueryAlias: t2
08)--------Projection: ordered_table.b, ordered_table.c, Int32(NULL) AS a, ordered_table.a0, ordered_table.d
09)----------TableScan: ordered_table projection=[a0, b, c, d]
physical_plan
01)ProjectionExec: expr=[b@0 as b, c@1 as c, a@2 as a, a0@3 as a0]
02)--SortPreservingMergeExec: [d@4 ASC NULLS LAST,c@1 ASC NULLS LAST,a@2 ASC NULLS LAST,a0@3 ASC NULLS LAST,b@0 ASC NULLS LAST], fetch=2
03)----UnionExec
04)------SortExec: TopK(fetch=2), expr=[d@4 ASC NULLS LAST,c@1 ASC NULLS LAST,a@2 ASC NULLS LAST,b@0 ASC NULLS LAST], preserve_partitioning=[false]
05)--------ProjectionExec: expr=[b@1 as b, c@2 as c, a@0 as a, NULL as a0, d@3 as d]
06)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_ordering=[c@2 ASC NULLS LAST], has_header=true
07)------SortExec: TopK(fetch=2), expr=[d@4 ASC NULLS LAST,c@1 ASC NULLS LAST,a0@3 ASC NULLS LAST,b@0 ASC NULLS LAST], preserve_partitioning=[false]
08)--------ProjectionExec: expr=[b@1 as b, c@2 as c, NULL as a, a0@0 as a0, d@3 as d]
09)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, b, c, d], output_ordering=[c@2 ASC NULLS LAST], has_header=true

# Test: run the query from above
query IIII
select * from (select b, c, a, NULL::int as a0 from ordered_table order by a, c) t1
union all
select * from (select b, c, NULL::int as a, a0 from ordered_table order by a0, c) t2
order by d, c, a, a0, b
limit 2;
----
0 0 0 NULL
0 0 NULL 1

statement ok
drop table ordered_table;
Loading