From 1f5341ebccfc395dc7d392783b695b939a40d0f7 Mon Sep 17 00:00:00 2001
From: ulysses-you
Date: Sat, 14 Jan 2023 12:27:17 +0800
Subject: [PATCH 01/12] Improve AliasAwareOutputExpression

---
 .../expressions/stringExpressions.scala       |  24 +++
 .../plans/AliasAwareOutputExpression.scala    | 160 ++++++++++++++++++
 .../spark/sql/catalyst/plans/QueryPlan.scala  |   5 +
 .../catalyst/plans/logical/LogicalPlan.scala  |  13 +-
 .../plans/logical/basicLogicalOperators.scala |   1 +
 .../apache/spark/sql/internal/SQLConf.scala   |   9 +
 .../AliasAwareOutputExpression.scala          |  63 +++----
 .../spark/sql/execution/SparkPlan.scala       |   3 -
 .../datasources/FileFormatWriter.scala        |   2 +-
 .../sql/execution/datasources/V1Writes.scala  |  30 +---
 .../spark/sql/execution/PlannerSuite.scala    |  76 ++++++++-
 .../datasources/V1WriteCommandSuite.scala     |  27 ++-
 12 files changed, 319 insertions(+), 94 deletions(-)
 create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
index a3f2ff9e7e84d..b589a474c033f 100755
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
@@ -3039,3 +3039,27 @@ case class SplitPart (
     partNum = newChildren.apply(2))
   }
 }
+
+/**
+ * An internal function that converts the empty string to null for partition values.
+ * This function should only be used in V1Writes.
+ */
+case class Empty2Null(child: Expression) extends UnaryExpression with String2StringExpression {
+  override def convert(v: UTF8String): UTF8String = if (v.numBytes() == 0) null else v
+
+  override def nullable: Boolean = true
+
+  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+    nullSafeCodeGen(ctx, ev, c => {
+      s"""if ($c.numBytes() == 0) {
+         |  ${ev.isNull} = true;
+         |  ${ev.value} = null;
+         |} else {
+         |  ${ev.value} = $c;
+         |}""".stripMargin
+    })
+  }
+
+  override protected def withNewChildInternal(newChild: Expression): Empty2Null =
+    copy(child = newChild)
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala
new file mode 100644
index 0000000000000..af251388b253b
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.spark.sql.catalyst.plans + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.sql.catalyst.SQLConfHelper +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeSet, Empty2Null, Expression, NamedExpression, SortOrder} +import org.apache.spark.sql.internal.SQLConf + +/** + * A trait that provides functionality to handle aliases in the `outputExpressions`. + */ +trait AliasAwareOutputExpression extends SQLConfHelper { + private val aliasCandidateLimit = conf.getConf(SQLConf.EXPRESSION_PROJECTION_CANDIDATE_LIMIT) + protected def outputExpressions: Seq[NamedExpression] + /** + * This method can be used to strip expression which does not affect the result, for example: + * strip the expression which is ordering agnostic for output ordering. + */ + protected def strip(expr: Expression): Expression = expr + + private lazy val aliasMap: Map[Expression, ArrayBuffer[Attribute]] = { + if (aliasCandidateLimit < 1) { + Map.empty + } else { + val exprWithAliasMap = new mutable.HashMap[Expression, ArrayBuffer[Attribute]]() + + def updateAttrWithAliasMap(key: Expression, target: Attribute): Unit = { + val aliasArray = exprWithAliasMap.getOrElseUpdate( + strip(key).canonicalized, new ArrayBuffer[Attribute]()) + // pre-filter if the number of alias exceed candidate limit + if (aliasArray.size < aliasCandidateLimit) { + aliasArray.append(target) + } + } + + outputExpressions.foreach { + case a @ Alias(child, _) => + updateAttrWithAliasMap(child, a.toAttribute) + case _ => + } + exprWithAliasMap.toMap + } + } + + protected def hasAlias: Boolean = aliasMap.nonEmpty + + /** + * Return a set of Expression which normalize the original expression to the aliased. + * @param pruneFunc used to prune the alias-replaced expression whose references are not the + * subset of output + */ + protected def normalizeExpression( + expr: Expression, + pruneFunc: (Expression, AttributeSet) => Option[Expression]): Seq[Expression] = { + val normalizedCandidates = new mutable.HashSet[Expression]() + normalizedCandidates.add(expr) + val outputSet = AttributeSet(outputExpressions.map(_.toAttribute)) + + def pruneCandidate(candidate: Expression): Option[Expression] = { + if (candidate.references.subsetOf(outputSet)) { + Some(candidate) + } else { + pruneFunc(candidate, outputSet) + } + } + + // Stop loop if the size of candidates exceed limit + for ((origin, aliases) <- aliasMap if normalizedCandidates.size <= aliasCandidateLimit) { + for (alias <- aliases if normalizedCandidates.size <= aliasCandidateLimit) { + val localCandidates = normalizedCandidates.toArray + for (candidate <- localCandidates if normalizedCandidates.size <= aliasCandidateLimit) { + var hasOtherAlias = false + val newCandidate = candidate.transformDown { + case e: Expression if e.canonicalized == origin => alias + case e if aliasMap.contains(e.canonicalized) => + hasOtherAlias = true + e + } + + if (!candidate.fastEquals(newCandidate)) { + if (!hasOtherAlias) { + // If there is no other alias, we can do eagerly pruning to speed up + pruneCandidate(newCandidate).foreach(e => normalizedCandidates.add(e)) + } else { + // We can not do pruning at this branch because we may miss replace alias with + // other sub-expression. We can only prune after all alias replaced. 
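+              // For example, with aliases `a -> x` and `b -> y`, the intermediate candidate
+              // `x + b` still references `b`, but a later iteration over the alias map can
+              // rewrite it to `x + y`, so the intermediate candidate must be kept.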
+ normalizedCandidates.add(newCandidate) + } + } + } + } + } + + val pruned = normalizedCandidates.flatMap { candidate => + pruneCandidate(candidate) + } + if (pruned.isEmpty) { + expr :: Nil + } else { + pruned.toSeq + } + } +} + +/** + * A trait that handles aliases in the `orderingExpressions` to produce `outputOrdering` that + * satisfies ordering requirements. + */ +trait AliasAwareQueryOutputOrdering[T <: QueryPlan[T]] + extends AliasAwareOutputExpression { self: QueryPlan[T] => + protected def orderingExpressions: Seq[SortOrder] + + override protected def strip(expr: Expression): Expression = expr match { + case e: Empty2Null => strip(e.child) + case _ => expr + } + + override final def outputOrdering: Seq[SortOrder] = { + if (hasAlias) { + orderingExpressions.map { sortOrder => + val normalized = normalizeExpression(sortOrder, (replacedExpr, outputExpressionSet) => { + assert(replacedExpr.isInstanceOf[SortOrder]) + val sortOrder = replacedExpr.asInstanceOf[SortOrder] + val pruned = sortOrder.children.filter { child => + child.references.subsetOf(outputExpressionSet) + } + if (pruned.isEmpty) { + None + } else { + // All expressions after pruned are semantics equality, so just use head to build a new + // SortOrder and use tail as the sameOrderExpressions. + Some(SortOrder(pruned.head, sortOrder.direction, sortOrder.nullOrdering, pruned.tail)) + } + }) + sortOrder.copy(sameOrderExpressions = + normalized.flatMap(_.asInstanceOf[SortOrder].children)) + } + } else { + orderingExpressions + } + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala index 0942919b17677..1d84139e11f15 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala @@ -53,6 +53,11 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] @transient lazy val outputSet: AttributeSet = AttributeSet(output) + /** + * Returns the output ordering that this plan generates. + */ + def outputOrdering: Seq[SortOrder] = Nil + // Override `treePatternBits` to propagate bits for its expressions. override lazy val treePatternBits: BitSet = { val bits: BitSet = getDefaultTreePatternBits diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index d3df6f0dd989b..fa2b0e540c536 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.QueryPlan +import org.apache.spark.sql.catalyst.plans.{AliasAwareQueryOutputOrdering, QueryPlan} import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.LogicalPlanStats import org.apache.spark.sql.catalyst.trees.{BinaryLike, LeafLike, UnaryLike} import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} @@ -141,11 +141,6 @@ abstract class LogicalPlan */ def refresh(): Unit = children.foreach(_.refresh()) - /** - * Returns the output ordering that this plan generates. 
-   */
-  def outputOrdering: Seq[SortOrder] = Nil
-
   /**
    * Returns true iff `other`'s output is semantically the same, i.e.:
    *  - it contains the same number of `Attribute`s;
@@ -205,8 +200,10 @@ trait UnaryNode extends LogicalPlan with UnaryLike[LogicalPlan] {
  */
 trait BinaryNode extends LogicalPlan with BinaryLike[LogicalPlan]
 
-abstract class OrderPreservingUnaryNode extends UnaryNode {
-  override final def outputOrdering: Seq[SortOrder] = child.outputOrdering
+trait OrderPreservingUnaryNode extends UnaryNode
+  with AliasAwareQueryOutputOrdering[LogicalPlan] {
+  override protected def outputExpressions: Seq[NamedExpression] = child.output
+  override protected def orderingExpressions: Seq[SortOrder] = child.outputOrdering
 }
 
 object LogicalPlanIntegrity {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 343fa3517c650..a8dfb8fbd8474 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -69,6 +69,7 @@ object Subquery {
 case class Project(projectList: Seq[NamedExpression], child: LogicalPlan)
   extends OrderPreservingUnaryNode {
   override def output: Seq[Attribute] = projectList.map(_.toAttribute)
+  override protected def outputExpressions: Seq[NamedExpression] = projectList
   override def maxRows: Option[Long] = child.maxRows
   override def maxRowsPerPartition: Option[Long] = child.maxRowsPerPartition
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index d949ec56632cd..eff72933249a9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -435,6 +435,15 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
+  val EXPRESSION_PROJECTION_CANDIDATE_LIMIT =
+    buildConf("spark.sql.optimizer.expressionProjectionCandidateLimit")
+      .doc("The maximum number of candidates for output expressions whose aliases are replaced." +
+        " It can preserve the output partitioning and ordering." +
+        " A value less than 1 disables this optimization.")
+      .version("3.4.0")
+      .intConf
+      .createWithDefault(100)
+
   val COMPRESS_CACHED = buildConf("spark.sql.inMemoryColumnarStorage.compressed")
     .doc("When set to true Spark SQL will automatically select a compression codec for each " +
       "column based on statistics of the data.")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala
index 92e86637eeccf..c142e11501026 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala
@@ -16,38 +16,35 @@
  */
 package org.apache.spark.sql.execution
 
-import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, NamedExpression, SortOrder}
-import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, PartitioningCollection, UnknownPartitioning}
-
-/**
- * A trait that provides functionality to handle aliases in the `outputExpressions`.
- */ -trait AliasAwareOutputExpression extends UnaryExecNode { - protected def outputExpressions: Seq[NamedExpression] - - private lazy val aliasMap = outputExpressions.collect { - case a @ Alias(child, _) => child.canonicalized -> a.toAttribute - }.toMap - - protected def hasAlias: Boolean = aliasMap.nonEmpty - - protected def normalizeExpression(exp: Expression): Expression = { - exp.transformDown { - case e: Expression => aliasMap.getOrElse(e.canonicalized, e) - } - } -} +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.plans.{AliasAwareOutputExpression, AliasAwareQueryOutputOrdering} +import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, PartitioningCollection, UnknownPartitioning} /** * A trait that handles aliases in the `outputExpressions` to produce `outputPartitioning` that * satisfies distribution requirements. */ -trait AliasAwareOutputPartitioning extends AliasAwareOutputExpression { +trait AliasAwareOutputPartitioning extends UnaryExecNode + with AliasAwareOutputExpression { final override def outputPartitioning: Partitioning = { val normalizedOutputPartitioning = if (hasAlias) { child.outputPartitioning match { case e: Expression => - normalizeExpression(e).asInstanceOf[Partitioning] + val normalized = normalizeExpression(e, (replacedExpr, outputExpressionSet) => { + assert(replacedExpr.isInstanceOf[Partitioning]) + // It's hard to deduplicate partitioning inside `PartitioningCollection` at + // `AliasAwareOutputExpression`, so here we should do distinct. + val pruned = flattenPartitioning(replacedExpr.asInstanceOf[Partitioning]).filter { + case e: Expression => e.references.subsetOf(outputExpressionSet) + case _ => true + }.distinct + if (pruned.isEmpty) { + None + } else { + Some(PartitioningCollection(pruned)) + } + }) + PartitioningCollection(normalized.asInstanceOf[Seq[Partitioning]]) case other => other } } else { @@ -55,9 +52,9 @@ trait AliasAwareOutputPartitioning extends AliasAwareOutputExpression { } flattenPartitioning(normalizedOutputPartitioning).filter { - case hashPartitioning: HashPartitioning => hashPartitioning.references.subsetOf(outputSet) + case e: Expression => e.references.subsetOf(outputSet) case _ => true - } match { + }.distinct match { case Seq() => UnknownPartitioning(child.outputPartitioning.numPartitions) case Seq(singlePartitioning) => singlePartitioning case seqWithMultiplePartitionings => PartitioningCollection(seqWithMultiplePartitionings) @@ -74,18 +71,4 @@ trait AliasAwareOutputPartitioning extends AliasAwareOutputExpression { } } -/** - * A trait that handles aliases in the `orderingExpressions` to produce `outputOrdering` that - * satisfies ordering requirements. 
- */ -trait AliasAwareOutputOrdering extends AliasAwareOutputExpression { - protected def orderingExpressions: Seq[SortOrder] - - final override def outputOrdering: Seq[SortOrder] = { - if (hasAlias) { - orderingExpressions.map(normalizeExpression(_).asInstanceOf[SortOrder]) - } else { - orderingExpressions - } - } -} +trait AliasAwareOutputOrdering extends UnaryExecNode with AliasAwareQueryOutputOrdering[SparkPlan] diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index 5ca36a8a216af..bbd74a1fe7407 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -179,9 +179,6 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ def requiredChildDistribution: Seq[Distribution] = Seq.fill(children.size)(UnspecifiedDistribution) - /** Specifies how data is ordered in each partition. */ - def outputOrdering: Seq[SortOrder] = Nil - /** Specifies sort order for each partition requirements on the input data for this operator. */ def requiredChildOrdering: Seq[Seq[SortOrder]] = Seq.fill(children.size)(Nil) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala index 5c4d662c14591..8efc52edbfd48 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala @@ -158,7 +158,7 @@ object FileFormatWriter extends Logging { // Use the output ordering from the original plan before adding the empty2null projection. 
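+    // The ordering is kept as `SortOrder`s so that `V1WritesUtils.isOrderingMatched` below can
+    // match the required ordering against any of their children, including the
+    // `sameOrderExpressions` (e.g. an aliased version of a sort column).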
val actualOrdering = writeFilesOpt.map(_.child) .getOrElse(materializeAdaptiveSparkPlan(plan)) - .outputOrdering.map(_.child) + .outputOrdering val orderingMatched = V1WritesUtils.isOrderingMatched(requiredOrdering, actualOrdering) SQLExecution.checkSQLExecutionId(sparkSession) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala index d52af64521855..76167b6004562 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala @@ -20,8 +20,7 @@ package org.apache.spark.sql.execution.datasources import org.apache.spark.sql.catalyst.SQLConfHelper import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec -import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Attribute, AttributeMap, AttributeSet, BitwiseAnd, Expression, HiveHash, Literal, NamedExpression, Pmod, SortOrder, String2StringExpression, UnaryExpression} -import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} +import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Attribute, AttributeMap, AttributeSet, BitwiseAnd, Empty2Null, Expression, HiveHash, Literal, NamedExpression, Pmod, SortOrder} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Sort} import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning import org.apache.spark.sql.catalyst.rules.Rule @@ -29,7 +28,6 @@ import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.command.DataWritingCommand import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.StringType -import org.apache.spark.unsafe.types.UTF8String trait V1WriteCommand extends DataWritingCommand { /** @@ -121,26 +119,6 @@ object V1Writes extends Rule[LogicalPlan] with SQLConfHelper { } object V1WritesUtils { - - /** A function that converts the empty string to null for partition values. 
*/ - case class Empty2Null(child: Expression) extends UnaryExpression with String2StringExpression { - override def convert(v: UTF8String): UTF8String = if (v.numBytes() == 0) null else v - override def nullable: Boolean = true - override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { - nullSafeCodeGen(ctx, ev, c => { - s"""if ($c.numBytes() == 0) { - | ${ev.isNull} = true; - | ${ev.value} = null; - |} else { - | ${ev.value} = $c; - |}""".stripMargin - }) - } - - override protected def withNewChildInternal(newChild: Expression): Empty2Null = - copy(child = newChild) - } - def getWriterBucketSpec( bucketSpec: Option[BucketSpec], dataColumns: Seq[Attribute], @@ -230,12 +208,14 @@ object V1WritesUtils { def isOrderingMatched( requiredOrdering: Seq[Expression], - outputOrdering: Seq[Expression]): Boolean = { + outputOrdering: Seq[SortOrder]): Boolean = { if (requiredOrdering.length > outputOrdering.length) { false } else { requiredOrdering.zip(outputOrdering).forall { - case (requiredOrder, outputOrder) => requiredOrder.semanticEquals(outputOrder) + case (requiredOrder, outputOrder) => + // Follow `SortOrder.satisfies` that respects `SortOrder.sameOrderExpressions` + outputOrder.children.exists(_.semanticEquals(requiredOrder)) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index 248c68abd1e0e..15222e57fb829 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -1072,7 +1072,7 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper { assert(projects.exists(_.outputPartitioning match { case PartitioningCollection(Seq(HashPartitioning(Seq(k1: AttributeReference), _), HashPartitioning(Seq(k2: AttributeReference), _))) => - k1.name == "t1id" && k2.name == "t2id" + Set(k1.name, k2.name) == Set("t1id", "t2id") case _ => false })) } @@ -1249,7 +1249,7 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper { assert(planned.outputPartitioning match { case PartitioningCollection(Seq(HashPartitioning(Seq(k1: AttributeReference), _), HashPartitioning(Seq(k2: AttributeReference), _))) => - k1.name == "t1id" && k2.name == "t2id" + Set(k1.name, k2.name) == Set("t1id", "t2id") }) val planned2 = sql( @@ -1314,6 +1314,78 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper { assert(topKs.size == 1) assert(sorts.isEmpty) } + + test("SPARK-42049: Improve AliasAwareOutputExpression - ordering - multi-alias") { + Seq(0, 1, 5).foreach { limit => + withSQLConf(SQLConf.EXPRESSION_PROJECTION_CANDIDATE_LIMIT.key -> limit.toString) { + val df = spark.range(2).orderBy($"id").selectExpr("id as x", "id as y", "id as z") + val outputOrdering = df.queryExecution.optimizedPlan.outputOrdering + assert(outputOrdering.size == 1) + limit match { + case 5 => + assert(outputOrdering.head.sameOrderExpressions.size == 3) + assert(outputOrdering.head.sameOrderExpressions.map(_.asInstanceOf[Attribute].name) + .toSet == Set("x", "y", "z")) + case 1 => + assert(outputOrdering.head.sameOrderExpressions.size == 1) + assert(outputOrdering.head.sameOrderExpressions.map(_.asInstanceOf[Attribute].name) + .toSet.subsetOf(Set("x", "y", "z"))) + case 0 => + assert(outputOrdering.head.sameOrderExpressions.isEmpty) + } + } + } + } + + test("SPARK-42049: Improve AliasAwareOutputExpression - partitioning - multi-alias") { + Seq(0, 1, 
5).foreach { limit =>
+      withSQLConf(SQLConf.EXPRESSION_PROJECTION_CANDIDATE_LIMIT.key -> limit.toString) {
+        val df = spark.range(2).repartition($"id").selectExpr("id as x", "id as y", "id as z")
+        val outputPartitioning = stripAQEPlan(df.queryExecution.executedPlan).outputPartitioning
+        limit match {
+          case 5 =>
+            val p = outputPartitioning.asInstanceOf[PartitioningCollection].partitionings
+            assert(p.size == 3)
+            assert(p.flatMap(_.asInstanceOf[HashPartitioning].expressions
+              .map(_.asInstanceOf[Attribute].name)).toSet == Set("x", "y", "z"))
+          case 1 =>
+            val p = outputPartitioning.asInstanceOf[HashPartitioning]
+            assert(p.expressions.size == 1)
+            assert(p.expressions.map(_.asInstanceOf[Attribute].name)
+              .toSet.subsetOf(Set("x", "y", "z")))
+          case 0 =>
+            // the references of the child's output partitioning are not a subset of the
+            // output, so it has been pruned
+            assert(outputPartitioning.isInstanceOf[UnknownPartitioning])
+        }
+      }
+    }
+  }
+
+  test("SPARK-42049: Improve AliasAwareOutputExpression - ordering - multi-references") {
+    val df = spark.range(2).selectExpr("id as a", "id as b")
+      .orderBy($"a" + $"b").selectExpr("a as x", "b as y")
+    val outputOrdering = df.queryExecution.optimizedPlan.outputOrdering
+    assert(outputOrdering.size == 1)
+    assert(outputOrdering.head.sameOrderExpressions.size == 1)
+    // (a + b), (a + y), (x + b) are pruned since their references are not the subset of output
+    outputOrdering.head.sameOrderExpressions.head match {
+      case Add(l: Attribute, r: Attribute, _) => assert(l.name == "x" && r.name == "y")
+      case _ => fail(s"Unexpected ${outputOrdering.head.sameOrderExpressions.head}")
+    }
+  }
+
+  test("SPARK-42049: Improve AliasAwareOutputExpression - partitioning - multi-references") {
+    val df = spark.range(2).selectExpr("id as a", "id as b")
+      .repartition($"a" + $"b").selectExpr("a as x", "b as y")
+    val outputPartitioning = stripAQEPlan(df.queryExecution.executedPlan).outputPartitioning
+    // (a + b), (a + y), (x + b) are pruned since their references are not a subset of the output
+    outputPartitioning match {
+      case HashPartitioning(Seq(Add(l: Attribute, r: Attribute, _)), _) =>
+        assert(l.name == "x" && r.name == "y")
+      case _ => fail(s"Unexpected $outputPartitioning")
+    }
+  }
 }
 
 // Used for unit-testing EnsureRequirements
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
index 40574a8e73aa2..20a90cd94b65b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
@@ -90,9 +90,11 @@ trait V1WriteCommandSuiteBase extends SQLTestUtils {
     sparkContext.listenerBus.waitUntilEmpty()
     assert(optimizedPlan != null)
 
-    // Check whether a logical sort node is at the top of the logical plan of the write query.
-    assert(optimizedPlan.isInstanceOf[Sort] == hasLogicalSort,
-      s"Expect hasLogicalSort: $hasLogicalSort, Actual: ${optimizedPlan.isInstanceOf[Sort]}")
+    // Check whether a logical sort node exists in the write query.
+    // If the user-specified sort matches the required ordering, the sort may not be at the top.
+    assert(optimizedPlan.exists(_.isInstanceOf[Sort]) == hasLogicalSort,
+      s"Expect hasLogicalSort: $hasLogicalSort, " +
+        s"Actual: ${optimizedPlan.exists(_.isInstanceOf[Sort])}")
 
     // Check empty2null conversion.
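+    // (`Empty2Null` converts empty strings in partition values to null; the `V1Writes` rule is
+    // expected to add it on top of string-typed partition columns.)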
    val empty2nullExpr = optimizedPlan.exists(p => V1WritesUtils.hasEmptyToNull(p.expressions))
@@ -223,8 +225,8 @@ class V1WriteCommandSuite extends QueryTest with SharedSparkSession with V1Write
         case s: SortExec => s
       }.exists {
         case SortExec(Seq(
-          SortOrder(AttributeReference("key", IntegerType, _, _), Ascending, NullsFirst, _),
-          SortOrder(AttributeReference("value", StringType, _, _), Ascending, NullsFirst, _)
+        SortOrder(AttributeReference("key", IntegerType, _, _), Ascending, NullsFirst, _),
+        SortOrder(AttributeReference("value", StringType, _, _), Ascending, NullsFirst, _)
         ), false, _, _) => true
         case _ => false
       }, plan)
@@ -268,16 +270,11 @@ class V1WriteCommandSuite extends QueryTest with SharedSparkSession with V1Write
       // assert the outer most sort in the executed plan
       assert(plan.collectFirst {
         case s: SortExec => s
-      }.map(s => (enabled, s)).exists {
-        case (false, SortExec(Seq(
-          SortOrder(AttributeReference("value", StringType, _, _), Ascending, NullsFirst, _),
-          SortOrder(AttributeReference("key", IntegerType, _, _), Ascending, NullsFirst, _)
-        ), false, _, _)) => true
-
-        // SPARK-40885: this bug removes the in-partition sort, which manifests here
-        case (true, SortExec(Seq(
-          SortOrder(AttributeReference("value", StringType, _, _), Ascending, NullsFirst, _)
-        ), false, _, _)) => true
+      }.exists {
+        case SortExec(Seq(
+          SortOrder(AttributeReference("value", StringType, _, _), Ascending, NullsFirst, _),
+          SortOrder(AttributeReference("key", IntegerType, _, _), Ascending, NullsFirst, _)
+        ), false, _, _) => true
         case _ => false
       }, plan)
     }

From 5fe4fa622e7a871fba4015797e0097041b3c9648 Mon Sep 17 00:00:00 2001
From: Peter Toth
Date: Sun, 15 Jan 2023 10:00:03 +0100
Subject: [PATCH 02/12] use multiTransform in AliasAwareOutputPartitioning, add
 fixes and UTs from SPARK-40086 PR

---
 .../plans/AliasAwareOutputExpression.scala    | 85 +++++--------------
 .../AliasAwareOutputExpression.scala          | 22 ++++-
 .../org/apache/spark/sql/ExplainSuite.scala   |  2 +-
 .../CoalesceShufflePartitionsSuite.scala      |  8 +-
 .../spark/sql/execution/PlannerSuite.scala    | 58 +++++++++++++
 5 files changed, 104 insertions(+), 71 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala
index af251388b253b..e9cd582f000d7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala
@@ -18,7 +18,6 @@ package org.apache.spark.sql.catalyst.plans
 
 import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeSet, Empty2Null, Expression, NamedExpression, SortOrder}
 import org.apache.spark.sql.internal.SQLConf
@@ -36,28 +35,25 @@ trait AliasAwareOutputExpression extends SQLConfHelper {
    */
   protected def strip(expr: Expression): Expression = expr
 
-  private lazy val aliasMap: Map[Expression, ArrayBuffer[Attribute]] = {
-    if (aliasCandidateLimit < 1) {
-      Map.empty
-    } else {
-      val exprWithAliasMap = new mutable.HashMap[Expression, ArrayBuffer[Attribute]]()
-
-      def updateAttrWithAliasMap(key: Expression, target: Attribute): Unit = {
-        val aliasArray = exprWithAliasMap.getOrElseUpdate(
-          strip(key).canonicalized, new ArrayBuffer[Attribute]())
-        // pre-filter if the number of alias exceed candidate limit
-        if (aliasArray.size < 
aliasCandidateLimit) { - aliasArray.append(target) - } - } - - outputExpressions.foreach { - case a @ Alias(child, _) => - updateAttrWithAliasMap(child, a.toAttribute) - case _ => - } - exprWithAliasMap.toMap + private lazy val aliasMap = { + val aliases = mutable.Map[Expression, mutable.ListBuffer[Attribute]]() + // Add aliases to the map. If multiple alias is defined for a source attribute then add all. + outputExpressions.foreach { + case a @ Alias(child, _) => + // This prepend is needed to make the first element of the `ListBuffer` point to the last + // occurrence of an aliased child. This is to keep the previous behavior and give precedence + // the last Alias during `normalizeExpression()` to avoid any kinds of regression. + a.toAttribute +=: + aliases.getOrElseUpdate(strip(child.canonicalized), mutable.ListBuffer.empty) + case _ => } + // Append identity mapping of an attribute to the map if both the attribute and its aliased + // version can be found in `outputExpressions`. + outputExpressions.foreach { + case a: Attribute if aliases.contains(a.canonicalized) => aliases(a.canonicalized) += a + case _ => + } + aliases } protected def hasAlias: Boolean = aliasMap.nonEmpty @@ -70,53 +66,16 @@ trait AliasAwareOutputExpression extends SQLConfHelper { protected def normalizeExpression( expr: Expression, pruneFunc: (Expression, AttributeSet) => Option[Expression]): Seq[Expression] = { - val normalizedCandidates = new mutable.HashSet[Expression]() - normalizedCandidates.add(expr) val outputSet = AttributeSet(outputExpressions.map(_.toAttribute)) - - def pruneCandidate(candidate: Expression): Option[Expression] = { + expr.multiTransform { + case e: Expression if aliasMap.contains(e.canonicalized) => aliasMap(e.canonicalized).toSeq + }.flatMap { candidate => if (candidate.references.subsetOf(outputSet)) { Some(candidate) } else { pruneFunc(candidate, outputSet) } - } - - // Stop loop if the size of candidates exceed limit - for ((origin, aliases) <- aliasMap if normalizedCandidates.size <= aliasCandidateLimit) { - for (alias <- aliases if normalizedCandidates.size <= aliasCandidateLimit) { - val localCandidates = normalizedCandidates.toArray - for (candidate <- localCandidates if normalizedCandidates.size <= aliasCandidateLimit) { - var hasOtherAlias = false - val newCandidate = candidate.transformDown { - case e: Expression if e.canonicalized == origin => alias - case e if aliasMap.contains(e.canonicalized) => - hasOtherAlias = true - e - } - - if (!candidate.fastEquals(newCandidate)) { - if (!hasOtherAlias) { - // If there is no other alias, we can do eagerly pruning to speed up - pruneCandidate(newCandidate).foreach(e => normalizedCandidates.add(e)) - } else { - // We can not do pruning at this branch because we may miss replace alias with - // other sub-expression. We can only prune after all alias replaced. 
- normalizedCandidates.add(newCandidate) - } - } - } - } - } - - val pruned = normalizedCandidates.flatMap { candidate => - pruneCandidate(candidate) - } - if (pruned.isEmpty) { - expr :: Nil - } else { - pruned.toSeq - } + }.take(aliasCandidateLimit) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala index c142e11501026..dbd7b5c34adb3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala @@ -16,6 +16,8 @@ */ package org.apache.spark.sql.execution +import scala.collection.mutable + import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.plans.{AliasAwareOutputExpression, AliasAwareQueryOutputOrdering} import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, PartitioningCollection, UnknownPartitioning} @@ -44,17 +46,31 @@ trait AliasAwareOutputPartitioning extends UnaryExecNode Some(PartitioningCollection(pruned)) } }) - PartitioningCollection(normalized.asInstanceOf[Seq[Partitioning]]) + normalized.asInstanceOf[Seq[Partitioning]] match { + case Seq() => UnknownPartitioning(child.outputPartitioning.numPartitions) + case Seq(p) => p + case ps => PartitioningCollection(ps) + } case other => other } } else { child.outputPartitioning } + // We need unique `Partitioning`s but `normalizedOutputPartitioning` might not contain unique + // elements. + // E.g. if the input partitioning is `HashPartitioning(Seq(id + id))` and we have `id -> a` and + // `id -> b` as alias mappings in a projection node. After the mapping + // `normalizedOutputPartitioning` contains 4 elements: + // `HashPartitioning(Seq(a + a))`, `HashPartitioning(Seq(a + b))`, + // `HashPartitioning(Seq(b + a))`, `HashPartitioning(Seq(b + b))`, but + // `HashPartitioning(Seq(a + b))` is the same as `HashPartitioning(Seq(b + a))`. + val expressionPartitionings = mutable.Set.empty[Expression] flattenPartitioning(normalizedOutputPartitioning).filter { - case e: Expression => e.references.subsetOf(outputSet) + case e: Expression => + e.references.subsetOf(outputSet) && expressionPartitionings.add(e.canonicalized) case _ => true - }.distinct match { + } match { case Seq() => UnknownPartitioning(child.outputPartitioning.numPartitions) case Seq(singlePartitioning) => singlePartitioning case seqWithMultiplePartitionings => PartitioningCollection(seqWithMultiplePartitionings) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala index 9a75cc5ff8f71..a6b295578d664 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala @@ -677,7 +677,7 @@ class ExplainSuiteAE extends ExplainSuiteHelper with EnableAdaptiveExecutionSuit df.createTempView("df") val sqlText = "EXPLAIN CODEGEN SELECT key, MAX(value) FROM df GROUP BY key" - val expectedCodegenText = "Found 2 WholeStageCodegen subtrees." + val expectedCodegenText = "Found 1 WholeStageCodegen subtrees." val expectedNoCodegenText = "Found 0 WholeStageCodegen subtrees." 
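+    // SPARK-42049: the aliased group-by key should no longer introduce an extra shuffle,
+    // hence the expected count above dropped from 2 to 1 codegen subtrees.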
withNormalizedExplain(sqlText) { normalizedOutput => assert(normalizedOutput.contains(expectedNoCodegenText)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala index 81777f67f3701..24a98dd83f33a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala @@ -339,12 +339,12 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite { // ShuffleQueryStage 0 // ShuffleQueryStage 2 // ReusedQueryStage 0 - val grouped = df.groupBy("key").agg(max("value").as("value")) + val grouped = df.groupBy((col("key") + 1).as("key")).agg(max("value").as("value")) val resultDf2 = grouped.groupBy(col("key") + 1).max("value") .union(grouped.groupBy(col("key") + 2).max("value")) - QueryTest.checkAnswer(resultDf2, Row(1, 0) :: Row(2, 0) :: Row(2, 1) :: Row(3, 1) :: - Row(3, 2) :: Row(4, 2) :: Row(4, 3) :: Row(5, 3) :: Row(5, 4) :: Row(6, 4) :: Row(6, 5) :: - Row(7, 5) :: Nil) + QueryTest.checkAnswer(resultDf2, Row(2, 0) :: Row(3, 0) :: Row(3, 1) :: Row(4, 1) :: + Row(4, 2) :: Row(5, 2) :: Row(5, 3) :: Row(6, 3) :: Row(6, 4) :: Row(7, 4) :: Row(7, 5) :: + Row(8, 5) :: Nil) val finalPlan2 = resultDf2.queryExecution.executedPlan .asInstanceOf[AdaptiveSparkPlanExec].executedPlan diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index 15222e57fb829..ac83e00cd4907 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -1315,6 +1315,64 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper { assert(sorts.isEmpty) } + test("SPARK-40086: an attribute and its aliased version in aggregate expressions should not " + + "introduce extra shuffle") { + withTempView("df") { + spark.range(5).select(col("id"), col("id").as("value")).createTempView("df") + val df = sql("SELECT id, max(value) FROM df GROUP BY id") + + val planned = df.queryExecution.executedPlan + + assert(collect(planned) { case h: HashAggregateExec => h }.nonEmpty) + + val exchanges = collect(planned) { case s: ShuffleExchangeExec => s } + assert(exchanges.size == 0) + } + } + + test("SPARK-40086: multiple aliases to the same attribute in aggregate expressions should not " + + "introduce extra shuffle") { + withTempView("df") { + spark.range(5).select(col("id").as("key"), col("id").as("value")).createTempView("df") + val df = sql("SELECT key, max(value) FROM df GROUP BY key") + + val planned = df.queryExecution.executedPlan + + assert(collect(planned) { case h: HashAggregateExec => h }.nonEmpty) + + val exchanges = collect(planned) { case s: ShuffleExchangeExec => s } + assert(exchanges.size == 0) + } + } + + test("SPARK-40086: multiple aliases to the same attribute with complex required distribution " + + "should not introduce extra shuffle") { + withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { + val df = spark.range(5) + val df1 = df.repartition($"id" + $"id") + .select($"id".as("key1"), $"id".as("value1"), ($"id" + $"id").as("idPlusId1")) + val df2 = df.repartition($"id" + $"id") + .select($"id".as("key2"), $"id".as("value2"), ($"id" + $"id").as("idPlusId2")) + val df3 = df1.join(df2, $"key1" + $"value1" === 
$"idPlusId2") + + val planned = df3.queryExecution.executedPlan + + val numShuffles = collect(planned) { + case e: ShuffleExchangeExec => e + } + // before: numShuffles is 4 + assert(numShuffles.size == 2) + val numOutputPartitioning = collectFirst(planned) { + case e: SortMergeJoinExec => e.outputPartitioning match { + case PartitioningCollection(Seq(PartitioningCollection(l), PartitioningCollection(r))) => + l ++ r + case _ => Seq.empty + } + }.get + assert(numOutputPartitioning.size == 8) + } + } + test("SPARK-42049: Improve AliasAwareOutputExpression - ordering - multi-alias") { Seq(0, 1, 5).foreach { limit => withSQLConf(SQLConf.EXPRESSION_PROJECTION_CANDIDATE_LIMIT.key -> limit.toString) { From b693b3120488f55e761d7d95e542724e0f1fb37b Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Mon, 16 Jan 2023 14:54:12 +0100 Subject: [PATCH 03/12] split the aliasMap --- .../plans/AliasAwareOutputExpression.scala | 20 +++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala index e9cd582f000d7..7a63198844108 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala @@ -35,14 +35,18 @@ trait AliasAwareOutputExpression extends SQLConfHelper { */ protected def strip(expr: Expression): Expression = expr - private lazy val aliasMap = { + // Split the alias map into 2 maps, the first contains `Expression` -> `Attribute` mappings where + // any children of the `Expression` contains any other mapping. This because during + // `normalizeExpression()` we will need to handle those maps separately and don't stop generating + // alternatives at the `Expression` but we also need to traverse down to its children. + private lazy val (exprAliasMap, attrAliasMap) = { val aliases = mutable.Map[Expression, mutable.ListBuffer[Attribute]]() // Add aliases to the map. If multiple alias is defined for a source attribute then add all. outputExpressions.foreach { case a @ Alias(child, _) => // This prepend is needed to make the first element of the `ListBuffer` point to the last // occurrence of an aliased child. This is to keep the previous behavior and give precedence - // the last Alias during `normalizeExpression()` to avoid any kinds of regression. + // the last Alias during `normalizeExpression()` to avoid any kind of regression. a.toAttribute +=: aliases.getOrElseUpdate(strip(child.canonicalized), mutable.ListBuffer.empty) case _ => @@ -53,10 +57,11 @@ trait AliasAwareOutputExpression extends SQLConfHelper { case a: Attribute if aliases.contains(a.canonicalized) => aliases(a.canonicalized) += a case _ => } - aliases + + aliases.partition { case (expr, _) => expr.children.exists(_.exists(aliases.contains)) } } - protected def hasAlias: Boolean = aliasMap.nonEmpty + protected def hasAlias: Boolean = attrAliasMap.nonEmpty /** * Return a set of Expression which normalize the original expression to the aliased. 
@@ -67,8 +72,11 @@ trait AliasAwareOutputExpression extends SQLConfHelper { expr: Expression, pruneFunc: (Expression, AttributeSet) => Option[Expression]): Seq[Expression] = { val outputSet = AttributeSet(outputExpressions.map(_.toAttribute)) - expr.multiTransform { - case e: Expression if aliasMap.contains(e.canonicalized) => aliasMap(e.canonicalized).toSeq + expr.multiTransformDown { + case e: Expression if exprAliasMap.contains(e.canonicalized) => + (exprAliasMap(e.canonicalized) :+ e).toStream + case e: Expression if attrAliasMap.contains(e.canonicalized) => + attrAliasMap(e.canonicalized).toStream }.flatMap { candidate => if (candidate.references.subsetOf(outputSet)) { Some(candidate) From 5e8b08409b37ba70ebe9fd6d9ee19387f7463a4c Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Mon, 16 Jan 2023 18:17:23 +0100 Subject: [PATCH 04/12] make `generateChildrenSeq()` public, add early pruning using `multiTransform()` and `generateChildrenSeq()` --- .../plans/AliasAwareOutputExpression.scala | 77 +++++++++++-------- .../AliasAwareOutputExpression.scala | 18 +---- 2 files changed, 48 insertions(+), 47 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala index 7a63198844108..a825a673c8b94 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala @@ -21,12 +21,14 @@ import scala.collection.mutable import org.apache.spark.sql.catalyst.SQLConfHelper import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeSet, Empty2Null, Expression, NamedExpression, SortOrder} +import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, PartitioningCollection} +import org.apache.spark.sql.catalyst.trees.MultiTransformHelper import org.apache.spark.sql.internal.SQLConf /** * A trait that provides functionality to handle aliases in the `outputExpressions`. */ -trait AliasAwareOutputExpression extends SQLConfHelper { +trait AliasAwareOutputExpression extends SQLConfHelper with MultiTransformHelper { private val aliasCandidateLimit = conf.getConf(SQLConf.EXPRESSION_PROJECTION_CANDIDATE_LIMIT) protected def outputExpressions: Seq[NamedExpression] /** @@ -65,25 +67,46 @@ trait AliasAwareOutputExpression extends SQLConfHelper { /** * Return a set of Expression which normalize the original expression to the aliased. 
- * @param pruneFunc used to prune the alias-replaced expression whose references are not the - * subset of output */ - protected def normalizeExpression( - expr: Expression, - pruneFunc: (Expression, AttributeSet) => Option[Expression]): Seq[Expression] = { + protected def normalizeExpression(expr: Expression): Seq[Expression] = { val outputSet = AttributeSet(outputExpressions.map(_.toAttribute)) - expr.multiTransformDown { + + def f: PartialFunction[Expression, Stream[Expression]] = { + // Mapping with aliases case e: Expression if exprAliasMap.contains(e.canonicalized) => (exprAliasMap(e.canonicalized) :+ e).toStream case e: Expression if attrAliasMap.contains(e.canonicalized) => attrAliasMap(e.canonicalized).toStream - }.flatMap { candidate => - if (candidate.references.subsetOf(outputSet)) { - Some(candidate) - } else { - pruneFunc(candidate, outputSet) - } - }.take(aliasCandidateLimit) + + // Prune if we encounter an attribute that we can't map and it is not in output set. + // This prune will go up to the closest `multiTransformDown()` call and returns `Stream.empty` + // there. + case a: Attribute if !outputSet.contains(a) => Stream.empty + + // Filter PartitioningCollection children + case p: PartitioningCollection => + val childrenStreams = p.partitionings.map { + case e: Expression => e.multiTransformDown(f).asInstanceOf[Stream[Partitioning]] + case o => Stream(o) + }.filter(_.nonEmpty) + generateChildrenSeq(childrenStreams).flatMap { + case Nil => None + // We might have an expression type partitioning that doesn't need + // `PartitioningCollection` + case (p: Expression) :: Nil => Some(p) + case p :: Nil => Some(PartitioningCollection(Seq(p))) + case ps => Some(PartitioningCollection(ps)) + } + + // Filter SortOrder children + case s: SortOrder => + val childrenStreams = s.children.map(_.multiTransformDown(f)).filter(_.nonEmpty) + generateChildrenSeq(childrenStreams) + .filter(_.nonEmpty) + .map(es => s.copy(child = es.head, sameOrderExpressions = es.tail)) + } + + expr.multiTransformDown(f).take(aliasCandidateLimit) } } @@ -102,23 +125,15 @@ trait AliasAwareQueryOutputOrdering[T <: QueryPlan[T]] override final def outputOrdering: Seq[SortOrder] = { if (hasAlias) { - orderingExpressions.map { sortOrder => - val normalized = normalizeExpression(sortOrder, (replacedExpr, outputExpressionSet) => { - assert(replacedExpr.isInstanceOf[SortOrder]) - val sortOrder = replacedExpr.asInstanceOf[SortOrder] - val pruned = sortOrder.children.filter { child => - child.references.subsetOf(outputExpressionSet) - } - if (pruned.isEmpty) { - None - } else { - // All expressions after pruned are semantics equality, so just use head to build a new - // SortOrder and use tail as the sameOrderExpressions. 
- Some(SortOrder(pruned.head, sortOrder.direction, sortOrder.nullOrdering, pruned.tail)) - } - }) - sortOrder.copy(sameOrderExpressions = - normalized.flatMap(_.asInstanceOf[SortOrder].children)) + orderingExpressions.flatMap { sortOrder => + val normalized = normalizeExpression(sortOrder) + val allOrderingExpressions = normalized.flatMap(_.asInstanceOf[SortOrder].children) + if (allOrderingExpressions.isEmpty) { + None + } else { + Some(sortOrder.copy(child = allOrderingExpressions.head, + sameOrderExpressions = allOrderingExpressions.tail)) + } } } else { orderingExpressions diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala index dbd7b5c34adb3..46d787c56b4fd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala @@ -32,20 +32,7 @@ trait AliasAwareOutputPartitioning extends UnaryExecNode val normalizedOutputPartitioning = if (hasAlias) { child.outputPartitioning match { case e: Expression => - val normalized = normalizeExpression(e, (replacedExpr, outputExpressionSet) => { - assert(replacedExpr.isInstanceOf[Partitioning]) - // It's hard to deduplicate partitioning inside `PartitioningCollection` at - // `AliasAwareOutputExpression`, so here we should do distinct. - val pruned = flattenPartitioning(replacedExpr.asInstanceOf[Partitioning]).filter { - case e: Expression => e.references.subsetOf(outputExpressionSet) - case _ => true - }.distinct - if (pruned.isEmpty) { - None - } else { - Some(PartitioningCollection(pruned)) - } - }) + val normalized = normalizeExpression(e) normalized.asInstanceOf[Seq[Partitioning]] match { case Seq() => UnknownPartitioning(child.outputPartitioning.numPartitions) case Seq(p) => p @@ -67,8 +54,7 @@ trait AliasAwareOutputPartitioning extends UnaryExecNode // `HashPartitioning(Seq(a + b))` is the same as `HashPartitioning(Seq(b + a))`. 
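+    // The `filter` below keeps the original order of the partitionings, while the set of
+    // canonicalized expressions drops such semantically equal duplicates.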
val expressionPartitionings = mutable.Set.empty[Expression] flattenPartitioning(normalizedOutputPartitioning).filter { - case e: Expression => - e.references.subsetOf(outputSet) && expressionPartitionings.add(e.canonicalized) + case e: Expression => expressionPartitionings.add(e.canonicalized) case _ => true } match { case Seq() => UnknownPartitioning(child.outputPartitioning.numPartitions) From 7a57a29fdf829ef0680f70825d41c2362c9a5b46 Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Tue, 17 Jan 2023 14:13:33 +0100 Subject: [PATCH 05/12] fix tests --- .../spark/sql/execution/PlannerSuite.scala | 28 +++++++++---------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index ac83e00cd4907..855741d37a9a2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -1101,9 +1101,8 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper { val projects = collect(planned) { case p: ProjectExec => p } assert(projects.exists(_.outputOrdering match { - case Seq(SortOrder(_, Ascending, NullsFirst, sameOrderExprs)) => - sameOrderExprs.size == 1 && sameOrderExprs.head.isInstanceOf[AttributeReference] && - sameOrderExprs.head.asInstanceOf[AttributeReference].name == "t2id" + case Seq(s @ SortOrder(_, Ascending, NullsFirst, _)) => + s.children.map(_.asInstanceOf[AttributeReference].name).toSet == Set("t2id", "t3id") case _ => false })) } @@ -1374,22 +1373,26 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper { } test("SPARK-42049: Improve AliasAwareOutputExpression - ordering - multi-alias") { - Seq(0, 1, 5).foreach { limit => + Seq(0, 1, 2, 5).foreach { limit => withSQLConf(SQLConf.EXPRESSION_PROJECTION_CANDIDATE_LIMIT.key -> limit.toString) { val df = spark.range(2).orderBy($"id").selectExpr("id as x", "id as y", "id as z") val outputOrdering = df.queryExecution.optimizedPlan.outputOrdering - assert(outputOrdering.size == 1) limit match { case 5 => - assert(outputOrdering.head.sameOrderExpressions.size == 3) + assert(outputOrdering.size == 1) + assert(outputOrdering.head.sameOrderExpressions.size == 2) assert(outputOrdering.head.sameOrderExpressions.map(_.asInstanceOf[Attribute].name) - .toSet == Set("x", "y", "z")) - case 1 => + .toSet.subsetOf(Set("x", "y", "z"))) + case 2 => + assert(outputOrdering.size == 1) assert(outputOrdering.head.sameOrderExpressions.size == 1) assert(outputOrdering.head.sameOrderExpressions.map(_.asInstanceOf[Attribute].name) .toSet.subsetOf(Set("x", "y", "z"))) + case 1 => + assert(outputOrdering.size == 1) + assert(outputOrdering.head.sameOrderExpressions.size == 0) case 0 => - assert(outputOrdering.head.sameOrderExpressions.isEmpty) + assert(outputOrdering.size == 0) } } } @@ -1425,12 +1428,7 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper { .orderBy($"a" + $"b").selectExpr("a as x", "b as y") val outputOrdering = df.queryExecution.optimizedPlan.outputOrdering assert(outputOrdering.size == 1) - assert(outputOrdering.head.sameOrderExpressions.size == 1) - // (a + b), (a + y), (x + b) are pruned since their references are not the subset of output - outputOrdering.head.sameOrderExpressions.head match { - case Add(l: Attribute, r: Attribute, _) => assert(l.name == "x" && r.name == "y") - case _ => fail(s"Unexpected 
${outputOrdering.head.sameOrderExpressions.head}") - } + assert(outputOrdering.head.sameOrderExpressions.size == 0) } test("SPARK-42049: Improve AliasAwareOutputExpression - partitioning - multi-references") { From 0138ad49afc5921bf33aa4becd99fd50c715e3a6 Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Wed, 18 Jan 2023 09:11:38 +0100 Subject: [PATCH 06/12] add more explanation to `normalizeExpression()` --- .../sql/catalyst/plans/AliasAwareOutputExpression.scala | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala index a825a673c8b94..8828df3f8f6b5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala @@ -83,7 +83,12 @@ trait AliasAwareOutputExpression extends SQLConfHelper with MultiTransformHelper // there. case a: Attribute if !outputSet.contains(a) => Stream.empty - // Filter PartitioningCollection children + // Remove `PartitioningCollection` elements that are expressions and contain an attribute that + // can't be mapped and the node's output set doesn't contain the attribute. + // To achieve this we need to "restart" `multiTransformDown()` for each expression child and + // filter out empty streams due to the above attribute pruning case. + // The child streams can be then combined using `generateChildrenSeq()` into one stream as + // `multiTransformDown()` would also do (but without filtering empty streams). case p: PartitioningCollection => val childrenStreams = p.partitionings.map { case e: Expression => e.multiTransformDown(f).asInstanceOf[Stream[Partitioning]] @@ -98,7 +103,7 @@ trait AliasAwareOutputExpression extends SQLConfHelper with MultiTransformHelper case ps => Some(PartitioningCollection(ps)) } - // Filter SortOrder children + // Filter `SortOrder` children similarly to `PartitioningCollection` elements case s: SortOrder => val childrenStreams = s.children.map(_.multiTransformDown(f)).filter(_.nonEmpty) generateChildrenSeq(childrenStreams) From cf75540537fa668e34d5d38e833ec0e5e593922a Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Thu, 19 Jan 2023 16:31:56 +0100 Subject: [PATCH 07/12] address review comments --- .../plans/AliasAwareOutputExpression.scala | 99 ++++++----------- .../spark/sql/catalyst/plans/QueryPlan.scala | 4 +- .../AliasAwareOutputExpression.scala | 61 ++++++----- .../aggregate/SortAggregateExec.scala | 4 +- .../execution/basicPhysicalOperators.scala | 2 +- .../apache/spark/sql/execution/limit.scala | 2 +- .../spark/sql/execution/PlannerSuite.scala | 73 +------------ ...rojectedOrderingAndPartitioningSuite.scala | 100 ++++++++++++++++++ 8 files changed, 174 insertions(+), 171 deletions(-) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala index 8828df3f8f6b5..35e173056ddd5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala @@ -21,15 +21,13 @@ import 
scala.collection.mutable

 import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeSet, Empty2Null, Expression, NamedExpression, SortOrder}
-import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, PartitioningCollection}
-import org.apache.spark.sql.catalyst.trees.MultiTransformHelper
 import org.apache.spark.sql.internal.SQLConf

 /**
  * A trait that provides functionality to handle aliases in the `outputExpressions`.
  */
-trait AliasAwareOutputExpression extends SQLConfHelper with MultiTransformHelper {
-  private val aliasCandidateLimit = conf.getConf(SQLConf.EXPRESSION_PROJECTION_CANDIDATE_LIMIT)
+trait AliasAwareOutputExpression extends SQLConfHelper {
+  protected val aliasCandidateLimit = conf.getConf(SQLConf.EXPRESSION_PROJECTION_CANDIDATE_LIMIT)
   protected def outputExpressions: Seq[NamedExpression]
   /**
    * This method can be used to strip expression which does not affect the result, for example:
@@ -37,81 +35,49 @@ trait AliasAwareOutputExpression extends SQLConfHelper with MultiTransformHelper
    */
   protected def strip(expr: Expression): Expression = expr

-  // Split the alias map into 2 maps, the first contains `Expression` -> `Attribute` mappings where
-  // any children of the `Expression` contains any other mapping. This because during
-  // `normalizeExpression()` we will need to handle those maps separately and don't stop generating
-  // alternatives at the `Expression` but we also need to traverse down to its children.
-  private lazy val (exprAliasMap, attrAliasMap) = {
-    val aliases = mutable.Map[Expression, mutable.ListBuffer[Attribute]]()
-    // Add aliases to the map. If multiple alias is defined for a source attribute then add all.
-    outputExpressions.foreach {
+  // Build an `Expression` -> `Attribute` alias map.
+  // There can be multiple aliases defined for the same expression but it doesn't make sense to
+  // store more than `aliasCandidateLimit` attributes for an expression. In those cases the old
+  // logic handled only the last alias so we need to make sure that we give precedence to that.
+  // If the `outputExpressions` contain simple attributes we need to add those too to the map.
+  private lazy val aliasMap = {
+    val aliases = mutable.Map[Expression, mutable.ArrayBuffer[Attribute]]()
+    outputExpressions.reverse.foreach {
      case a @ Alias(child, _) =>
-        // This prepend is needed to make the first element of the `ListBuffer` point to the last
-        // occurrence of an aliased child. This is to keep the previous behavior and give precedence
-        // the last Alias during `normalizeExpression()` to avoid any kind of regression.
-        a.toAttribute +=:
-          aliases.getOrElseUpdate(strip(child.canonicalized), mutable.ListBuffer.empty)
+        val buffer = aliases.getOrElseUpdate(strip(child.canonicalized), mutable.ArrayBuffer.empty)
+        if (buffer.size < aliasCandidateLimit) {
+          buffer += a.toAttribute
+        }
       case _ =>
     }
-    // Append identity mapping of an attribute to the map if both the attribute and its aliased
-    // version can be found in `outputExpressions`.
outputExpressions.foreach { - case a: Attribute if aliases.contains(a.canonicalized) => aliases(a.canonicalized) += a + case a: Attribute if aliases.contains(a.canonicalized) => + val buffer = aliases(a.canonicalized) + if (buffer.size < aliasCandidateLimit) { + buffer += a + } case _ => } - - aliases.partition { case (expr, _) => expr.children.exists(_.exists(aliases.contains)) } + aliases } - protected def hasAlias: Boolean = attrAliasMap.nonEmpty + protected def hasAlias: Boolean = aliasMap.nonEmpty /** * Return a set of Expression which normalize the original expression to the aliased. */ - protected def normalizeExpression(expr: Expression): Seq[Expression] = { + protected def projectExpression(expr: Expression): Stream[Expression] = { val outputSet = AttributeSet(outputExpressions.map(_.toAttribute)) - - def f: PartialFunction[Expression, Stream[Expression]] = { + expr.multiTransformDown { // Mapping with aliases - case e: Expression if exprAliasMap.contains(e.canonicalized) => - (exprAliasMap(e.canonicalized) :+ e).toStream - case e: Expression if attrAliasMap.contains(e.canonicalized) => - attrAliasMap(e.canonicalized).toStream + case e: Expression if aliasMap.contains(e.canonicalized) => + aliasMap(e.canonicalized) ++ (if (e.containsChild.nonEmpty) Seq(e) else Seq.empty) // Prune if we encounter an attribute that we can't map and it is not in output set. // This prune will go up to the closest `multiTransformDown()` call and returns `Stream.empty` // there. - case a: Attribute if !outputSet.contains(a) => Stream.empty - - // Remove `PartitioningCollection` elements that are expressions and contain an attribute that - // can't be mapped and the node's output set doesn't contain the attribute. - // To achieve this we need to "restart" `multiTransformDown()` for each expression child and - // filter out empty streams due to the above attribute pruning case. - // The child streams can be then combined using `generateChildrenSeq()` into one stream as - // `multiTransformDown()` would also do (but without filtering empty streams). 
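// ---------------------------------------------------------------------------
// Editor's sketch (illustrative only, not part of the patch): the removed
// comments above describe combining per-child alternative streams into one
// stream of child sequences. This is a minimal, self-contained model of that
// `generateChildrenSeq()` mechanism; all names below are stand-ins, not
// Spark's actual implementation.
object GenerateChildrenSeqSketch extends App {
  // Cartesian combination of the alternatives of each child, lazily.
  def generateChildrenSeq[T](childrenAlternatives: Seq[Stream[T]]): Stream[Seq[T]] =
    childrenAlternatives.foldRight(Stream(Seq.empty[T])) { (alternatives, combined) =>
      for (head <- alternatives; tail <- combined) yield head +: tail
    }

  // Two children that can each be mapped to `x` or `y` combine into 4
  // sequences: (x, x), (x, y), (y, x), (y, y).
  generateChildrenSeq(Seq(Stream("x", "y"), Stream("x", "y"))).foreach(println)
}
// ---------------------------------------------------------------------------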
- case p: PartitioningCollection => - val childrenStreams = p.partitionings.map { - case e: Expression => e.multiTransformDown(f).asInstanceOf[Stream[Partitioning]] - case o => Stream(o) - }.filter(_.nonEmpty) - generateChildrenSeq(childrenStreams).flatMap { - case Nil => None - // We might have an expression type partitioning that doesn't need - // `PartitioningCollection` - case (p: Expression) :: Nil => Some(p) - case p :: Nil => Some(PartitioningCollection(Seq(p))) - case ps => Some(PartitioningCollection(ps)) - } - - // Filter `SortOrder` children similarly to `PartitioningCollection` elements - case s: SortOrder => - val childrenStreams = s.children.map(_.multiTransformDown(f)).filter(_.nonEmpty) - generateChildrenSeq(childrenStreams) - .filter(_.nonEmpty) - .map(es => s.copy(child = es.head, sameOrderExpressions = es.tail)) + case a: Attribute if !outputSet.contains(a) => Seq.empty } - - expr.multiTransformDown(f).take(aliasCandidateLimit) } } @@ -131,13 +97,14 @@ trait AliasAwareQueryOutputOrdering[T <: QueryPlan[T]] override final def outputOrdering: Seq[SortOrder] = { if (hasAlias) { orderingExpressions.flatMap { sortOrder => - val normalized = normalizeExpression(sortOrder) - val allOrderingExpressions = normalized.flatMap(_.asInstanceOf[SortOrder].children) - if (allOrderingExpressions.isEmpty) { - None + val equalOrderings = sortOrder.children.toStream + .flatMap(projectExpression) + .take(aliasCandidateLimit) + if (equalOrderings.nonEmpty) { + Some(sortOrder.copy(child = equalOrderings.head, + sameOrderExpressions = equalOrderings.tail)) } else { - Some(sortOrder.copy(child = allOrderingExpressions.head, - sameOrderExpressions = allOrderingExpressions.tail)) + None } } } else { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala index 1d84139e11f15..90d1bd805cb5f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala @@ -54,7 +54,9 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] lazy val outputSet: AttributeSet = AttributeSet(output) /** - * Returns the output ordering that this plan generates. + * Returns the output ordering that this plan generates, although the semantics differ in logical + * and physical plans. In the logical plan it means global ordering of the data while in physical + * it means ordering in each partition. 
*/ def outputOrdering: Seq[SortOrder] = Nil diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala index 46d787c56b4fd..1d973a6e408f9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala @@ -29,38 +29,42 @@ import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, PartitioningC trait AliasAwareOutputPartitioning extends UnaryExecNode with AliasAwareOutputExpression { final override def outputPartitioning: Partitioning = { - val normalizedOutputPartitioning = if (hasAlias) { - child.outputPartitioning match { - case e: Expression => - val normalized = normalizeExpression(e) - normalized.asInstanceOf[Seq[Partitioning]] match { - case Seq() => UnknownPartitioning(child.outputPartitioning.numPartitions) - case Seq(p) => p - case ps => PartitioningCollection(ps) + if (hasAlias) { + flattenPartitioning(child.outputPartitioning).flatMap { + case p: PartitioningCollection => + val (exprPartitionings, nonExprPartitionings) = p.partitionings.partition { + case _: Expression => true + case _ => false } - case other => other + // We need unique partitionings but if the input partitioning is + // `HashPartitioning(Seq(id + id))` and we have `id -> a` and `id -> b` aliases then after + // the projection we have 4 partitionings: + // `HashPartitioning(Seq(a + a))`, `HashPartitioning(Seq(a + b))`, + // `HashPartitioning(Seq(b + a))`, `HashPartitioning(Seq(b + b))`, but + // `HashPartitioning(Seq(a + b))` is the same as `HashPartitioning(Seq(b + a))`. + val partitioningSet = mutable.Set.empty[Expression] + val projectedExprPartitionings = + exprPartitionings.asInstanceOf[Seq[Expression]].toStream + .flatMap(projectExpression) + .filter(e => partitioningSet.add(e.canonicalized)) + .take(aliasCandidateLimit) + .asInstanceOf[Stream[Partitioning]] + nonExprPartitionings ++ projectedExprPartitionings + case e: Expression => + val partitioningSet = mutable.Set.empty[Expression] + projectExpression(e) + .filter(e => partitioningSet.add(e.canonicalized)) + .take(aliasCandidateLimit) + .asInstanceOf[Stream[Partitioning]] + case o => Seq(o) + } match { + case Seq() => UnknownPartitioning(child.outputPartitioning.numPartitions) + case Seq(p) => p + case ps => PartitioningCollection(ps) } } else { child.outputPartitioning } - - // We need unique `Partitioning`s but `normalizedOutputPartitioning` might not contain unique - // elements. - // E.g. if the input partitioning is `HashPartitioning(Seq(id + id))` and we have `id -> a` and - // `id -> b` as alias mappings in a projection node. After the mapping - // `normalizedOutputPartitioning` contains 4 elements: - // `HashPartitioning(Seq(a + a))`, `HashPartitioning(Seq(a + b))`, - // `HashPartitioning(Seq(b + a))`, `HashPartitioning(Seq(b + b))`, but - // `HashPartitioning(Seq(a + b))` is the same as `HashPartitioning(Seq(b + a))`. 
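// ---------------------------------------------------------------------------
// Editor's sketch (illustrative only, not part of the patch): the comment
// above relies on canonical forms making commutative duplicates collide. A
// toy canonical form that orders the operands of `+` shows why only 3 of the
// 4 projected candidates survive deduplication; Spark's real `canonicalized`
// is more general than this stand-in.
object CanonicalDedupSketch extends App {
  case class Add(left: String, right: String) {
    // Toy canonicalization: sort the operands of the commutative `+`.
    def canonicalized: Add = if (left <= right) this else Add(right, left)
  }

  val candidates = Seq(Add("a", "a"), Add("a", "b"), Add("b", "a"), Add("b", "b"))
  val seen = scala.collection.mutable.Set.empty[Add]
  // Keeps Add(a,a), Add(a,b) and Add(b,b); drops Add(b,a) as a duplicate.
  println(candidates.filter(e => seen.add(e.canonicalized)))
}
// ---------------------------------------------------------------------------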
- val expressionPartitionings = mutable.Set.empty[Expression] - flattenPartitioning(normalizedOutputPartitioning).filter { - case e: Expression => expressionPartitionings.add(e.canonicalized) - case _ => true - } match { - case Seq() => UnknownPartitioning(child.outputPartitioning.numPartitions) - case Seq(singlePartitioning) => singlePartitioning - case seqWithMultiplePartitionings => PartitioningCollection(seqWithMultiplePartitionings) - } } private def flattenPartitioning(partitioning: Partitioning): Seq[Partitioning] = { @@ -73,4 +77,5 @@ trait AliasAwareOutputPartitioning extends UnaryExecNode } } -trait AliasAwareOutputOrdering extends UnaryExecNode with AliasAwareQueryOutputOrdering[SparkPlan] +trait OrderPreservingUnaryExecNode + extends UnaryExecNode with AliasAwareQueryOutputOrdering[SparkPlan] diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala index 3cf63a5318dcf..6042ff7b2caf0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} import org.apache.spark.sql.catalyst.util.truncatedString -import org.apache.spark.sql.execution.{AliasAwareOutputOrdering, SparkPlan} +import org.apache.spark.sql.execution.{OrderPreservingUnaryExecNode, SparkPlan} import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.internal.SQLConf @@ -41,7 +41,7 @@ case class SortAggregateExec( resultExpressions: Seq[NamedExpression], child: SparkPlan) extends AggregateCodegenSupport - with AliasAwareOutputOrdering { + with OrderPreservingUnaryExecNode { override lazy val metrics = Map( "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala index 76a8d3d942f93..ab696d69c92df 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala @@ -43,7 +43,7 @@ case class ProjectExec(projectList: Seq[NamedExpression], child: SparkPlan) extends UnaryExecNode with CodegenSupport with AliasAwareOutputPartitioning - with AliasAwareOutputOrdering { + with OrderPreservingUnaryExecNode { override def output: Seq[Attribute] = projectList.map(_.toAttribute) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala index 88e212c53a05a..877f6508d963f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala @@ -276,7 +276,7 @@ case class TakeOrderedAndProjectExec( sortOrder: Seq[SortOrder], projectList: Seq[NamedExpression], child: SparkPlan, - offset: Int = 0) extends AliasAwareOutputOrdering { + offset: Int = 0) extends OrderPreservingUnaryExecNode { override def output: Seq[Attribute] = { projectList.map(_.toAttribute) diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index 855741d37a9a2..e9cb77ec95c4d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -1359,7 +1359,7 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper { val numShuffles = collect(planned) { case e: ShuffleExchangeExec => e } - // before: numShuffles is 4 + // before SPARK-40086: numShuffles is 4 assert(numShuffles.size == 2) val numOutputPartitioning = collectFirst(planned) { case e: SortMergeJoinExec => e.outputPartitioning match { @@ -1371,77 +1371,6 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper { assert(numOutputPartitioning.size == 8) } } - - test("SPARK-42049: Improve AliasAwareOutputExpression - ordering - multi-alias") { - Seq(0, 1, 2, 5).foreach { limit => - withSQLConf(SQLConf.EXPRESSION_PROJECTION_CANDIDATE_LIMIT.key -> limit.toString) { - val df = spark.range(2).orderBy($"id").selectExpr("id as x", "id as y", "id as z") - val outputOrdering = df.queryExecution.optimizedPlan.outputOrdering - limit match { - case 5 => - assert(outputOrdering.size == 1) - assert(outputOrdering.head.sameOrderExpressions.size == 2) - assert(outputOrdering.head.sameOrderExpressions.map(_.asInstanceOf[Attribute].name) - .toSet.subsetOf(Set("x", "y", "z"))) - case 2 => - assert(outputOrdering.size == 1) - assert(outputOrdering.head.sameOrderExpressions.size == 1) - assert(outputOrdering.head.sameOrderExpressions.map(_.asInstanceOf[Attribute].name) - .toSet.subsetOf(Set("x", "y", "z"))) - case 1 => - assert(outputOrdering.size == 1) - assert(outputOrdering.head.sameOrderExpressions.size == 0) - case 0 => - assert(outputOrdering.size == 0) - } - } - } - } - - test("SPARK-42049: Improve AliasAwareOutputExpression - partitioning - multi-alias") { - Seq(0, 1, 5).foreach { limit => - withSQLConf(SQLConf.EXPRESSION_PROJECTION_CANDIDATE_LIMIT.key -> limit.toString) { - val df = spark.range(2).repartition($"id").selectExpr("id as x", "id as y", "id as z") - val outputPartitioning = stripAQEPlan(df.queryExecution.executedPlan).outputPartitioning - limit match { - case 5 => - val p = outputPartitioning.asInstanceOf[PartitioningCollection].partitionings - assert(p.size == 3) - assert(p.flatMap(_.asInstanceOf[HashPartitioning].expressions - .map(_.asInstanceOf[Attribute].name)).toSet == Set("x", "y", "z")) - case 1 => - val p = outputPartitioning.asInstanceOf[HashPartitioning] - assert(p.expressions.size == 1) - assert(p.expressions.map(_.asInstanceOf[Attribute].name) - .toSet.subsetOf(Set("x", "y", "z"))) - case 0 => - // the references of child output partitioning is not the subset of output, - // so it has been pruned - assert(outputPartitioning.isInstanceOf[UnknownPartitioning]) - } - } - } - } - - test("SPARK-42049: Improve AliasAwareOutputExpression - ordering - multi-references") { - val df = spark.range(2).selectExpr("id as a", "id as b") - .orderBy($"a" + $"b").selectExpr("a as x", "b as y") - val outputOrdering = df.queryExecution.optimizedPlan.outputOrdering - assert(outputOrdering.size == 1) - assert(outputOrdering.head.sameOrderExpressions.size == 0) - } - - test("SPARK-42049: Improve AliasAwareOutputExpression - partitioning - multi-references") { - val df = spark.range(2).selectExpr("id as a", "id as b") - .repartition($"a" + $"b").selectExpr("a as x", "b as y") - val 
outputPartitioning = stripAQEPlan(df.queryExecution.executedPlan).outputPartitioning - // (a + b), (a + y), (x + b) are pruned since their references are not the subset of output - outputPartitioning match { - case HashPartitioning(Seq(Add(l: Attribute, r: Attribute, _)), _) => - assert(l.name == "x" && r.name == "y") - case _ => fail(s"Unexpected $outputPartitioning") - } - } } // Used for unit-testing EnsureRequirements diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala new file mode 100644 index 0000000000000..7eb7b2888274e --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql.catalyst.expressions.{Add, Attribute} +import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, PartitioningCollection, UnknownPartitioning} +import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SharedSparkSession + +class ProjectedOrderingAndPartitioningSuite + extends SharedSparkSession with AdaptiveSparkPlanHelper { + import testImplicits._ + + test("SPARK-42049: Improve AliasAwareOutputExpression - ordering - multi-alias") { + Seq(0, 1, 2, 5).foreach { limit => + withSQLConf(SQLConf.EXPRESSION_PROJECTION_CANDIDATE_LIMIT.key -> limit.toString) { + val df = spark.range(2).orderBy($"id").selectExpr("id as x", "id as y", "id as z") + val outputOrdering = df.queryExecution.optimizedPlan.outputOrdering + limit match { + case 5 => + assert(outputOrdering.size == 1) + assert(outputOrdering.head.sameOrderExpressions.size == 2) + assert(outputOrdering.head.sameOrderExpressions.map(_.asInstanceOf[Attribute].name) + .toSet.subsetOf(Set("x", "y", "z"))) + case 2 => + assert(outputOrdering.size == 1) + assert(outputOrdering.head.sameOrderExpressions.size == 1) + assert(outputOrdering.head.sameOrderExpressions.map(_.asInstanceOf[Attribute].name) + .toSet.subsetOf(Set("x", "y", "z"))) + case 1 => + assert(outputOrdering.size == 1) + assert(outputOrdering.head.sameOrderExpressions.size == 0) + case 0 => + assert(outputOrdering.size == 0) + } + } + } + } + + test("SPARK-42049: Improve AliasAwareOutputExpression - partitioning - multi-alias") { + Seq(0, 1, 5).foreach { limit => + withSQLConf(SQLConf.EXPRESSION_PROJECTION_CANDIDATE_LIMIT.key -> limit.toString) { + val df = spark.range(2).repartition($"id").selectExpr("id as x", "id as y", "id as z") + val outputPartitioning = 
stripAQEPlan(df.queryExecution.executedPlan).outputPartitioning + limit match { + case 5 => + val p = outputPartitioning.asInstanceOf[PartitioningCollection].partitionings + assert(p.size == 3) + assert(p.flatMap(_.asInstanceOf[HashPartitioning].expressions + .map(_.asInstanceOf[Attribute].name)).toSet == Set("x", "y", "z")) + case 1 => + val p = outputPartitioning.asInstanceOf[HashPartitioning] + assert(p.expressions.size == 1) + assert(p.expressions.map(_.asInstanceOf[Attribute].name) + .toSet.subsetOf(Set("x", "y", "z"))) + case 0 => + // the references of child output partitioning is not the subset of output, + // so it has been pruned + assert(outputPartitioning.isInstanceOf[UnknownPartitioning]) + } + } + } + } + + test("SPARK-42049: Improve AliasAwareOutputExpression - ordering - multi-references") { + val df = spark.range(2).selectExpr("id as a", "id as b") + .orderBy($"a" + $"b").selectExpr("a as x", "b as y") + val outputOrdering = df.queryExecution.optimizedPlan.outputOrdering + assert(outputOrdering.size == 1) + assert(outputOrdering.head.sameOrderExpressions.size == 0) + } + + test("SPARK-42049: Improve AliasAwareOutputExpression - partitioning - multi-references") { + val df = spark.range(2).selectExpr("id as a", "id as b") + .repartition($"a" + $"b").selectExpr("a as x", "b as y") + val outputPartitioning = stripAQEPlan(df.queryExecution.executedPlan).outputPartitioning + // (a + b), (a + y), (x + b) are pruned since their references are not the subset of output + outputPartitioning match { + case HashPartitioning(Seq(Add(l: Attribute, r: Attribute, _)), _) => + assert(l.name == "x" && r.name == "y") + case _ => fail(s"Unexpected $outputPartitioning") + } + } +} From 3f84eddae2387ce75d6d44111ecc897408544c5f Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Fri, 20 Jan 2023 09:54:48 +0100 Subject: [PATCH 08/12] fix scala 2.13 compilation, add unique ordering calculation and new test --- .../plans/AliasAwareOutputExpression.scala | 12 +++++++----- .../ProjectedOrderingAndPartitioningSuite.scala | 14 ++++++++++++++ 2 files changed, 21 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala index 35e173056ddd5..2511d5371bb14 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala @@ -71,7 +71,7 @@ trait AliasAwareOutputExpression extends SQLConfHelper { expr.multiTransformDown { // Mapping with aliases case e: Expression if aliasMap.contains(e.canonicalized) => - aliasMap(e.canonicalized) ++ (if (e.containsChild.nonEmpty) Seq(e) else Seq.empty) + aliasMap(e.canonicalized).toSeq ++ (if (e.containsChild.nonEmpty) Seq(e) else Seq.empty) // Prune if we encounter an attribute that we can't map and it is not in output set. 
// This prune will go up to the closest `multiTransformDown()` call and returns `Stream.empty` @@ -97,12 +97,14 @@ trait AliasAwareQueryOutputOrdering[T <: QueryPlan[T]] override final def outputOrdering: Seq[SortOrder] = { if (hasAlias) { orderingExpressions.flatMap { sortOrder => - val equalOrderings = sortOrder.children.toStream + val orderingSet = mutable.Set.empty[Expression] + val sameOrderings = sortOrder.children.toStream .flatMap(projectExpression) + .filter(e => orderingSet.add(e.canonicalized)) .take(aliasCandidateLimit) - if (equalOrderings.nonEmpty) { - Some(sortOrder.copy(child = equalOrderings.head, - sameOrderExpressions = equalOrderings.tail)) + if (sameOrderings.nonEmpty) { + Some(sortOrder.copy(child = sameOrderings.head, + sameOrderExpressions = sameOrderings.tail)) } else { None } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala index 7eb7b2888274e..bf1850984a0b6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala @@ -97,4 +97,18 @@ class ProjectedOrderingAndPartitioningSuite case _ => fail(s"Unexpected $outputPartitioning") } } + + test("SPARK-42049: Improve AliasAwareOutputExpression - multi-references to same") { + val df2 = spark.range(2).repartition($"id" + $"id").selectExpr("id as a", "id as b") + val outputPartitioning = stripAQEPlan(df2.queryExecution.executedPlan).outputPartitioning + val partitionings = outputPartitioning.asInstanceOf[PartitioningCollection].partitionings + // (a + b) is the very same as (b + a) so keep only one + assert(partitionings.size == 3) + + val df = spark.range(2).orderBy($"id" + $"id").selectExpr("id as a", "id as b") + val outputOrdering = df.queryExecution.optimizedPlan.outputOrdering + assert(outputOrdering.size == 1) + // (a + b) is the very same as (b + a) so keep only one + assert(outputOrdering.head.children.size == 3) + } } From 7747045ae6a5ee5e2a429f0c3e18c6e901e34b79 Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Thu, 26 Jan 2023 18:51:29 +0100 Subject: [PATCH 09/12] add some more tests --- ...rojectedOrderingAndPartitioningSuite.scala | 40 +++++++++++++++++-- 1 file changed, 36 insertions(+), 4 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala index bf1850984a0b6..3bf830a4a4b6f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala @@ -54,7 +54,7 @@ class ProjectedOrderingAndPartitioningSuite } test("SPARK-42049: Improve AliasAwareOutputExpression - partitioning - multi-alias") { - Seq(0, 1, 5).foreach { limit => + Seq(0, 1, 2, 5).foreach { limit => withSQLConf(SQLConf.EXPRESSION_PROJECTION_CANDIDATE_LIMIT.key -> limit.toString) { val df = spark.range(2).repartition($"id").selectExpr("id as x", "id as y", "id as z") val outputPartitioning = stripAQEPlan(df.queryExecution.executedPlan).outputPartitioning @@ -64,14 +64,17 @@ class ProjectedOrderingAndPartitioningSuite assert(p.size == 3) assert(p.flatMap(_.asInstanceOf[HashPartitioning].expressions 
.map(_.asInstanceOf[Attribute].name)).toSet == Set("x", "y", "z"))
+          case 2 =>
+            val p = outputPartitioning.asInstanceOf[PartitioningCollection].partitionings
+            assert(p.size == 2)
+            assert(p.flatMap(_.asInstanceOf[HashPartitioning].expressions
+              .map(_.asInstanceOf[Attribute].name)).toSet.subsetOf(Set("x", "y", "z")))
           case 1 =>
             val p = outputPartitioning.asInstanceOf[HashPartitioning]
             assert(p.expressions.size == 1)
             assert(p.expressions.map(_.asInstanceOf[Attribute].name)
               .toSet.subsetOf(Set("x", "y", "z")))
           case 0 =>
-            // the references of child output partitioning is not the subset of output,
-            // so it has been pruned
             assert(outputPartitioning.isInstanceOf[UnknownPartitioning])
         }
       }
@@ -98,7 +101,21 @@ class ProjectedOrderingAndPartitioningSuite
     }
   }

-  test("SPARK-42049: Improve AliasAwareOutputExpression - multi-references to same") {
+  test("SPARK-42049: Improve AliasAwareOutputExpression - multi-references to complex " +
+    "expressions") {
+    val df2 = spark.range(2).repartition($"id" + $"id").selectExpr("id + id as a", "id + id as b")
+    val outputPartitioning = stripAQEPlan(df2.queryExecution.executedPlan).outputPartitioning
+    val partitionings = outputPartitioning.asInstanceOf[PartitioningCollection].partitionings
+    assert(partitionings.size == 2)
+
+    val df = spark.range(2).orderBy($"id" + $"id").selectExpr("id + id as a", "id + id as b")
+    val outputOrdering = df.queryExecution.optimizedPlan.outputOrdering
+    assert(outputOrdering.size == 1)
+    assert(outputOrdering.head.children.size == 2)
+  }
+
+  test("SPARK-42049: Improve AliasAwareOutputExpression - multi-references to children of " +
+    "complex expressions") {
     val df2 = spark.range(2).repartition($"id" + $"id").selectExpr("id as a", "id as b")
     val outputPartitioning = stripAQEPlan(df2.queryExecution.executedPlan).outputPartitioning
     val partitionings = outputPartitioning.asInstanceOf[PartitioningCollection].partitionings
@@ -111,4 +128,19 @@ class ProjectedOrderingAndPartitioningSuite
     // (a + b) is the very same as (b + a) so keep only one
     assert(outputOrdering.head.children.size == 3)
   }
+
+  test("SPARK-42049: Improve AliasAwareOutputExpression - multi-references to complex " +
+    "expressions and to their children") {
+    val df2 = spark.range(2).repartition($"id" + $"id")
+      .selectExpr("id + id as aa", "id + id as bb", "id as a", "id as b")
+    val outputPartitioning = stripAQEPlan(df2.queryExecution.executedPlan).outputPartitioning
+    val partitionings = outputPartitioning.asInstanceOf[PartitioningCollection].partitionings
+    assert(partitionings.size == 5)
+
+    val df = spark.range(2).orderBy($"id" + $"id")
+      .selectExpr("id + id as aa", "id + id as bb", "id as a", "id as b")
+    val outputOrdering = df.queryExecution.optimizedPlan.outputOrdering
+    assert(outputOrdering.size == 1)
+    assert(outputOrdering.head.children.size == 5)
+  }
 }

From e6c395e72de5b92ae50ead904d53cf0f8471f2f1 Mon Sep 17 00:00:00 2001
From: Peter Toth
Date: Mon, 30 Jan 2023 11:21:12 +0100
Subject: [PATCH 10/12] address review comments

---
 .../plans/AliasAwareOutputExpression.scala | 2 +-
 .../apache/spark/sql/internal/SQLConf.scala | 3 +-
 .../AliasAwareOutputExpression.scala | 17 +--------
 .../aggregate/BaseAggregateExec.scala | 4 +-
 .../execution/basicPhysicalOperators.scala | 2 +-
 .../datasources/FileFormatWriter.scala | 1 -
 .../sql/execution/datasources/V1Writes.scala | 2 +-
 ...rojectedOrderingAndPartitioningSuite.scala | 38 +++++++++++++------
 8 files changed, 36 insertions(+), 33 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala
index 2511d5371bb14..68cf6f464d346 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala
@@ -64,7 +64,7 @@ trait AliasAwareOutputExpression extends SQLConfHelper {
   protected def hasAlias: Boolean = aliasMap.nonEmpty

   /**
-   * Return a set of Expression which normalize the original expression to the aliased.
+   * Return a stream of expressions in which the original expression is projected with `aliasMap`.
    */
   protected def projectExpression(expr: Expression): Stream[Expression] = {
     val outputSet = AttributeSet(outputExpressions.map(_.toAttribute))
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index c416f34074dec..0255c34d00c73 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -437,9 +437,10 @@ object SQLConf {

   val EXPRESSION_PROJECTION_CANDIDATE_LIMIT =
     buildConf("spark.sql.optimizer.expressionProjectionCandidateLimit")
-      .doc("The maximum number of the candidate of out put expressions whose alias are replaced." +
-        " It can preserve the output partitioning and ordering." +
-        " Negative value means disable this optimization.")
+      .doc("The maximum number of candidates to generate when replacing aliases in output" +
+        " expressions. It can preserve the output partitioning and ordering." +
+        " A negative value disables this optimization.")
+      .internal()
       .version("3.4.0")
       .intConf
       .createWithDefault(100)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala
index 1d973a6e408f9..a09c719cf840d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala
@@ -26,31 +26,18 @@ import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, PartitioningC
  * A trait that handles aliases in the `outputExpressions` to produce `outputPartitioning` that
  * satisfies distribution requirements.
  */
-trait AliasAwareOutputPartitioning extends UnaryExecNode
+trait PartitioningPreservingUnaryExecNode extends UnaryExecNode
   with AliasAwareOutputExpression {
   final override def outputPartitioning: Partitioning = {
     if (hasAlias) {
       flattenPartitioning(child.outputPartitioning).flatMap {
-        case p: PartitioningCollection =>
-          val (exprPartitionings, nonExprPartitionings) = p.partitionings.partition {
-            case _: Expression => true
-            case _ => false
-          }
+        case e: Expression =>
           // We need unique partitionings but if the input partitioning is
           // `HashPartitioning(Seq(id + id))` and we have `id -> a` and `id -> b` aliases then after
           // the projection we have 4 partitionings:
           // `HashPartitioning(Seq(a + a))`, `HashPartitioning(Seq(a + b))`,
           // `HashPartitioning(Seq(b + a))`, `HashPartitioning(Seq(b + b))`, but
           // `HashPartitioning(Seq(a + b))` is the same as `HashPartitioning(Seq(b + a))`.
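// ---------------------------------------------------------------------------
// Editor's sketch (illustrative only, not part of the patch): a minimal model
// of the fan-out `projectExpression()` performs. Every occurrence of an
// aliased subexpression expands into all of its aliases, so `id + id` with
// the alias map {id -> [a, b]} yields exactly the 4 candidates listed in the
// comment above. The `Expr` ADT and `project` below are stand-ins, not
// Spark's `multiTransformDown`.
object ProjectExpressionSketch extends App {
  sealed trait Expr
  case class Attr(name: String) extends Expr
  case class Plus(left: Expr, right: Expr) extends Expr

  def project(e: Expr, aliasMap: Map[Expr, Seq[Expr]]): Stream[Expr] = e match {
    case _ if aliasMap.contains(e) => aliasMap(e).toStream
    case Plus(l, r) =>
      for (pl <- project(l, aliasMap); pr <- project(r, aliasMap)) yield Plus(pl, pr)
    case other => Stream(other)
  }

  val aliasMap = Map[Expr, Seq[Expr]](Attr("id") -> Seq(Attr("a"), Attr("b")))
  // Prints the 4 combinations (a+a), (a+b), (b+a), (b+b), before deduplication.
  project(Plus(Attr("id"), Attr("id")), aliasMap).foreach(println)
}
// ---------------------------------------------------------------------------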
- val partitioningSet = mutable.Set.empty[Expression] - val projectedExprPartitionings = - exprPartitionings.asInstanceOf[Seq[Expression]].toStream - .flatMap(projectExpression) - .filter(e => partitioningSet.add(e.canonicalized)) - .take(aliasCandidateLimit) - .asInstanceOf[Stream[Partitioning]] - nonExprPartitionings ++ projectedExprPartitionings - case e: Expression => val partitioningSet = mutable.Set.empty[Expression] projectExpression(e) .filter(e => partitioningSet.add(e.canonicalized)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/BaseAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/BaseAggregateExec.scala index 756b5eb09d0b9..2427a39751f5d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/BaseAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/BaseAggregateExec.scala @@ -20,13 +20,13 @@ package org.apache.spark.sql.execution.aggregate import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, Expression, NamedExpression} import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Final, PartialMerge} import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, UnspecifiedDistribution} -import org.apache.spark.sql.execution.{AliasAwareOutputPartitioning, ExplainUtils, UnaryExecNode} +import org.apache.spark.sql.execution.{ExplainUtils, PartitioningPreservingUnaryExecNode, UnaryExecNode} import org.apache.spark.sql.execution.streaming.StatefulOperatorPartitioning /** * Holds common logic for aggregate operators */ -trait BaseAggregateExec extends UnaryExecNode with AliasAwareOutputPartitioning { +trait BaseAggregateExec extends UnaryExecNode with PartitioningPreservingUnaryExecNode { def requiredChildDistributionExpressions: Option[Seq[Expression]] def isStreaming: Boolean def numShufflePartitions: Option[Int] diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala index ab696d69c92df..a7b1207765a43 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala @@ -42,7 +42,7 @@ import org.apache.spark.util.random.{BernoulliCellSampler, PoissonSampler} case class ProjectExec(projectList: Seq[NamedExpression], child: SparkPlan) extends UnaryExecNode with CodegenSupport - with AliasAwareOutputPartitioning + with PartitioningPreservingUnaryExecNode with OrderPreservingUnaryExecNode { override def output: Seq[Attribute] = projectList.map(_.toAttribute) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala index 8efc52edbfd48..2491c9d7754e0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala @@ -155,7 +155,6 @@ object FileFormatWriter extends Logging { } // the sort order doesn't matter - // Use the output ordering from the original plan before adding the empty2null projection. 
val actualOrdering = writeFilesOpt.map(_.child) .getOrElse(materializeAdaptiveSparkPlan(plan)) .outputOrdering diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala index 76167b6004562..a4b76be4a6de1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala @@ -215,7 +215,7 @@ object V1WritesUtils { requiredOrdering.zip(outputOrdering).forall { case (requiredOrder, outputOrder) => // Follow `SortOrder.satisfies` that respects `SortOrder.sameOrderExpressions` - outputOrder.children.exists(_.semanticEquals(requiredOrder)) + outputOrder.satisfies(outputOrder.copy(child = requiredOrder)) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala index 3bf830a4a4b6f..b7d08e977d133 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.execution -import org.apache.spark.sql.catalyst.expressions.{Add, Attribute} +import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, PartitioningCollection, UnknownPartitioning} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.internal.SQLConf @@ -86,6 +86,7 @@ class ProjectedOrderingAndPartitioningSuite .orderBy($"a" + $"b").selectExpr("a as x", "b as y") val outputOrdering = df.queryExecution.optimizedPlan.outputOrdering assert(outputOrdering.size == 1) + assert(outputOrdering.head.sql == "(x + y) ASC NULLS FIRST") assert(outputOrdering.head.sameOrderExpressions.size == 0) } @@ -95,8 +96,7 @@ class ProjectedOrderingAndPartitioningSuite val outputPartitioning = stripAQEPlan(df.queryExecution.executedPlan).outputPartitioning // (a + b), (a + y), (x + b) are pruned since their references are not the subset of output outputPartitioning match { - case HashPartitioning(Seq(Add(l: Attribute, r: Attribute, _)), _) => - assert(l.name == "x" && r.name == "y") + case p: HashPartitioning => assert(p.sql == "hashpartitioning((x + y))") case _ => fail(s"Unexpected $outputPartitioning") } } @@ -106,12 +106,16 @@ class ProjectedOrderingAndPartitioningSuite val df2 = spark.range(2).repartition($"id" + $"id").selectExpr("id + id as a", "id + id as b") val outputPartitioning = stripAQEPlan(df2.queryExecution.executedPlan).outputPartitioning val partitionings = outputPartitioning.asInstanceOf[PartitioningCollection].partitionings - assert(partitionings.size == 2) + assert(partitionings.map { + case p: HashPartitioning => p.sql + case _ => fail(s"Unexpected $outputPartitioning") + } == Seq("hashpartitioning(b)", "hashpartitioning(a)")) val df = spark.range(2).orderBy($"id" + $"id").selectExpr("id + id as a", "id + id as b") val outputOrdering = df.queryExecution.optimizedPlan.outputOrdering assert(outputOrdering.size == 1) - assert(outputOrdering.head.children.size == 2) + assert(outputOrdering.head.sql == "b ASC NULLS FIRST") + assert(outputOrdering.head.sameOrderExpressions.map(_.sql) == Seq("a")) } test("SPARK-42049: Improve 
AliasAwareOutputExpression - multi-references to children of " + @@ -119,14 +123,18 @@ class ProjectedOrderingAndPartitioningSuite val df2 = spark.range(2).repartition($"id" + $"id").selectExpr("id as a", "id as b") val outputPartitioning = stripAQEPlan(df2.queryExecution.executedPlan).outputPartitioning val partitionings = outputPartitioning.asInstanceOf[PartitioningCollection].partitionings - // (a + b) is the very same as (b + a) so keep only one - assert(partitionings.size == 3) + // (a + b) is the same as (b + a) so expect only one + assert(partitionings.map { + case p: HashPartitioning => p.sql + case _ => fail(s"Unexpected $outputPartitioning") + } == Seq("hashpartitioning((b + b))", "hashpartitioning((a + b))", "hashpartitioning((a + a))")) val df = spark.range(2).orderBy($"id" + $"id").selectExpr("id as a", "id as b") val outputOrdering = df.queryExecution.optimizedPlan.outputOrdering assert(outputOrdering.size == 1) - // (a + b) is the very same as (b + a) so keep only one - assert(outputOrdering.head.children.size == 3) + assert(outputOrdering.head.sql == "(b + b) ASC NULLS FIRST") + // (a + b) is the same as (b + a) so expect only one + assert(outputOrdering.head.sameOrderExpressions.map(_.sql) == Seq("(a + b)", "(a + a)")) } test("SPARK-42049: Improve AliasAwareOutputExpression - multi-references to complex " + @@ -135,12 +143,20 @@ class ProjectedOrderingAndPartitioningSuite .selectExpr("id + id as aa", "id + id as bb", "id as a", "id as b") val outputPartitioning = stripAQEPlan(df2.queryExecution.executedPlan).outputPartitioning val partitionings = outputPartitioning.asInstanceOf[PartitioningCollection].partitionings - assert(partitionings.size == 5) + // (a + b) is the same as (b + a) so expect only one + assert(partitionings.map { + case p: HashPartitioning => p.sql + case _ => fail(s"Unexpected $outputPartitioning") + } == Seq("hashpartitioning(bb)", "hashpartitioning(aa)", "hashpartitioning((b + b))", + "hashpartitioning((a + b))", "hashpartitioning((a + a))")) val df = spark.range(2).orderBy($"id" + $"id") .selectExpr("id + id as aa", "id + id as bb", "id as a", "id as b") val outputOrdering = df.queryExecution.optimizedPlan.outputOrdering assert(outputOrdering.size == 1) - assert(outputOrdering.head.children.size == 5) + assert(outputOrdering.head.sql == "bb ASC NULLS FIRST") + // (a + b) is the same as (b + a) so expect only one + assert(outputOrdering.head.sameOrderExpressions.map(_.sql) == + Seq("aa", "(b + b)", "(a + b)", "(a + a)")) } } From 733ecb555ca9ed5e520fbff764b5245a1cefbcf1 Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Mon, 30 Jan 2023 15:11:23 +0100 Subject: [PATCH 11/12] remove comment --- .../org/apache/spark/sql/execution/datasources/V1Writes.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala index a4b76be4a6de1..b17d72b0f7207 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala @@ -214,7 +214,6 @@ object V1WritesUtils { } else { requiredOrdering.zip(outputOrdering).forall { case (requiredOrder, outputOrder) => - // Follow `SortOrder.satisfies` that respects `SortOrder.sameOrderExpressions` outputOrder.satisfies(outputOrder.copy(child = requiredOrder)) } } From 1f1f093e646d1b45dfabc4aad07e8074fac87dcf Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Tue, 31 
Jan 2023 11:57:35 +0100
Subject: [PATCH 12/12] fix strip usage, fix complex ordering projection

---
 .../plans/AliasAwareOutputExpression.scala | 10 +++++++---
 ...ProjectedOrderingAndPartitioningSuite.scala | 18 ++++++++++++++++++
 2 files changed, 25 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala
index 68cf6f464d346..4d9d69d14fe5f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala
@@ -44,7 +44,7 @@ trait AliasAwareOutputExpression extends SQLConfHelper {
     val aliases = mutable.Map[Expression, mutable.ArrayBuffer[Attribute]]()
     outputExpressions.reverse.foreach {
       case a @ Alias(child, _) =>
-        val buffer = aliases.getOrElseUpdate(strip(child.canonicalized), mutable.ArrayBuffer.empty)
+        val buffer = aliases.getOrElseUpdate(strip(child).canonicalized, mutable.ArrayBuffer.empty)
         if (buffer.size < aliasCandidateLimit) {
           buffer += a.toAttribute
         }
@@ -96,7 +96,11 @@ trait AliasAwareQueryOutputOrdering[T <: QueryPlan[T]]

   override final def outputOrdering: Seq[SortOrder] = {
     if (hasAlias) {
-      orderingExpressions.flatMap { sortOrder =>
+      // Take the first `SortOrder`s only while they can be projected.
+      // E.g. if the child ordering is `Seq(SortOrder(a), SortOrder(b))` and only `a AS x` can be
+      // projected then we can return `Seq(SortOrder(x))`, but if only `b AS y` can be projected
+      // then we can't return `Seq(SortOrder(y))`.
+      orderingExpressions.iterator.map { sortOrder =>
         val orderingSet = mutable.Set.empty[Expression]
         val sameOrderings = sortOrder.children.toStream
           .flatMap(projectExpression)
@@ -108,7 +112,7 @@ trait AliasAwareQueryOutputOrdering[T <: QueryPlan[T]]
         } else {
           None
         }
-      }
+      }.takeWhile(_.isDefined).flatten.toSeq
     } else {
       orderingExpressions
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala
index b7d08e977d133..e4ecdb9c44595 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala
@@ -159,4 +159,22 @@ class ProjectedOrderingAndPartitioningSuite
     assert(outputOrdering.head.sameOrderExpressions.map(_.sql) ==
       Seq("aa", "(b + b)", "(a + b)", "(a + a)"))
   }
+
+  test("SPARK-42049: Improve AliasAwareOutputExpression - ordering partly projected") {
+    val df = spark.range(2).orderBy($"id" + 1, $"id" + 2)
+
+    val df1 = df.selectExpr("id + 1 AS a", "id + 2 AS b")
+    val outputOrdering1 = df1.queryExecution.optimizedPlan.outputOrdering
+    assert(outputOrdering1.size == 2)
+    assert(outputOrdering1.map(_.sql) == Seq("a ASC NULLS FIRST", "b ASC NULLS FIRST"))
+
+    val df2 = df.selectExpr("id + 1 AS a")
+    val outputOrdering2 = df2.queryExecution.optimizedPlan.outputOrdering
+    assert(outputOrdering2.size == 1)
+    assert(outputOrdering2.head.sql == "a ASC NULLS FIRST")
+
+    val df3 = df.selectExpr("id + 2 AS b")
+    val outputOrdering3 = df3.queryExecution.optimizedPlan.outputOrdering
+    assert(outputOrdering3.size == 0)
+  }
 }
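Editor's note (not part of the patch series): the end-to-end effect of SPARK-42049 can be
observed from spark-shell on a build that contains these commits. The snippet below mirrors
the new tests: after a projection that only aliases columns, the reported output ordering
survives instead of being lost, so downstream operators can avoid redundant sorts (and,
analogously, redundant shuffles for partitioning).

// Run in spark-shell (assumes a Spark build that includes this series).
val df = spark.range(2).orderBy($"id").selectExpr("id as x", "id as y", "id as z")
// With the default candidate limit (100) this prints a single SortOrder on one
// of the aliases, carrying the other aliases as sameOrderExpressions.
println(df.queryExecution.optimizedPlan.outputOrdering)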