[SPARK-40086][SPARK-42049][SQL] Improve AliasAwareOutputPartitioning and AliasAwareQueryOutputOrdering to take all aliases into account #37525
Changes from 12 commits
1f5341e
5fe4fa6
b693b31
5e8b084
7a57a29
0138ad4
cf75540
3f84edd
da1d48a
7747045
e6c395e
733ecb5
1f1f093
@@ -0,0 +1,116 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalyst.plans

import scala.collection.mutable

import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeSet, Empty2Null, Expression, NamedExpression, SortOrder}
import org.apache.spark.sql.internal.SQLConf

/**
 * A trait that provides functionality to handle aliases in the `outputExpressions`.
 */
trait AliasAwareOutputExpression extends SQLConfHelper {
  protected val aliasCandidateLimit = conf.getConf(SQLConf.EXPRESSION_PROJECTION_CANDIDATE_LIMIT)

  protected def outputExpressions: Seq[NamedExpression]

  /**
   * This method can be used to strip expressions that do not affect the result, for example:
   * strip an expression that is ordering agnostic for output ordering.
   */
  protected def strip(expr: Expression): Expression = expr

  // Build an `Expression` -> `Attribute` alias map.
  // There can be multiple aliases defined for the same expression, but it doesn't make sense to
  // store more than `aliasCandidateLimit` attributes for an expression. In those cases the old
  // logic handled only the last alias, so we need to make sure that we give precedence to that.
  // If the `outputExpressions` contain simple attributes we need to add those to the map too.
  private lazy val aliasMap = {
    val aliases = mutable.Map[Expression, mutable.ArrayBuffer[Attribute]]()
    outputExpressions.reverse.foreach {
      case a @ Alias(child, _) =>
        val buffer = aliases.getOrElseUpdate(strip(child.canonicalized), mutable.ArrayBuffer.empty)
        if (buffer.size < aliasCandidateLimit) {
          buffer += a.toAttribute
        }
      case _ =>
    }
    outputExpressions.foreach {
      case a: Attribute if aliases.contains(a.canonicalized) =>
        val buffer = aliases(a.canonicalized)
        if (buffer.size < aliasCandidateLimit) {
          buffer += a
        }
      case _ =>
    }
    aliases
  }

  protected def hasAlias: Boolean = aliasMap.nonEmpty

  /**
   * Return a stream of expressions in which the original expression is projected with `aliasMap`.
   */
  protected def projectExpression(expr: Expression): Stream[Expression] = {
    val outputSet = AttributeSet(outputExpressions.map(_.toAttribute))
    expr.multiTransformDown {
      // Mapping with aliases
      case e: Expression if aliasMap.contains(e.canonicalized) =>
        aliasMap(e.canonicalized).toSeq ++ (if (e.containsChild.nonEmpty) Seq(e) else Seq.empty)

      // Prune if we encounter an attribute that we can't map and that is not in the output set.
      // This prune goes up to the closest `multiTransformDown()` call and returns `Stream.empty`
      // there.
      case a: Attribute if !outputSet.contains(a) => Seq.empty
    }
  }
}

/**
 * A trait that handles aliases in the `orderingExpressions` to produce `outputOrdering` that
 * satisfies ordering requirements.
 */
trait AliasAwareQueryOutputOrdering[T <: QueryPlan[T]]
  extends AliasAwareOutputExpression { self: QueryPlan[T] =>
  protected def orderingExpressions: Seq[SortOrder]

  override protected def strip(expr: Expression): Expression = expr match {
    case e: Empty2Null => strip(e.child)
    case _ => expr
  }

  override final def outputOrdering: Seq[SortOrder] = {
    if (hasAlias) {
      orderingExpressions.flatMap { sortOrder =>
        val orderingSet = mutable.Set.empty[Expression]
        val sameOrderings = sortOrder.children.toStream
          .flatMap(projectExpression)
          .filter(e => orderingSet.add(e.canonicalized))
          .take(aliasCandidateLimit)
        if (sameOrderings.nonEmpty) {
          Some(sortOrder.copy(child = sameOrderings.head,
            sameOrderExpressions = sameOrderings.tail))
        } else {
          None
        }
      }
    } else {
      orderingExpressions
    }
  }
}

(Review threads on this file — an API preference and a correctness issue spotted in the ordering projection — were resolved in commit 1f1f093, which also added a new test.)
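As a quick illustration of how `aliasMap` above is populated, here is a toy sketch with plain strings standing in for canonicalized Catalyst expressions. The object and method names are invented for the example; only the capping and precedence logic mirrors the PR.

```scala
import scala.collection.mutable

// Toy model: (child expression, alias name) pairs, as if the projection were
// SELECT id AS a, id AS b, id AS c FROM t, with aliasCandidateLimit = 2.
object AliasMapBuildSketch {
  val aliasCandidateLimit = 2

  def buildAliasMap(outputExpressions: Seq[(String, String)]): Map[String, Seq[String]] = {
    val aliases = mutable.Map[String, mutable.ArrayBuffer[String]]()
    // Iterating in reverse gives precedence to the last alias, matching the
    // old single-alias behaviour that kept only the last one.
    outputExpressions.reverse.foreach { case (child, alias) =>
      val buffer = aliases.getOrElseUpdate(child, mutable.ArrayBuffer.empty)
      if (buffer.size < aliasCandidateLimit) buffer += alias
    }
    aliases.map { case (k, v) => k -> v.toSeq }.toMap
  }

  def main(args: Array[String]): Unit = {
    val m = buildAliasMap(Seq("id" -> "a", "id" -> "b", "id" -> "c"))
    // Only the last two aliases are kept, last one first.
    assert(m("id") == Seq("c", "b"))
  }
}
```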
@@ -435,6 +435,16 @@ object SQLConf {
      .booleanConf
      .createWithDefault(true)

  val EXPRESSION_PROJECTION_CANDIDATE_LIMIT =
    buildConf("spark.sql.optimizer.expressionProjectionCandidateLimit")
      .doc("The maximum number of candidate output expressions whose aliases are replaced. " +
        "It can preserve the output partitioning and ordering. " +
        "A negative value disables this optimization.")
      .internal()
      .version("3.4.0")
      .intConf
      .createWithDefault(100)

  val COMPRESS_CACHED = buildConf("spark.sql.inMemoryColumnarStorage.compressed")
    .doc("When set to true Spark SQL will automatically select a compression codec for each " +
      "column based on statistics of the data.")

(Review thread on the "3.4.0" version: although the PR targets master, it is intended for branch-3.4 as well, since it fixes a bug in planned write, a new feature in 3.4.)
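The "negative value disables" wording in the config doc relies on candidate streams being capped with `take`, which yields nothing for a non-positive argument. A minimal standalone sketch of that semantics (plain Scala collections, no Spark; names are invented for the example):

```scala
object CandidateLimitSketch {
  // Candidate alternatives are produced lazily and then capped with `take`,
  // so a negative limit naturally yields zero candidates, i.e. the
  // optimization is effectively disabled.
  def candidates(all: Stream[String], limit: Int): List[String] =
    all.take(limit).toList

  def main(args: Array[String]): Unit = {
    val alts = Stream("a", "b", "c")
    assert(candidates(alts, 2) == List("a", "b"))
    assert(candidates(alts, -1) == Nil) // negative limit: no candidates kept
  }
}
```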
@@ -16,52 +16,42 @@
  */
 package org.apache.spark.sql.execution
 
-import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, NamedExpression, SortOrder}
-import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, PartitioningCollection, UnknownPartitioning}
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.{AliasAwareOutputExpression, AliasAwareQueryOutputOrdering}
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, PartitioningCollection, UnknownPartitioning}
 
-/**
- * A trait that provides functionality to handle aliases in the `outputExpressions`.
- */
-trait AliasAwareOutputExpression extends UnaryExecNode {
-  protected def outputExpressions: Seq[NamedExpression]
-
-  private lazy val aliasMap = outputExpressions.collect {
-    case a @ Alias(child, _) => child.canonicalized -> a.toAttribute
-  }.toMap
-
-  protected def hasAlias: Boolean = aliasMap.nonEmpty
-
-  protected def normalizeExpression(exp: Expression): Expression = {
-    exp.transformDown {
-      case e: Expression => aliasMap.getOrElse(e.canonicalized, e)
-    }
-  }
-}
-
 /**
  * A trait that handles aliases in the `outputExpressions` to produce `outputPartitioning` that
  * satisfies distribution requirements.
  */
-trait AliasAwareOutputPartitioning extends AliasAwareOutputExpression {
+trait PartitioningPreservingUnaryExecNode extends UnaryExecNode
+  with AliasAwareOutputExpression {
   final override def outputPartitioning: Partitioning = {
-    val normalizedOutputPartitioning = if (hasAlias) {
-      child.outputPartitioning match {
+    if (hasAlias) {
+      flattenPartitioning(child.outputPartitioning).flatMap {
         case e: Expression =>
-          normalizeExpression(e).asInstanceOf[Partitioning]
-        case other => other
+          // We need unique partitionings but if the input partitioning is
+          // `HashPartitioning(Seq(id + id))` and we have `id -> a` and `id -> b` aliases then
+          // after the projection we have 4 partitionings:
+          // `HashPartitioning(Seq(a + a))`, `HashPartitioning(Seq(a + b))`,
+          // `HashPartitioning(Seq(b + a))`, `HashPartitioning(Seq(b + b))`, but
+          // `HashPartitioning(Seq(a + b))` is the same as `HashPartitioning(Seq(b + a))`.
+          val partitioningSet = mutable.Set.empty[Expression]
+          projectExpression(e)
+            .filter(e => partitioningSet.add(e.canonicalized))
+            .take(aliasCandidateLimit)
+            .asInstanceOf[Stream[Partitioning]]
+        case o => Seq(o)
+      } match {
+        case Seq() => UnknownPartitioning(child.outputPartitioning.numPartitions)
+        case Seq(p) => p
+        case ps => PartitioningCollection(ps)
       }
     } else {
       child.outputPartitioning
     }
-
-    flattenPartitioning(normalizedOutputPartitioning).filter {
-      case hashPartitioning: HashPartitioning => hashPartitioning.references.subsetOf(outputSet)
-      case _ => true
-    } match {
-      case Seq() => UnknownPartitioning(child.outputPartitioning.numPartitions)
-      case Seq(singlePartitioning) => singlePartitioning
-      case seqWithMultiplePartitionings => PartitioningCollection(seqWithMultiplePartitionings)
-    }
   }
 
   private def flattenPartitioning(partitioning: Partitioning): Seq[Partitioning] = {
@@ -74,18 +64,5 @@
   }
 }
 
-/**
- * A trait that handles aliases in the `orderingExpressions` to produce `outputOrdering` that
- * satisfies ordering requirements.
- */
-trait AliasAwareOutputOrdering extends AliasAwareOutputExpression {
-  protected def orderingExpressions: Seq[SortOrder]
-
-  final override def outputOrdering: Seq[SortOrder] = {
-    if (hasAlias) {
-      orderingExpressions.map(normalizeExpression(_).asInstanceOf[SortOrder])
-    } else {
-      orderingExpressions
-    }
-  }
-}
+trait OrderPreservingUnaryExecNode
+  extends UnaryExecNode with AliasAwareQueryOutputOrdering[SparkPlan]

(Review notes on this hunk: the `asInstanceOf[Stream[Partitioning]]` cast is required to avoid a compile error; Scala 2.13 would allow simplifying the filter/take deduplication, but Scala 2.12 still has to be supported.)
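The code comment about `id + id` producing four projected partitionings of which only three are distinct can be sketched with stand-in types: a tuple `(l, r)` models the commutative expression `l + r`, and sorting the operands stands in for Catalyst canonicalization. All names here are invented; this is not Spark code.

```scala
import scala.collection.mutable

object PartitioningDedupSketch {
  // Stand-in canonical form: order the operands of the commutative `+`.
  def canonical(p: (String, String)): (String, String) =
    if (p._1 <= p._2) p else p.swap

  // All alias projections of `id + id`, deduplicated by canonical form.
  def uniqueProjections(aliases: Seq[String]): List[(String, String)] = {
    val projected = for (l <- aliases.toStream; r <- aliases.toStream) yield (l, r)
    val seen = mutable.Set.empty[(String, String)]
    projected.filter(p => seen.add(canonical(p))).toList
  }

  def main(args: Array[String]): Unit = {
    // id -> a and id -> b give 4 projections of `id + id`, 3 of them distinct:
    // (a + b) and (b + a) collapse into one.
    assert(uniqueProjections(Seq("a", "b")) == List(("a", "a"), ("a", "b"), ("b", "b")))
  }
}
```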
Review discussion on the size of `aliasMap`:

Reviewer: Can we limit the size of this map as well? Now it only limits the size of each map value.

Author: I'm not sure we should, as it might happen that we reach the map size limit but the added aliases don't project the child's partitioning / ordering. E.g. if the child partitioning is `HashPartitioning(c)` and the projection is `c1 as c1a, ..., c100 as c100a, c as ca`, then `c as ca` is not added to the map, so we would end up with `UnknownPartitioning`.

Reviewer: The current config actually limits the size of the final preserved expressions. I think we should add one more config to limit the candidate size in `aliasMap`, and it could be bigger by default. This `aliasMap` may harm driver memory for wide tables.

Author: I'm OK to add a new config, but `aliasMap` will never be bigger than the projection (`outputExpressions`), so is this a real concern?

Reviewer: Let's leave it for now. Having fewer `Attribute`s in map values may generate fewer alternatives, but having fewer map entries may stop the entire projection.
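The `HashPartitioning(c)` concern in the exchange above can be illustrated with a toy sketch: if the map itself were size-capped, a late but crucial entry such as `c -> ca` could be dropped even though only that entry projects the child's partitioning. The `mapSizeLimit` cap and string-based "expressions" are hypothetical, invented for the example.

```scala
import scala.collection.mutable

object MapSizeCapSketch {
  // Build an expression -> alias map, but cap the number of distinct keys
  // (this cap is the behaviour the PR deliberately does NOT implement).
  def buildCapped(projection: Seq[(String, String)], mapSizeLimit: Int): Map[String, String] = {
    val aliases = mutable.LinkedHashMap[String, String]()
    projection.foreach { case (child, alias) =>
      if (aliases.size < mapSizeLimit) aliases += child -> alias
    }
    aliases.toMap
  }

  def main(args: Array[String]): Unit = {
    // Projection `c1 as c1a, c2 as c2a, c3 as c3a, c as ca` with a cap of 3:
    val m = buildCapped(Seq("c1" -> "c1a", "c2" -> "c2a", "c3" -> "c3a", "c" -> "ca"), 3)
    // The crucial `c -> ca` entry is dropped, so a child partitioned by `c`
    // could no longer be projected and would degrade to UnknownPartitioning.
    assert(!m.contains("c"))
  }
}
```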