[SPARK-23500][SQL] Fix complex type simplification rules to apply to entire plan #20687
Conversation
Test build #87739 has finished for PR 20687 at commit
retest this please
@@ -25,8 +25,8 @@ import org.apache.spark.sql.catalyst.rules.Rule
 * push down operations into [[CreateNamedStructLike]].
 */
object SimplifyCreateStructOps extends Rule[LogicalPlan] {
nit: can we merge these 3 rules? then we only need to transform the plan once.
+1 for @cloud-fan's advice.
cc @dongjoon-hyun Do you want to review this PR?
Test build #87763 has finished for PR 20687 at commit
Could you fix the JIRA number in PR title?
test("SPARK-23500: Simplify complex ops that aren't at the plan root") {
  val structRel = relation
    .select(GetStructField(CreateNamedStruct(Seq("att1", 'id)), 0, None) as "foo")
    .select('foo).analyze
@henryr Could you update the test cases properly? Actually, this will not provide the test coverage of your PR properly because of CollapseProject at line 40.
Thanks for the pointer. I replaced the projection with an aggregation.
Test build #87971 has finished for PR 20687 at commit
retest this please
Test build #88024 has finished for PR 20687 at commit
This is failing because of SPARK-23606, which seems unrelated (I haven't been able to trigger it in local builds, at least).
Retest this please.
Test build #88035 has finished for PR 20687 at commit
@@ -22,32 +22,24 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule

 /**
- * push down operations into [[CreateNamedStructLike]].
+ * Simplify redundant [[CreateNamedStructLike]], [[CreateArray]] and [[CreateMap]] expressions.
  */
 object SimplifyCreateStructOps extends Rule[LogicalPlan] {
SimplifyExtractValueOps?
Good point, done.
test("SPARK-23500: Simplify complex ops that aren't at the plan root") {
  // If nullable attributes aren't used, the array and map test cases fail because array
  // and map indexing can return null so the output is marked nullable.
why? I think the optimization is still valid, we should show this in the test, instead of hiding it with a nullable attribute.
The optimization works either way, but in (for example) the map case, m1 is marked as nullable in the original plan because presumably GetMapValue(CreateMap(...)) can return null if the key is not in the map. So for the expected plan to compare the same as the original, it has to be reading a nullable attribute - otherwise the plans don't pass comparePlans. I moved and reworded the comment to hopefully clarify this a bit.
There's an opportunity to fix this up again after the rule completes (since some attributes could be marked too conservatively as nullable). Do you think that's something we should pursue for this PR?
@dongjoon-hyun Have you finished the review?
comparePlans(Optimizer execute structRel, structExpected)
// If nullable attributes aren't used in the 'expected' plans, the array and map test
// cases fail because array and map indexing can return null so the output attribute
This explains why the original plan(before optimize) marks its output as nullable, but I'm confused why the optimized plan still marks its output as nullable.
It's a good question! I'm not too familiar with how nullability is marked and unmarked during planning. My understanding is roughly that the analyzer resolves all the plan's expressions and in doing so marks attributes as nullable or not. After that it's not clear that the optimizer revisits any of those nullability decisions. Is there an optimizer pass which should make nullability marking more precise?
nullable is mostly calculated on demand, so we don't have rules to change the nullable property. For this case, the expression is Alias(GetArrayItem(CreateArray(Attribute...))), which is nullable. After optimization, it becomes Alias(Attribute...) and is not nullable (if that attribute is not nullable). So the nullable flag is updated automatically.
I don't know why you hit this issue, please ping us if you can't figure it out, we can help to debug.
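The on-demand nullability computation described above can be sketched with a toy expression tree (plain Scala, not Catalyst's actual classes — all names here are illustrative):

```scala
// Toy model: nullability is computed from children, not stored state.
sealed trait Expr { def nullable: Boolean }

case class Attribute(name: String, nullable: Boolean) extends Expr

case class CreateArray(elems: Seq[Expr]) extends Expr {
  def nullable: Boolean = false // the array value itself is always produced
}

case class GetArrayItem(child: Expr, index: Int) extends Expr {
  def nullable: Boolean = true // out-of-bounds indexing can yield null
}

case class Alias(child: Expr, name: String) extends Expr {
  def nullable: Boolean = child.nullable // delegates to the child
}

// The simplification: indexing a freshly built array with a literal index
// just selects the element, so the array construction can be dropped.
def simplify(e: Expr): Expr = e match {
  case Alias(GetArrayItem(CreateArray(elems), i), name)
      if i >= 0 && i < elems.size =>
    Alias(elems(i), name)
  case other => other
}

val before = Alias(GetArrayItem(CreateArray(Seq(Attribute("id", nullable = false))), 0), "x")
val after  = simplify(before)

assert(before.nullable)  // Alias(GetArrayItem(...)) is nullable
assert(!after.nullable)  // Alias(Attribute...) recomputes nullable = false
```

Because `nullable` is a method that recurses into children, the optimized tree reports the tighter value with no extra bookkeeping — which mirrors why Catalyst usually needs no rule to "update" nullability after simplification.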
Thanks, that's plenty of information to get started - I'll dig into it.
@cloud-fan I looked again at this briefly this morning. The issue is that it's the AttributeReference in the top-level Aggregate's groupingExpressions that has inconsistent nullability. The AttributeReference in the original plan was originally created with nullable=true, before optimization. So at that point it's kind of fixed unless the optimizer dereferences the attribute reference and realises that the target is no longer nullable.
good catch! Let's explain this in the test and fix it in a follow-up. We can just add a new rule to transform the plan and update the nullability.
Done, thanks. I filed SPARK-23634 to fix this. Out of interest, why does AttributeReference cache the nullability of its referent? Is it because comparison is too expensive to do if you have to follow a level of indirection to get to the original attribute?
Because AttributeReference is not only used as a reference to an attribute from children, but also for the new attributes produced by leaf nodes, which have to carry the nullable info. It's not ideal but it's too late to change now.
Test build #88056 has finished for PR 20687 at commit
@@ -22,32 +22,24 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule

 /**
- * push down operations into [[CreateNamedStructLike]].
+ * Simplify redundant [[CreateNamedStructLike]], [[CreateArray]] and [[CreateMap]] expressions.
  */
nit. Could you fix the indentation?
Done.
 override def apply(plan: LogicalPlan): LogicalPlan = {
   plan.transformExpressionsUp {
-    // push down field selection (array of structs)
+    // Remove redundant array indexing.
     case GetArrayStructFields(CreateArray(elems), field, ordinal, numFields, containsNull) =>
nit.
case GetArrayStructFields(CreateArray(elems), field, ordinal, _, _) =>
Done
 case GetArrayStructFields(CreateArray(elems), field, ordinal, numFields, containsNull) =>
   // instead f selecting the field on the entire array,
   // select it from each member of the array.
   // pushing down the operation this way open other optimizations opportunities
   // (i.e. struct(...,x,...).x)
   CreateArray(elems.map(GetStructField(_, ordinal, Some(field.name))))
-  // push down item selection.
+  // Remove redundant map lookup.
 case ga @ GetArrayItem(CreateArray(elems), IntegerLiteral(idx)) =>
   // instead of creating the array and then selecting one row,
   // remove array creation altgether.
altgether -> altogether?
Done.
I didn't retrigger Jenkins due to the existing comment.
…entire plan

## What changes were proposed in this pull request?

Complex type simplification optimizer rules were not applied to the entire plan, just the expressions reachable from the root node. This patch fixes the rules to transform the entire plan.

## How was this patch tested?

New unit test + sql / core tests.
 */
object SimplifyExtractValueOps extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transform { case p =>
    p.transformExpressionsUp {
@dongjoon-hyun, is it safe to simplify it for Aggregate?
Sorry for the late response, @gatorsmile. These are expression-level optimization rules. If the original expressions exist in SELECT, GROUP BY, and HAVING, those are simplified in the same way together. Do you have any concerning cases?
aggregateExpressions are resolved from groupingExpressions using semanticEquals, while referring to names from input.
The expression-level optimizer simplifies both aggregateExpressions and groupingExpressions together. If the target expression exists somewhere on both sides, the simplified expression also exists at the same locations on both sides. Given that, semanticEquals will work for the updated expressions.
how about select struct(a, b).a from t group by struct(a, b)? We may optimize it to select a from t group by struct(a, b), which is invalid.
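This hazard can be sketched with a toy validity check (plain Scala; expressions are stood in for by strings, and the resolution rule is deliberately simplified — this is illustrative, not Catalyst's actual check):

```scala
// An aggregate is valid only if every (non-aggregate) select expression
// matches a grouping expression, or extracts a field from one.
case class ToyAggregate(groupings: Set[String], selects: Seq[String]) {
  def resolved: Boolean = selects.forall { s =>
    groupings.contains(s) || groupings.exists(g => s.startsWith(g + "."))
  }
}

// select struct(a, b).a from t group by struct(a, b)   -- valid
val before = ToyAggregate(Set("struct(a, b)"), Seq("struct(a, b).a"))

// Simplifying only the select list to `a` breaks the correspondence:
// select a from t group by struct(a, b)                -- invalid
val after = ToyAggregate(Set("struct(a, b)"), Seq("a"))

assert(before.resolved)
assert(!after.resolved)
```

The point is that the select side and the grouping side must stay in sync; a rewrite that touches one without the other produces a plan that no longer resolves.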
Oh, right. I missed considering that kind of case.
Since map is not orderable, that happens for struct and array types.
(Ignore my previous comment here, which was mistaken).
Test build #88135 has finished for PR 20687 at commit
val mapExpected = relation
  .select('nullable_id as "m1")
  .groupBy($"m1")("1").analyze
comparePlans(Optimizer execute mapRel, mapExpected)
@henryr Could you add more test cases mentioned today, for example, like the following? We need a test case for array, too.

val structRel = relation.groupBy(
  CreateNamedStruct(Seq("att1", 'nullable_id)))(
  GetStructField(CreateNamedStruct(Seq("att1", 'nullable_id)), 0, None)).analyze
comparePlans(Optimizer execute structRel, structRel)
 * Simplify redundant [[CreateNamedStructLike]], [[CreateArray]] and [[CreateMap]] expressions.
 */
object SimplifyExtractValueOps extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transform { case p =>
@henryr You can change like the following in order to avoid Aggregate.

override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
  case a: Aggregate => a
  case p => p.transformExpressionsUp {
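The effect of this guard can be sketched on a toy plan tree (plain Scala stand-ins, not Catalyst's Aggregate/Project; the string-based simplifyExpr is a placeholder for the real expression rules):

```scala
sealed trait Plan { def exprs: Seq[String]; def children: Seq[Plan] }
case class Aggregate(exprs: Seq[String], children: Seq[Plan]) extends Plan
case class Project(exprs: Seq[String], children: Seq[Plan]) extends Plan

// Placeholder for SimplifyExtractValueOps on a single expression:
// struct(a, b).a  =>  a
def simplifyExpr(e: String): String =
  if (e == "struct(a, b).a") "a" else e

// The guard: leave Aggregate's own expressions alone, but keep
// recursing so the rule still applies everywhere below it.
def applyRule(plan: Plan): Plan = plan match {
  case a: Aggregate => a.copy(children = a.children.map(applyRule))
  case p: Project   => Project(p.exprs.map(simplifyExpr), p.children.map(applyRule))
}

val plan = Aggregate(Seq("struct(a, b).a"), Seq(Project(Seq("struct(a, b).a"), Nil)))
val out  = applyRule(plan)

assert(out.exprs == Seq("struct(a, b).a"))   // untouched under Aggregate
assert(out.children.head.exprs == Seq("a"))  // simplified inside the Project
```

Note that, like Catalyst's transform, the guard only skips the Aggregate node's own expressions; its children are still visited, so the optimization is not lost for the rest of the plan.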
Test build #88280 has finished for PR 20687 at commit
@@ -19,57 +19,47 @@ package org.apache.spark.sql.catalyst.optimizer

 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.Aggregate
This should be before line 21 in alphabetical order. You can check this locally with dev/scalastyle.
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
  .select('nullable_id as "m1")
  .groupBy($"m1")("1").analyze
comparePlans(Optimizer execute mapRel, mapExpected)
It seems that the current test case has become too long. For the following negative cases, let's split them into a separate test case, maybe with the following title?
test("SPARK-23500: Aggregation expressions should not be simplified.")
The fix looks good to me, but the test coverage is not enough.
@henryr Thanks for your great work!
@gatorsmile thank you for the reviews! Are there specific test cases you'd like to see? I've checked correlated and uncorrelated subqueries, various flavours of join, aggregates with HAVING clauses, nested compound types, and so on.
@henryr Please try to add the test cases that matter in your opinion. I will also submit a follow-up PR to add more test cases after this PR is merged.
@gatorsmile ok, I think the coverage right now is a reasonable start - the other test cases I can think of would act more like they're exercising the expression-walking code, not the actual simplification. Looking forward to collaborating on the follow-up PR.
Test build #88391 has finished for PR 20687 at commit
Thanks! Merged to master.
Will submit a separate PR for tests only.
… apply to entire plan

## What changes were proposed in this pull request?

This PR is to improve the test coverage of the original PR #20687

## How was this patch tested?

N/A

Author: gatorsmile <[email protected]>

Closes #20911 from gatorsmile/addTests.
What changes were proposed in this pull request?
Complex type simplification optimizer rules were not applied to the
entire plan, just the expressions reachable from the root node. This
patch fixes the rules to transform the entire plan.
How was this patch tested?
New unit test + ran sql / core tests.