[SPARK-40086][SPARK-42049][SQL] Improve AliasAwareOutputPartitioning and AliasAwareQueryOutputOrdering to take all aliases into account #37525
Conversation
@@ -681,7 +681,7 @@ class ExplainSuiteAE extends ExplainSuiteHelper with EnableAdaptiveExecutionSuite
    df.createTempView("df")

    val sqlText = "EXPLAIN CODEGEN SELECT key, MAX(value) FROM df GROUP BY key"
-   val expectedCodegenText = "Found 2 WholeStageCodegen subtrees."
+   val expectedCodegenText = "Found 1 WholeStageCodegen subtrees."
Explain codegen output (plan) changed from:
Found 2 WholeStageCodegen subtrees.
== Subtree 1 / 2 (maxMethodCodeSize:282; maxConstantPoolSize:348(0.53% used); numInnerClasses:2) ==
*(1) HashAggregate(keys=[key#xL], functions=[partial_max(value#xL)], output=[key#xL, max#xL])
+- *(1) Project [id#xL AS key#xL, id#xL AS value#xL]
+- *(1) Range (0, 5, step=1, splits=2)
== Subtree 2 / 2 (maxMethodCodeSize:252; maxConstantPoolSize:214(0.33% used); numInnerClasses:0) ==
*(2) HashAggregate(keys=[key#xL], functions=[max(value#xL)], output=[key#xL, max(value)#xL])
+- AQEShuffleRead coalesced
+- ShuffleQueryStage 0
+- Exchange hashpartitioning(key#xL, 5), ENSURE_REQUIREMENTS, [plan_id=55]
+- *(1) HashAggregate(keys=[key#xL], functions=[partial_max(value#xL)], output=[key#xL, max#xL])
+- *(1) Project [id#xL AS key#xL, id#xL AS value#xL]
+- *(1) Range (0, 5, step=1, splits=2)
to:
Found 1 WholeStageCodegen subtrees.
== Subtree 1 / 1 (maxMethodCodeSize:308; maxConstantPoolSize:374(0.57% used); numInnerClasses:2) ==
*(1) HashAggregate(keys=[key#xL], functions=[max(value#xL)], output=[key#xL, max(value)#xL])
+- *(1) HashAggregate(keys=[key#xL], functions=[partial_max(value#xL)], output=[key#xL, max#xL])
+- *(1) Project [id#xL AS key#xL, id#xL AS value#xL]
+- *(1) Range (0, 5, step=1, splits=2)
@@ -339,12 +339,12 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite {
    //   ShuffleQueryStage 0
    //   ShuffleQueryStage 2
    //   ReusedQueryStage 0
-   val grouped = df.groupBy("key").agg(max("value").as("value"))
+   val grouped = df.groupBy((col("key") + 1).as("key")).agg(max("value").as("value"))
I had to modify the test because the fix changed the explain plan of the original query from:
Union
:- *(5) HashAggregate(keys=[_groupingexpression#79L], functions=[max(value#38L)], output=[(key + 1)#44L, max(value)#45L])
: +- AQEShuffleRead coalesced
: +- ShuffleQueryStage 3
: +- Exchange hashpartitioning(_groupingexpression#79L, 5), ENSURE_REQUIREMENTS, [plan_id=693]
: +- *(3) HashAggregate(keys=[_groupingexpression#79L], functions=[partial_max(value#38L)], output=[_groupingexpression#79L, max#62L])
: +- *(3) HashAggregate(keys=[key#12L], functions=[max(value#13L)], output=[value#38L, _groupingexpression#79L])
: +- AQEShuffleRead coalesced
: +- ShuffleQueryStage 0
: +- Exchange hashpartitioning(key#12L, 5), ENSURE_REQUIREMENTS, [plan_id=623]
: +- *(1) HashAggregate(keys=[key#12L], functions=[partial_max(value#13L)], output=[key#12L, max#64L])
: +- *(1) Project [id#10L AS key#12L, id#10L AS value#13L]
: +- *(1) Range (0, 6, step=1, splits=10)
+- *(6) HashAggregate(keys=[_groupingexpression#80L], functions=[max(value#38L)], output=[(key + 2)#51L, max(value)#52L])
+- AQEShuffleRead coalesced
+- ShuffleQueryStage 4
+- Exchange hashpartitioning(_groupingexpression#80L, 5), ENSURE_REQUIREMENTS, [plan_id=719]
+- *(4) HashAggregate(keys=[_groupingexpression#80L], functions=[partial_max(value#38L)], output=[_groupingexpression#80L, max#66L])
+- *(4) HashAggregate(keys=[key#12L], functions=[max(value#13L)], output=[value#38L, _groupingexpression#80L])
+- AQEShuffleRead coalesced
+- ShuffleQueryStage 2
+- ReusedExchange [key#12L, max#64L], Exchange hashpartitioning(key#12L, 5), ENSURE_REQUIREMENTS, [plan_id=623]
to (1 less exchange):
Union
:- *(3) HashAggregate(keys=[_groupingexpression#75L], functions=[max(value#38L)], output=[(key + 1)#44L, max(value)#45L])
: +- AQEShuffleRead coalesced
: +- ShuffleQueryStage 0
: +- Exchange hashpartitioning(_groupingexpression#75L, 5), ENSURE_REQUIREMENTS, [plan_id=514]
: +- *(1) HashAggregate(keys=[_groupingexpression#75L], functions=[partial_max(value#38L)], output=[_groupingexpression#75L, max#62L])
: +- *(1) HashAggregate(keys=[key#12L], functions=[max(value#13L)], output=[value#38L, _groupingexpression#75L])
: +- *(1) HashAggregate(keys=[key#12L], functions=[partial_max(value#13L)], output=[key#12L, max#64L])
: +- *(1) Project [id#10L AS key#12L, id#10L AS value#13L]
: +- *(1) Range (0, 6, step=1, splits=10)
+- *(4) HashAggregate(keys=[_groupingexpression#76L], functions=[max(value#38L)], output=[(key + 2)#51L, max(value)#52L])
+- AQEShuffleRead coalesced
+- ShuffleQueryStage 1
+- Exchange hashpartitioning(_groupingexpression#76L, 5), ENSURE_REQUIREMENTS, [plan_id=532]
+- *(2) HashAggregate(keys=[_groupingexpression#76L], functions=[partial_max(value#38L)], output=[_groupingexpression#76L, max#66L])
+- *(2) HashAggregate(keys=[key#12L], functions=[max(value#13L)], output=[value#38L, _groupingexpression#76L])
+- *(2) HashAggregate(keys=[key#12L], functions=[partial_max(value#13L)], output=[key#12L, max#64L])
+- *(2) Project [id#55L AS key#12L, id#55L AS value#13L]
+- *(2) Range (0, 6, step=1, splits=10)
and so the query didn't match the test case 2 description.
cc @cloud-fan
@cloud-fan, @imback82, can you please help to review this PR?
val outputSet = AttributeSet(outputExpressions.map(_.toAttribute))

def pruneCandidate(candidate: Expression): Option[Expression] = {
  expr.multiTransform {
@cloud-fan, @ulysses-you I've updated this PR. Now it is based on `multiTransform` and contains changes from both this PR and #39556 (see the description). `normalizeExpression()` becomes as simple as this with `multiTransform`.
Please note that currently `pruneFunc` is used only for "after transformation" filtering, but as `multiTransform` does the mapping in one run (unlike the removed code, which ran a transform for each alias), it is much more efficient than the removed version if we have a high number of aliases.
Some early pruning would also be possible using `multiTransform`; I will show you that version a bit later.
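For intuition, a simplified, self-contained sketch of the one-pass expansion idea (a toy expression tree and an invented `expand` helper, not Spark's actual `multiTransform` implementation):

```scala
object MultiTransformSketch {
  sealed trait Expr
  case class Attr(name: String) extends Expr
  case class Add(left: Expr, right: Expr) extends Expr

  // Expand a tree into every combination of the given alternatives in a
  // single traversal. The result is a lazy Stream, so a downstream
  // .take(limit) bounds the amount of work.
  def expand(e: Expr, alts: Map[Expr, Seq[Expr]]): Stream[Expr] =
    alts.get(e) match {
      case Some(candidates) => candidates.toStream
      case None => e match {
        case Add(l, r) =>
          for (le <- expand(l, alts); re <- expand(r, alts)) yield Add(le, re)
        case leaf => Stream(leaf)
      }
    }

  def main(args: Array[String]): Unit = {
    // `c` may stay `c` or be replaced by its alias `a`.
    val alts = Map[Expr, Seq[Expr]](Attr("c") -> Seq(Attr("c"), Attr("a")))
    // Add(c, c) expands to all 4 combinations in one traversal,
    // instead of one full transform per alias.
    expand(Add(Attr("c"), Attr("c")), alts).foreach(println)
  }
}
```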
Based on my usage, `multiTransform` should support at least 4 cases of pruning:
- a max limit on the size of the returned result
- prune results whose references are not a subset of the output
- prune intermediate results if the alias map does not contain any other sub-expression
- prune sub-expressions, e.g. `PartitionCollection(a, b)` -> `PartitionCollection(a)` if `b` is not a subset of the output

If all these requirements can be met, I think it's good to switch to `multiTransform` (case 2 is sketched below).
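To make case 2 concrete, a toy standalone example of reference-based pruning (the names here are invented for illustration):

```scala
object PruneByReferences {
  def main(args: Array[String]): Unit = {
    // Each candidate expression is represented by the set of attributes
    // it references.
    val candidates = Seq(
      "a + 1" -> Set("a"),
      "a + b" -> Set("a", "b"),
      "c"     -> Set("c"))
    val output = Set("a", "c")
    // Case 2: drop candidates whose references are not a subset of the output.
    val valid = candidates.collect { case (e, refs) if refs.subsetOf(output) => e }
    println(valid) // List(a + 1, c)
  }
}
```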
@ulysses-you, @cloud-fan, I've rebased this PR. Let me know your thoughts.
projectExpression(e)
  .filter(e => partitioningSet.add(e.canonicalized))
  .take(aliasCandidateLimit)
  .asInstanceOf[Stream[Partitioning]]
Do we need to cast to `Stream`? It will be pattern matched by `Seq(...)` immediately, which will materialize it.
This cast is required to avoid a compile error, as `projectExpression` returns `Stream[Expression]` but the `flatMap` requires `Seq[Partitioning]`. We could use `.asInstanceOf[Seq[Partitioning]]` here, but I'm not sure it makes any difference.
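As a side note on why the laziness is worth keeping: on a `Stream`, `filter` and `take(aliasCandidateLimit)` only force as many candidates as are actually kept. A standalone sketch with toy `Int` candidates (not the PR's code):

```scala
import scala.collection.mutable

object LazyCandidates {
  def main(args: Array[String]): Unit = {
    var forced = 0
    // An unbounded candidate stream; count how many elements get forced.
    val candidates = Stream.from(1).map { i => forced += 1; i }
    val seen = mutable.Set.empty[Int]
    // Deduplicate and cap the candidates, mirroring the
    // `.filter(e => partitioningSet.add(e.canonicalized)).take(limit)` idiom.
    val kept = candidates.filter(i => seen.add(i)).take(3).toList
    println(kept)   // List(1, 2, 3)
    println(forced) // 3: only the kept candidates were ever evaluated
  }
}
```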
if (requiredOrdering.length > outputOrdering.length) {
  false
} else {
  requiredOrdering.zip(outputOrdering).forall {
-   case (requiredOrder, outputOrder) => requiredOrder.semanticEquals(outputOrder)
+   case (requiredOrder, outputOrder) =>
+     // Follow `SortOrder.satisfies` that respects `SortOrder.sameOrderExpressions`
This comment is not needed, as we are not following `SortOrder.satisfies` but using it.
indeed, fixed in 733ecb5
" It can preserve the output partitioning and ordering." + | ||
" Negative value means disable this optimization.") | ||
.internal() | ||
.version("3.4.0") |
This PR targets master, which is 3.5.0. Is this going to be merged into branch-3.4, which is feature-frozen? If not, this line should be adjusted.
I'd like to merge it to 3.4 as it fixes a bug in planned write, which is a new feature in 3.4.
val partitioningSet = mutable.Set.empty[Expression]
projectExpression(e)
  .filter(e => partitioningSet.add(e.canonicalized))
  .take(aliasCandidateLimit)
Scala 2.13 would allow simplifying this. It's a shame...
- val partitioningSet = mutable.Set.empty[Expression]
- projectExpression(e)
-   .filter(e => partitioningSet.add(e.canonicalized))
-   .take(aliasCandidateLimit)
+ projectExpression(e)
+   .distinctBy(_.canonicalized)
+   .take(aliasCandidateLimit)
Hmm, I think we still need to support Scala 2.12 for now?
sure, that's the shame bit
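For reference, a Scala 2.12-compatible stand-in is easy to sketch (the `distinctByKey` extension below is invented for illustration):

```scala
import scala.collection.mutable

object DistinctBySketch {
  // A 2.12 stand-in for 2.13's `distinctBy`: keep the first element per
  // key, staying lazy on Stream.
  implicit class DistinctByOps[A](s: Stream[A]) {
    def distinctByKey[K](f: A => K): Stream[A] = {
      val seen = mutable.Set.empty[K]
      s.filter(a => seen.add(f(a)))
    }
  }

  def main(args: Array[String]): Unit = {
    println(Stream(1, 2, 3, 4).distinctByKey(_ % 2).toList) // List(1, 2)
  }
}
```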
Once again, I can confirm that for the example in #38356 master writes
WriteFiles
+- *(1) Sort [p#19 ASC NULLS FIRST], false, 0
+- *(1) Project [id#10, sort_col#11, empty2null(p#12) AS p#19]
+- ShuffleQueryStage 0
+- Exchange SinglePartition, REPARTITION_BY_NUM, [plan_id=18]
+- LocalTableScan [id#10, sort_col#11, p#12]
while this fix writes
WriteFiles
+- *(1) Project [id#10, sort_col#11, empty2null(p#12) AS p#19]
+- *(1) Sort [p#12 ASC NULLS FIRST, sort_col#11 ASC NULLS FIRST], false, 0
+- ShuffleQueryStage 0
+- Exchange SinglePartition, REPARTITION_BY_NUM, [plan_id=18]
+- LocalTableScan [id#10, sort_col#11, p#12]
LGTM!
@peter-toth can you retrigger the tests? The pyspark failures may be flaky.
  case _ =>
}
outputExpressions.foreach {
  case a: Attribute if aliases.contains(a.canonicalized) =>
What does this do? Do you mean `!aliases.contains(a.canonicalized)`?
If we have a `c, c AS a` projection then we need to add both the original `c` attribute and `a` to the alternatives of `c`. But we don't need to add an attribute if it isn't aliased.
What's the behavior of `c AS a`? This code seems to return both `c` and `a`. I think the right way should be `if AttributeSet(outputExpressions).contains(a) => // add a to buffer`.
This case doesn't match `c AS a`. This case makes sure that if `c` -> `a` has been added to the map by the previous foreach, then `c` -> `c` should be added too.
I think your code would add all attributes, but that is not needed.
Why should `c -> c` be added? The outputExpressions only contain `c AS a`; shall we only return `c -> a`?
Ah, I see it. How about this case: `c1, c2 AS x`? We should also add `c1` into the aliasMap, right?
No. If `c1` is not aliased otherwise then we don't need to add it to the map. If the map doesn't contain any alias for an expression, then the transformation does nothing with that `c1` attribute and just leaves it in the expression tree as it is...
But the aliasMap is not empty due to `c2 AS x`. For this case, how can we preserve `c1` if `c1` is not added to the aliasMap?
Oh, I see it, nvm. @peter-toth thank you for the patience!
Np, thanks for reviewing this PR @ulysses-you!
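To summarize the rule discussed in this thread, a minimal standalone sketch (string stand-ins for Catalyst attributes and aliases, not the PR's actual code):

```scala
import scala.collection.mutable

object AliasMapSketch {
  def main(args: Array[String]): Unit = {
    // Projection `c, c AS a`, modeled as (child, optional alias name).
    val outputExpressions = Seq("c" -> None, "c" -> Some("a"))

    val aliases = mutable.Map.empty[String, mutable.ArrayBuffer[String]]
    // Pass 1: record every alias of an expression: c -> [a].
    outputExpressions.foreach {
      case (child, Some(alias)) =>
        aliases.getOrElseUpdate(child, mutable.ArrayBuffer.empty) += alias
      case _ =>
    }
    // Pass 2: if a plain attribute is *also* aliased elsewhere, keep the
    // attribute itself as an alternative too: c -> [a, c]. Attributes that
    // are never aliased (like a bare c1) stay out of the map, so the
    // transformation leaves them untouched.
    outputExpressions.foreach {
      case (attr, None) if aliases.contains(attr) => aliases(attr) += attr
      case _ =>
    }
    println(aliases) // Map(c -> ArrayBuffer(a, c))
  }
}
```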
val aliases = mutable.Map[Expression, mutable.ArrayBuffer[Attribute]]()
outputExpressions.reverse.foreach {
  case a @ Alias(child, _) =>
    val buffer = aliases.getOrElseUpdate(strip(child.canonicalized), mutable.ArrayBuffer.empty)
I prefer `strip(child).canonicalized`. I have not seen other places in the code that match on a canonicalized expression.
+1
Ok, thanks. I will change it today.
fixed in 1f1f093
orderingExpressions.flatMap { sortOrder =>
  val orderingSet = mutable.Set.empty[Expression]
  val sameOrderings = sortOrder.children.toStream
    .flatMap(projectExpression)
    .filter(e => orderingSet.add(e.canonicalized))
    .take(aliasCandidateLimit)
  if (sameOrderings.nonEmpty) {
    Some(sortOrder.copy(child = sameOrderings.head,
      sameOrderExpressions = sameOrderings.tail))
  } else {
    None
  }
Is it correct? If `orderingExpressions` is `c1, c2` and the aliasMap is `c2 AS x`, then the outputOrdering should be `c1, x` or empty, rather than `x`.
Good point! Partitioning is fine, as the entire `HashPartitioning` is one expression.
let's also add tests for it.
I'm not sure I get this. If we have 2 orderings in `orderingExpressions` (`SortOrder(c1)`, `SortOrder(c2)`) and the projection is `c2 AS x`, then why don't we expect `SortOrder(x)`?
Hmm, I might see the issue here... Let me fix it today.
If data is sorted by `a, b`, we can't say the data is sorted by `b`, but we can say it's sorted by `a`.
Got it already, thanks! I will submit a fix today.
Here is the fix and a new test: 1f1f093
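For reference, a tiny standalone sketch of the prefix rule the fix implements (toy string sort keys and an invented `projectOrdering` helper, not Catalyst code):

```scala
object OrderingPrefixSketch {
  // Projecting an output ordering must keep only the longest prefix whose
  // sort keys all survive the projection: data sorted by (c1, c2) is still
  // sorted by c1 alone, but not by c2 alone.
  def projectOrdering(
      ordering: Seq[String],
      projected: Map[String, String]): Seq[String] =
    ordering.iterator
      .map(projected.get)     // try to project each sort key
      .takeWhile(_.isDefined) // stop at the first sort key that is lost
      .map(_.get)
      .toSeq

  def main(args: Array[String]): Unit = {
    // Only c2 survives (as x): the projected ordering is empty, not Seq(x).
    println(projectOrdering(Seq("c1", "c2"), Map("c2" -> "x")))
    // Both keys survive: List(c1, x).
    println(projectOrdering(Seq("c1", "c2"), Map("c1" -> "c1", "c2" -> "x")))
  }
}
```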
Thanks, merging to master/3.4 (as it fixes a bug in planned write)!
…and AliasAwareQueryOutputOrdering to take all aliases into account

### What changes were proposed in this pull request?
Currently `AliasAwareOutputPartitioning` and `AliasAwareQueryOutputOrdering` take only the last alias of each aliased expression into account. We could avoid some extra shuffles and sorts with better alias handling.

### Why are the changes needed?
Performance improvement, and this also fixes the issue in #39475.

### Does this PR introduce _any_ user-facing change?
Yes, this PR fixes the issue in #39475.

### How was this patch tested?
Added new UT.

Closes #37525 from peter-toth/SPARK-40086-fix-aliasawareoutputexpression.

Lead-authored-by: Peter Toth <[email protected]>
Co-authored-by: ulysses-you <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 6341b06)
Signed-off-by: Wenchen Fan <[email protected]>
Thanks for the review!
…itioning

### What changes were proposed in this pull request?
This is a follow-up of #37525. When the project list has aliases, we go to the `projectExpression` branch, which filters away invalid partitioning/ordering that reference non-existing attributes in the current plan node. However, this filtering is missing when the project list has no alias, where we directly return the child partitioning/ordering. This PR fixes it.

### Why are the changes needed?
To make sure we always return valid output partitioning/ordering.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
New tests.

Closes #40137 from cloud-fan/alias.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
### What changes were proposed in this pull request?
After #37525 (SPARK-40086 / SPARK-42049) the following simple query containing a subselect expression:
```
select (select sum(id) from t1)
```
fails with:
```
09:48:57.645 ERROR org.apache.spark.executor.Executor: Exception in task 0.0 in stage 3.0 (TID 3)
java.lang.NullPointerException
	at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.batch$lzycompute(BatchScanExec.scala:47)
	at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.batch(BatchScanExec.scala:47)
	at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.hashCode(BatchScanExec.scala:60)
	at scala.runtime.Statics.anyHash(Statics.java:122)
	...
	at org.apache.spark.sql.catalyst.trees.TreeNode.hashCode(TreeNode.scala:249)
	at scala.runtime.Statics.anyHash(Statics.java:122)
	at scala.collection.mutable.HashTable$HashUtils.elemHashCode(HashTable.scala:416)
	at scala.collection.mutable.HashTable$HashUtils.elemHashCode$(HashTable.scala:416)
	at scala.collection.mutable.HashMap.elemHashCode(HashMap.scala:44)
	at scala.collection.mutable.HashTable.addEntry(HashTable.scala:149)
	at scala.collection.mutable.HashTable.addEntry$(HashTable.scala:148)
	at scala.collection.mutable.HashMap.addEntry(HashMap.scala:44)
	at scala.collection.mutable.HashTable.init(HashTable.scala:110)
	at scala.collection.mutable.HashTable.init$(HashTable.scala:89)
	at scala.collection.mutable.HashMap.init(HashMap.scala:44)
	at scala.collection.mutable.HashMap.readObject(HashMap.scala:195)
	...
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:87)
	at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:129)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:85)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1520)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
```
when DSv2 is enabled.

This PR proposes to fix `BatchScanExec`'s `equals()` and `hashCode()`, as those shouldn't throw an NPE in any circumstances. But if we dig deeper, we realize that the NPE occurs since #37525 and the root cause of the problem is changing `AliasAwareOutputExpression.aliasMap` from immutable to mutable. The mutable map deserialization invokes the `hashCode()` of the keys, while that is not the case with immutable maps. In this case the key is a subquery expression whose plan contains the `BatchScanExec`. Please note that the mutability of `aliasMap` shouldn't be an issue as it is a `private` field of `AliasAwareOutputExpression` (though adding a simple `.toMap` would also help to avoid the NPE). Based on the above findings this PR also proposes making `aliasMap` transient, as it isn't needed on executors.

A side question is whether adding any subquery expressions to `AliasAwareOutputExpression.aliasMap` makes any sense, because `AliasAwareOutputExpression.projectExpression()` mainly projects `child.outputPartitioning` and `child.outputOrdering`, which can't contain subquery expressions. But there are a few exceptions (`SortAggregateExec`, `TakeOrderedAndProjectExec`) where `AliasAwareQueryOutputOrdering.orderingExpressions` doesn't come from the `child`, and actually leaving those expressions in the map doesn't do any harm.

### Why are the changes needed?
To fix a regression introduced with #37525.

### Does this PR introduce _any_ user-facing change?
Yes, the query works again.

### How was this patch tested?
Added new UT.

Closes #40364 from peter-toth/SPARK-42745-improved-aliasawareoutputexpression-with-dsv2.

Authored-by: Peter Toth <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
…ingUnaryExecNode

### What changes were proposed in this pull request?
This is a followup of #37525. When expanding the output partitioning/ordering with aliases, we have a threshold to avoid exponential explosion. However, we missed applying this threshold in one place. This PR fixes it.

### Why are the changes needed?
To avoid OOM.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
New test.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #44614 from cloud-fan/oom.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
What changes were proposed in this pull request?

Currently `AliasAwareOutputPartitioning` and `AliasAwareQueryOutputOrdering` take only the last alias of each aliased expression into account. We could avoid some extra shuffles and sorts with better alias handling.

Why are the changes needed?

Performance improvement, and this also fixes the issue in #39475.

Does this PR introduce any user-facing change?

Yes, this PR fixes the issue in #39475.

How was this patch tested?

Added new UT.
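For illustration, a minimal way to reproduce the `ExplainSuite` scenario shown at the top of this conversation (a sketch assuming a local Spark build that includes this fix; the object name is made up):

```scala
import org.apache.spark.sql.SparkSession

object AliasAwareDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").getOrCreate()
    import spark.implicits._

    // Both `key` and `value` alias the same underlying `id` column.
    spark.range(0, 5, 1, 2)
      .select($"id".as("key"), $"id".as("value"))
      .createTempView("df")

    // Before this PR only the last alias of `id` was tracked, so the plan
    // needed an Exchange before the final aggregate (2 codegen subtrees);
    // with the fix the aggregate reuses the child's partitioning through
    // either alias and the Exchange disappears (1 codegen subtree).
    spark.sql("SELECT key, MAX(value) FROM df GROUP BY key").explain()

    spark.stop()
  }
}
```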