[SPARK-40086][SPARK-42049][SQL] Improve AliasAwareOutputPartitioning and AliasAwareQueryOutputOrdering to take all aliases into account

### What changes were proposed in this pull request?
Currently `AliasAwareOutputPartitioning` and `AliasAwareQueryOutputOrdering`
take only the last alias of an aliased expression into account. With better alias handling we can avoid some unnecessary shuffles and sorts.
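
As a hypothetical illustration (not taken from this PR; assumes a running `SparkSession` and `spark.implicits._`):

```scala
// `id` is projected under two aliases. Previously only the last alias (`b`) was
// tracked, so a requirement expressed through `a` forced an extra shuffle even
// though the data is already hash-partitioned by `id`.
val df = spark.range(10)
  .repartition($"id")
  .select($"id".as("a"), $"id".as("b"))

// With this change both `a` and `b` are recognized as aliases of `id`, so an
// aggregation on either column should be able to reuse the existing partitioning.
df.groupBy($"a").count().explain() // ideally plans without an additional Exchange
```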

### Why are the changes needed?
Performance improvement, and this also fixes the issue in #39475.

### Does this PR introduce _any_ user-facing change?
Yes, this PR fixes the issue in #39475.

### How was this patch tested?
Added new UT.

Closes #37525 from peter-toth/SPARK-40086-fix-aliasawareoutputexpression.

Lead-authored-by: Peter Toth <[email protected]>
Co-authored-by: ulysses-you <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 6341b06)
Signed-off-by: Wenchen Fan <[email protected]>
2 people authored and cloud-fan committed Jan 31, 2023
1 parent 4ab6e61 commit b122436
Showing 19 changed files with 464 additions and 119 deletions.
@@ -3024,3 +3024,27 @@ case class SplitPart (
    partNum = newChildren.apply(2))
  }
}

/**
 * An internal function that converts an empty string to null for partition values.
 * This function should only be used in V1Writes.
 */
case class Empty2Null(child: Expression) extends UnaryExpression with String2StringExpression {
  override def convert(v: UTF8String): UTF8String = if (v.numBytes() == 0) null else v

  override def nullable: Boolean = true

  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
    nullSafeCodeGen(ctx, ev, c => {
      s"""if ($c.numBytes() == 0) {
         |  ${ev.isNull} = true;
         |  ${ev.value} = null;
         |} else {
         |  ${ev.value} = $c;
         |}""".stripMargin
    })
  }

  override protected def withNewChildInternal(newChild: Expression): Empty2Null =
    copy(child = newChild)
}
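
A minimal behavioural sketch of the expression above (illustrative, not part of the patch; assumes the catalyst classes are on the classpath):

import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.types.StringType
import org.apache.spark.unsafe.types.UTF8String

// Interpreted path: the empty string becomes null, other values pass through.
assert(Empty2Null(Literal.create("", StringType)).eval() == null)
assert(Empty2Null(Literal.create("x", StringType)).eval() == UTF8String.fromString("x"))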
@@ -0,0 +1,120 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalyst.plans

import scala.collection.mutable

import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeSet, Empty2Null, Expression, NamedExpression, SortOrder}
import org.apache.spark.sql.internal.SQLConf

/**
 * A trait that provides functionality to handle aliases in the `outputExpressions`.
 */
trait AliasAwareOutputExpression extends SQLConfHelper {
  protected val aliasCandidateLimit = conf.getConf(SQLConf.EXPRESSION_PROJECTION_CANDIDATE_LIMIT)
  protected def outputExpressions: Seq[NamedExpression]
  /**
   * This method can be used to strip expressions that do not affect the result, for example:
   * strip an expression that is ordering agnostic for output ordering.
   */
  protected def strip(expr: Expression): Expression = expr

  // Build an `Expression` -> `Attribute` alias map.
  // There can be multiple aliases defined for the same expression, but it doesn't make sense to
  // store more than `aliasCandidateLimit` attributes for an expression. In those cases the old
  // logic handled only the last alias, so we need to make sure that we give precedence to that.
  // If the `outputExpressions` contain simple attributes we need to add those to the map too.
  private lazy val aliasMap = {
    val aliases = mutable.Map[Expression, mutable.ArrayBuffer[Attribute]]()
    outputExpressions.reverse.foreach {
      case a @ Alias(child, _) =>
        val buffer = aliases.getOrElseUpdate(strip(child).canonicalized, mutable.ArrayBuffer.empty)
        if (buffer.size < aliasCandidateLimit) {
          buffer += a.toAttribute
        }
      case _ =>
    }
    outputExpressions.foreach {
      case a: Attribute if aliases.contains(a.canonicalized) =>
        val buffer = aliases(a.canonicalized)
        if (buffer.size < aliasCandidateLimit) {
          buffer += a
        }
      case _ =>
    }
    aliases
  }
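
  // Illustrative walk-through (not part of the patch): with
  //   outputExpressions = Seq(a AS x, a AS y, a)
  // the reverse pass stores the last alias first:    aliasMap(a) = [y, x]
  // and the second pass appends the bare attribute:  aliasMap(a) = [y, x, a]
  // so the old "last alias wins" behaviour survives as the first candidate.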

  protected def hasAlias: Boolean = aliasMap.nonEmpty

  /**
   * Return a stream of expressions in which the original expression is projected with `aliasMap`.
   */
  protected def projectExpression(expr: Expression): Stream[Expression] = {
    val outputSet = AttributeSet(outputExpressions.map(_.toAttribute))
    expr.multiTransformDown {
      // Mapping with aliases
      case e: Expression if aliasMap.contains(e.canonicalized) =>
        aliasMap(e.canonicalized).toSeq ++ (if (e.containsChild.nonEmpty) Seq(e) else Seq.empty)

      // Prune if we encounter an attribute that we can't map and that is not in the output set.
      // This prune will go up to the closest `multiTransformDown()` call and return `Stream.empty`
      // there.
      case a: Attribute if !outputSet.contains(a) => Seq.empty
    }
  }
}
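
// For intuition, a sketch of how `projectExpression` fans out (illustrative
// attribute names, not from the patch), assuming
//   aliasMap = Map(a -> [a1, a2], b -> [b1]) and output = [a1, a2, b1]:
//   projectExpression(b + a)  ==> Stream(b1 + a1, b1 + a2)
//   projectExpression(c + 1)  ==> Stream.empty  // `c` is neither aliased nor in the output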

/**
 * A trait that handles aliases in the `orderingExpressions` to produce `outputOrdering` that
 * satisfies ordering requirements.
 */
trait AliasAwareQueryOutputOrdering[T <: QueryPlan[T]]
  extends AliasAwareOutputExpression { self: QueryPlan[T] =>
  protected def orderingExpressions: Seq[SortOrder]

  override protected def strip(expr: Expression): Expression = expr match {
    case e: Empty2Null => strip(e.child)
    case _ => expr
  }

  override final def outputOrdering: Seq[SortOrder] = {
    if (hasAlias) {
      // Take the first `SortOrder`s only until they can be projected.
      // E.g. if we have child ordering `Seq(SortOrder(a), SortOrder(b))` then
      // if only `a AS x` can be projected then we can return `Seq(SortOrder(x))`,
      // but if only `b AS y` can be projected we can't return `Seq(SortOrder(y))`.
      orderingExpressions.iterator.map { sortOrder =>
        val orderingSet = mutable.Set.empty[Expression]
        val sameOrderings = sortOrder.children.toStream
          .flatMap(projectExpression)
          .filter(e => orderingSet.add(e.canonicalized))
          .take(aliasCandidateLimit)
        if (sameOrderings.nonEmpty) {
          Some(sortOrder.copy(child = sameOrderings.head,
            sameOrderExpressions = sameOrderings.tail))
        } else {
          None
        }
      }.takeWhile(_.isDefined).flatten.toSeq
    } else {
      orderingExpressions
    }
  }
}
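
A sketch of the prefix rule implemented above (illustrative):

// child ordering = [SortOrder(a), SortOrder(b)], projection = [a AS x]
//   ==> outputOrdering = [SortOrder(x)]   // the prefix [a] projects; [b] is dropped
// child ordering = [SortOrder(a), SortOrder(b)], projection = [b AS y]
//   ==> outputOrdering = Nil              // the leading `a` cannot be projected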
@@ -53,6 +53,13 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]]
  @transient
  lazy val outputSet: AttributeSet = AttributeSet(output)

  /**
   * Returns the output ordering that this plan generates, although the semantics differ in
   * logical and physical plans: in a logical plan it means a global ordering of the data,
   * while in a physical plan it means the ordering within each partition.
   */
  def outputOrdering: Seq[SortOrder] = Nil
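
  // Illustrative: a logical `df.orderBy($"a")` implies a total ordering of the
  // data, while a physical `df.sortWithinPartitions($"a")` only guarantees
  // `outputOrdering = [a ASC]` within each partition.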

  // Override `treePatternBits` to propagate bits for its expressions.
  override lazy val treePatternBits: BitSet = {
    val bits: BitSet = getDefaultTreePatternBits

@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.{AliasAwareQueryOutputOrdering, QueryPlan}
 import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.LogicalPlanStats
 import org.apache.spark.sql.catalyst.trees.{BinaryLike, LeafLike, UnaryLike}
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
@@ -141,11 +141,6 @@ abstract class LogicalPlan
   */
  def refresh(): Unit = children.foreach(_.refresh())

-  /**
-   * Returns the output ordering that this plan generates.
-   */
-  def outputOrdering: Seq[SortOrder] = Nil
-
  /**
   * Returns true iff `other`'s output is semantically the same, i.e.:
   * - it contains the same number of `Attribute`s;
@@ -205,8 +200,10 @@ trait UnaryNode extends LogicalPlan with UnaryLike[LogicalPlan] {
  */
 trait BinaryNode extends LogicalPlan with BinaryLike[LogicalPlan]

-abstract class OrderPreservingUnaryNode extends UnaryNode {
-  override final def outputOrdering: Seq[SortOrder] = child.outputOrdering
+trait OrderPreservingUnaryNode extends UnaryNode
+  with AliasAwareQueryOutputOrdering[LogicalPlan] {
+  override protected def outputExpressions: Seq[NamedExpression] = child.output
+  override protected def orderingExpressions: Seq[SortOrder] = child.outputOrdering
 }

object LogicalPlanIntegrity {
@@ -69,6 +69,7 @@ object Subquery {
 case class Project(projectList: Seq[NamedExpression], child: LogicalPlan)
   extends OrderPreservingUnaryNode {
   override def output: Seq[Attribute] = projectList.map(_.toAttribute)
+  override protected def outputExpressions: Seq[NamedExpression] = projectList
   override def maxRows: Option[Long] = child.maxRows
   override def maxRowsPerPartition: Option[Long] = child.maxRowsPerPartition
@@ -443,6 +443,16 @@ object SQLConf {
    .booleanConf
    .createWithDefault(true)

  val EXPRESSION_PROJECTION_CANDIDATE_LIMIT =
    buildConf("spark.sql.optimizer.expressionProjectionCandidateLimit")
      .doc("The maximum number of candidate output expressions whose aliases are replaced." +
        " It can preserve the output partitioning and ordering." +
        " A negative value disables this optimization.")
      .internal()
      .version("3.4.0")
      .intConf
      .createWithDefault(100)
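
  // Usage sketch (illustrative; this conf is internal, so override it only for
  // experimentation or tests). Capping the limit at 1 keeps a single candidate
  // per expression, approximating the old single-alias behaviour:
  //   spark.conf.set("spark.sql.optimizer.expressionProjectionCandidateLimit", "1")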

  val COMPRESS_CACHED = buildConf("spark.sql.inMemoryColumnarStorage.compressed")
    .doc("When set to true Spark SQL will automatically select a compression codec for each " +
      "column based on statistics of the data.")
@@ -16,52 +16,42 @@
*/
package org.apache.spark.sql.execution

-import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, NamedExpression, SortOrder}
-import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, PartitioningCollection, UnknownPartitioning}
+import scala.collection.mutable

-/**
- * A trait that provides functionality to handle aliases in the `outputExpressions`.
- */
-trait AliasAwareOutputExpression extends UnaryExecNode {
-  protected def outputExpressions: Seq[NamedExpression]
-
-  private lazy val aliasMap = outputExpressions.collect {
-    case a @ Alias(child, _) => child.canonicalized -> a.toAttribute
-  }.toMap
-
-  protected def hasAlias: Boolean = aliasMap.nonEmpty
-
-  protected def normalizeExpression(exp: Expression): Expression = {
-    exp.transformDown {
-      case e: Expression => aliasMap.getOrElse(e.canonicalized, e)
-    }
-  }
-}
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.{AliasAwareOutputExpression, AliasAwareQueryOutputOrdering}
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, PartitioningCollection, UnknownPartitioning}

 /**
  * A trait that handles aliases in the `outputExpressions` to produce `outputPartitioning` that
  * satisfies distribution requirements.
  */
-trait AliasAwareOutputPartitioning extends AliasAwareOutputExpression {
+trait PartitioningPreservingUnaryExecNode extends UnaryExecNode
+  with AliasAwareOutputExpression {
   final override def outputPartitioning: Partitioning = {
-    val normalizedOutputPartitioning = if (hasAlias) {
-      child.outputPartitioning match {
+    if (hasAlias) {
+      flattenPartitioning(child.outputPartitioning).flatMap {
         case e: Expression =>
-          normalizeExpression(e).asInstanceOf[Partitioning]
-        case other => other
+          // We need unique partitionings but if the input partitioning is
+          // `HashPartitioning(Seq(id + id))` and we have `id -> a` and `id -> b` aliases then after
+          // the projection we have 4 partitionings:
+          // `HashPartitioning(Seq(a + a))`, `HashPartitioning(Seq(a + b))`,
+          // `HashPartitioning(Seq(b + a))`, `HashPartitioning(Seq(b + b))`, but
+          // `HashPartitioning(Seq(a + b))` is the same as `HashPartitioning(Seq(b + a))`.
+          val partitioningSet = mutable.Set.empty[Expression]
+          projectExpression(e)
+            .filter(e => partitioningSet.add(e.canonicalized))
+            .take(aliasCandidateLimit)
+            .asInstanceOf[Stream[Partitioning]]
+        case o => Seq(o)
+      } match {
+        case Seq() => UnknownPartitioning(child.outputPartitioning.numPartitions)
+        case Seq(p) => p
+        case ps => PartitioningCollection(ps)
       }
     } else {
       child.outputPartitioning
     }
-
-    flattenPartitioning(normalizedOutputPartitioning).filter {
-      case hashPartitioning: HashPartitioning => hashPartitioning.references.subsetOf(outputSet)
-      case _ => true
-    } match {
-      case Seq() => UnknownPartitioning(child.outputPartitioning.numPartitions)
-      case Seq(singlePartitioning) => singlePartitioning
-      case seqWithMultiplePartitionings => PartitioningCollection(seqWithMultiplePartitionings)
-    }
   }

  private def flattenPartitioning(partitioning: Partitioning): Seq[Partitioning] = {
@@ -74,18 +64,5 @@ trait AliasAwareOutputPartitioning extends AliasAwareOutputExpression {
  }
}
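
// A worked sketch of the deduplication performed in `outputPartitioning` above
// (illustrative, not from the patch):
//   child partitioning: HashPartitioning(id + id), aliases: id -> a, id -> b
//   raw candidates : hash(a + a), hash(a + b), hash(b + a), hash(b + b)
//   after dedup    : hash(a + a), hash(a + b), hash(b + b)  // a + b == b + a canonically
//   reported as    : PartitioningCollection(hash(a + a), hash(a + b), hash(b + b))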

-/**
- * A trait that handles aliases in the `orderingExpressions` to produce `outputOrdering` that
- * satisfies ordering requirements.
- */
-trait AliasAwareOutputOrdering extends AliasAwareOutputExpression {
-  protected def orderingExpressions: Seq[SortOrder]
-
-  final override def outputOrdering: Seq[SortOrder] = {
-    if (hasAlias) {
-      orderingExpressions.map(normalizeExpression(_).asInstanceOf[SortOrder])
-    } else {
-      orderingExpressions
-    }
-  }
-}
+trait OrderPreservingUnaryExecNode
+  extends UnaryExecNode with AliasAwareQueryOutputOrdering[SparkPlan]
@@ -179,9 +179,6 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
   def requiredChildDistribution: Seq[Distribution] =
     Seq.fill(children.size)(UnspecifiedDistribution)

-  /** Specifies how data is ordered in each partition. */
-  def outputOrdering: Seq[SortOrder] = Nil
-
   /** Specifies sort order for each partition requirements on the input data for this operator. */
   def requiredChildOrdering: Seq[Seq[SortOrder]] = Seq.fill(children.size)(Nil)
@@ -20,13 +20,13 @@ package org.apache.spark.sql.execution.aggregate
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, Expression, NamedExpression}
 import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Final, PartialMerge}
 import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, UnspecifiedDistribution}
-import org.apache.spark.sql.execution.{AliasAwareOutputPartitioning, ExplainUtils, UnaryExecNode}
+import org.apache.spark.sql.execution.{ExplainUtils, PartitioningPreservingUnaryExecNode, UnaryExecNode}
 import org.apache.spark.sql.execution.streaming.StatefulOperatorPartitioning

 /**
  * Holds common logic for aggregate operators
  */
-trait BaseAggregateExec extends UnaryExecNode with AliasAwareOutputPartitioning {
+trait BaseAggregateExec extends UnaryExecNode with PartitioningPreservingUnaryExecNode {
   def requiredChildDistributionExpressions: Option[Seq[Expression]]
   def isStreaming: Boolean
   def numShufflePartitions: Option[Int]
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
 import org.apache.spark.sql.catalyst.util.truncatedString
-import org.apache.spark.sql.execution.{AliasAwareOutputOrdering, SparkPlan}
+import org.apache.spark.sql.execution.{OrderPreservingUnaryExecNode, SparkPlan}
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.internal.SQLConf

@@ -41,7 +41,7 @@ case class SortAggregateExec(
     resultExpressions: Seq[NamedExpression],
     child: SparkPlan)
   extends AggregateCodegenSupport
-  with AliasAwareOutputOrdering {
+  with OrderPreservingUnaryExecNode {

   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
@@ -42,8 +42,8 @@ import org.apache.spark.util.random.{BernoulliCellSampler, PoissonSampler}
 case class ProjectExec(projectList: Seq[NamedExpression], child: SparkPlan)
   extends UnaryExecNode
   with CodegenSupport
-  with AliasAwareOutputPartitioning
-  with AliasAwareOutputOrdering {
+  with PartitioningPreservingUnaryExecNode
+  with OrderPreservingUnaryExecNode {

   override def output: Seq[Attribute] = projectList.map(_.toAttribute)
@@ -155,10 +155,9 @@ object FileFormatWriter extends Logging {
     }

-    // the sort order doesn't matter
+    // Use the output ordering from the original plan before adding the empty2null projection.
     val actualOrdering = writeFilesOpt.map(_.child)
       .getOrElse(materializeAdaptiveSparkPlan(plan))
-      .outputOrdering.map(_.child)
+      .outputOrdering
     val orderingMatched = V1WritesUtils.isOrderingMatched(requiredOrdering, actualOrdering)

     SQLExecution.checkSQLExecutionId(sparkSession)