[SPARK-40086][SPARK-42049][SQL] Improve AliasAwareOutputPartitioning and AliasAwareQueryOutputOrdering to take all aliases into account #37525

Closed
@@ -3039,3 +3039,27 @@ case class SplitPart (
partNum = newChildren.apply(2))
}
}

/**
* An internal function that converts the empty string to null for partition values.
* This function should be only used in V1Writes.
*/
case class Empty2Null(child: Expression) extends UnaryExpression with String2StringExpression {
override def convert(v: UTF8String): UTF8String = if (v.numBytes() == 0) null else v

override def nullable: Boolean = true

override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
nullSafeCodeGen(ctx, ev, c => {
s"""if ($c.numBytes() == 0) {
| ${ev.isNull} = true;
| ${ev.value} = null;
|} else {
| ${ev.value} = $c;
|}""".stripMargin
})
}

override protected def withNewChildInternal(newChild: Expression): Empty2Null =
copy(child = newChild)
}
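For context, a rough illustration of why partition values need this conversion, assuming a local SparkSession (the path and exact directory layout below are illustrative, not taken from this PR): an empty-string partition value is treated like null and ends up in the default partition directory.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").appName("empty2null-demo").getOrCreate()
import spark.implicits._

// An empty-string partition value behaves like a null partition value on disk.
Seq(("a", ""), ("b", "p1")).toDF("value", "part")
  .write.mode("overwrite").partitionBy("part").parquet("/tmp/empty2null-demo")

// Expected layout, roughly:
//   /tmp/empty2null-demo/part=__HIVE_DEFAULT_PARTITION__/...
//   /tmp/empty2null-demo/part=p1/...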
@@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.plans

import scala.collection.mutable

import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeSet, Empty2Null, Expression, NamedExpression, SortOrder}
import org.apache.spark.sql.internal.SQLConf

/**
* A trait that provides functionality to handle aliases in the `outputExpressions`.
*/
trait AliasAwareOutputExpression extends SQLConfHelper {
protected val aliasCandidateLimit = conf.getConf(SQLConf.EXPRESSION_PROJECTION_CANDIDATE_LIMIT)
protected def outputExpressions: Seq[NamedExpression]
/**
* This method can be used to strip expressions which do not affect the result, for example:
* strip expressions which are ordering-agnostic for output ordering.
*/
protected def strip(expr: Expression): Expression = expr

// Build an `Expression` -> `Attribute` alias map.
// There can be multiple aliases defined for the same expression, but it doesn't make sense to store
// more than `aliasCandidateLimit` attributes for an expression. In those cases the old logic
// handled only the last alias, so we need to make sure that we give precedence to that.
// If the `outputExpressions` contain simple attributes we need to add those too to the map.
private lazy val aliasMap = {
val aliases = mutable.Map[Expression, mutable.ArrayBuffer[Attribute]]()
Contributor: Can we limit the size of this map as well? Right now it only limits the size of each map value.

peter-toth (author), Jan 30, 2023: I'm not sure we should, as it might happen that we reach the map size limit but the added aliases don't project the child's partitioning / ordering. E.g. if the child partitioning is HashPartitioning(c) and the projection is c1 as c1a, ..., c100 as c100a, c as ca, then c as ca is not added to the map and we would end up with UnknownPartitioning.

Contributor: The current config actually limits the size of the final preserved expressions. I think we should add one more config to limit the candidate size in aliasMap, and it can be bigger by default. This aliasMap may hurt driver memory for wide tables.

peter-toth (author): I'm OK with adding a new config, but aliasMap will never be bigger than the projection (outputExpressions), so is this a real concern?

Contributor: Let's leave it for now. Having fewer Attributes in the map values may generate fewer alternatives, but having fewer map entries may stop the entire projection.

outputExpressions.reverse.foreach {
case a @ Alias(child, _) =>
val buffer = aliases.getOrElseUpdate(strip(child.canonicalized), mutable.ArrayBuffer.empty)
Contributor: I prefer strip(child).canonicalized. I have not seen other places in the code that match on a canonicalized expression.

Contributor: +1

peter-toth (author): Ok, thanks. I will change it today.

peter-toth (author): fixed in 1f1f093

if (buffer.size < aliasCandidateLimit) {
buffer += a.toAttribute
}
case _ =>
}
outputExpressions.foreach {
case a: Attribute if aliases.contains(a.canonicalized) =>
Contributor: What does this do? Do you mean !aliases.contains(a.canonicalized)?

peter-toth (author), Jan 31, 2023: If we have a c, c AS a projection then we need to add both the original c attribute and a to the alternatives of c. But we don't need to add an attribute if it isn't aliased.

Contributor: What's the behavior of c as a? This code seems to return both c and a. I think the right way should be: if AttributeSet(outputExpressions).contains(a) => // add a to buffer

peter-toth (author): This case doesn't match c AS a. It makes sure that if c -> a has been added to the map by the previous foreach, then c -> c is added too.

I think your code would add all attributes, but that is not needed.

Contributor: Why should c -> c be added? The outputExpressions only contain c as a; shall we only return c -> a?

Contributor: Ah, I see it. How about this case: c1, c2 as x? We should also add c1 into aliasMap, right?

peter-toth (author), Jan 31, 2023: No. If c1 is not aliased otherwise then we don't need to add it to the map. If the map doesn't contain any alias for an expression, the transformation does nothing with that c1 attribute and just leaves it in the expression tree as it is...

Contributor: But the aliasMap is not empty due to c2 as x. In this case, how can we preserve c1 if c1 is not added to aliasMap?

Contributor: Oh, I see it, nvm. @peter-toth thank you for the patience!

peter-toth (author): Np, thanks for reviewing this PR @ulysses-you!

val buffer = aliases(a.canonicalized)
if (buffer.size < aliasCandidateLimit) {
buffer += a
}
case _ =>
}
aliases
}
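To make the two passes above concrete, here is a minimal plain-Scala sketch (strings stand in for canonicalized expressions; this is not the Catalyst code) of the map built for a projection like c AS a, c, c1, c2 AS x, matching the review discussion above: c gets both a and itself as candidates, c2 gets x, and the un-aliased c1 gets no entry at all.

import scala.collection.mutable

val aliasCandidateLimit = 100
// (expression, optional alias) pairs standing in for the projection c AS a, c, c1, c2 AS x.
val projection = Seq("c" -> Some("a"), "c" -> None, "c1" -> None, "c2" -> Some("x"))

val aliases = mutable.Map[String, mutable.ArrayBuffer[String]]()
// Pass 1 (reversed): collect alias candidates per aliased expression.
projection.reverse.foreach {
  case (expr, Some(alias)) =>
    val buffer = aliases.getOrElseUpdate(expr, mutable.ArrayBuffer.empty)
    if (buffer.size < aliasCandidateLimit) buffer += alias
  case _ =>
}
// Pass 2: if a plain attribute is also aliased elsewhere, keep the attribute itself as a candidate too.
projection.foreach {
  case (expr, None) if aliases.contains(expr) =>
    val buffer = aliases(expr)
    if (buffer.size < aliasCandidateLimit) buffer += expr
  case _ =>
}
println(aliases) // contains c -> ArrayBuffer(a, c) and c2 -> ArrayBuffer(x); c1 is absent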

protected def hasAlias: Boolean = aliasMap.nonEmpty

/**
* Return a stream of expressions in which the original expression is projected with `aliasMap`.
*/
protected def projectExpression(expr: Expression): Stream[Expression] = {
val outputSet = AttributeSet(outputExpressions.map(_.toAttribute))
expr.multiTransformDown {
// Mapping with aliases
case e: Expression if aliasMap.contains(e.canonicalized) =>
aliasMap(e.canonicalized).toSeq ++ (if (e.containsChild.nonEmpty) Seq(e) else Seq.empty)

// Prune if we encounter an attribute that we can't map and it is not in output set.
// This prune will go up to the closest `multiTransformDown()` call and returns `Stream.empty`
// there.
case a: Attribute if !outputSet.contains(a) => Seq.empty
}
}
}
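As a rough, non-Spark illustration of what projectExpression's multiTransformDown produces (token strings stand in for expressions; a sketch, not the Catalyst implementation): every sub-expression that has alias candidates is substituted in all combinations, and the result is a lazy Stream of alternatives.

// Lazily enumerate all substitution combinations of a token sequence.
def expand(tokens: Seq[String], candidates: Map[String, Seq[String]]): Stream[Seq[String]] =
  tokens.foldLeft(Stream(Seq.empty[String])) { (acc, token) =>
    val options = candidates.getOrElse(token, Seq(token))
    acc.flatMap(prefix => options.toStream.map(prefix :+ _))
  }

// "id + id" with aliases id -> {a, b} yields four variants: a + a, a + b, b + a, b + b.
expand(Seq("id", "+", "id"), Map("id" -> Seq("a", "b")))
  .foreach(v => println(v.mkString(" ")))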

/**
* A trait that handles aliases in the `orderingExpressions` to produce `outputOrdering` that
* satisfies ordering requirements.
*/
trait AliasAwareQueryOutputOrdering[T <: QueryPlan[T]]
extends AliasAwareOutputExpression { self: QueryPlan[T] =>
protected def orderingExpressions: Seq[SortOrder]

override protected def strip(expr: Expression): Expression = expr match {
case e: Empty2Null => strip(e.child)
case _ => expr
}

override final def outputOrdering: Seq[SortOrder] = {
if (hasAlias) {
orderingExpressions.flatMap { sortOrder =>
val orderingSet = mutable.Set.empty[Expression]
val sameOrderings = sortOrder.children.toStream
.flatMap(projectExpression)
.filter(e => orderingSet.add(e.canonicalized))
.take(aliasCandidateLimit)
if (sameOrderings.nonEmpty) {
Some(sortOrder.copy(child = sameOrderings.head,
sameOrderExpressions = sameOrderings.tail))
} else {
None
}
Contributor: Is this correct? If orderingExpressions is c1, c2 and aliasMap is c2 as x, then the outputOrdering should be c1, x or empty, rather than x.

Contributor: Good point! Partitioning is fine as the entire HashPartitioning is one expression.

Contributor: Let's also add tests for it.

peter-toth (author): I'm not sure I get this. If we have two orderings in orderingExpressions (SortOrder(c1), SortOrder(c2)) and the projection is c2 as x, then why don't we expect SortOrder(x)?

peter-toth (author), Jan 31, 2023: Hmm, I might see the issue here... Let me fix it today.

Contributor: If data is sorted by a, b, we can't say the data is sorted by b, but we can say it's sorted by a.

peter-toth (author): Got it already, thanks! I will submit a fix today.

peter-toth (author), Jan 31, 2023: Here is the fix and a new test: 1f1f093

}
} else {
orderingExpressions
}
}
}
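A minimal plain-Scala sketch of the prefix rule settled in the thread above (attribute names stand in for SortOrder expressions; not the actual implementation): only the longest prefix of the ordering whose sort keys can be rewritten in terms of the output survives.

// Keep the longest prefix of the ordering whose keys can be projected
// (a simple lookup stands in for projectExpression here).
def preservedOrdering(ordering: Seq[String], project: String => Option[String]): Seq[String] =
  ordering.iterator
    .map(project)
    .takeWhile(_.isDefined)
    .map(_.get)
    .toSeq

val full  = preservedOrdering(Seq("c1", "c2"), Map("c1" -> "c1", "c2" -> "x").get)
val empty = preservedOrdering(Seq("c1", "c2"), Map("c2" -> "x").get)
println(full)  // List(c1, x) -- both keys map, so the whole ordering is preserved
println(empty) // List()      -- c1 cannot be mapped, so we cannot claim an ordering by x alone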
@@ -53,6 +53,13 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]]
@transient
lazy val outputSet: AttributeSet = AttributeSet(output)

/**
* Returns the output ordering that this plan generates, although the semantics differ in logical
* and physical plans. In the logical plan it means global ordering of the data while in physical
* it means ordering in each partition.
*/
def outputOrdering: Seq[SortOrder] = Nil
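A rough way to see the two meanings, assuming a local SparkSession (illustrative only): a global sort orders the whole result, while sortWithinPartitions only orders rows inside each partition, which is what a physical plan's outputOrdering describes.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").appName("ordering-demo").getOrCreate()

val df = spark.range(10).toDF("c").repartition(2)
val globallySorted = df.orderBy("c")              // total order across the whole result
val locallySorted  = df.sortWithinPartitions("c") // each partition sorted, no global order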

// Override `treePatternBits` to propagate bits for its expressions.
override lazy val treePatternBits: BitSet = {
val bits: BitSet = getDefaultTreePatternBits
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.{AliasAwareQueryOutputOrdering, QueryPlan}
import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.LogicalPlanStats
import org.apache.spark.sql.catalyst.trees.{BinaryLike, LeafLike, UnaryLike}
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
@@ -141,11 +141,6 @@ abstract class LogicalPlan
*/
def refresh(): Unit = children.foreach(_.refresh())

/**
* Returns the output ordering that this plan generates.
*/
def outputOrdering: Seq[SortOrder] = Nil

/**
* Returns true iff `other`'s output is semantically the same, i.e.:
* - it contains the same number of `Attribute`s;
@@ -205,8 +200,10 @@ trait UnaryNode extends LogicalPlan with UnaryLike[LogicalPlan] {
*/
trait BinaryNode extends LogicalPlan with BinaryLike[LogicalPlan]

abstract class OrderPreservingUnaryNode extends UnaryNode {
override final def outputOrdering: Seq[SortOrder] = child.outputOrdering
trait OrderPreservingUnaryNode extends UnaryNode
with AliasAwareQueryOutputOrdering[LogicalPlan] {
override protected def outputExpressions: Seq[NamedExpression] = child.output
override protected def orderingExpressions: Seq[SortOrder] = child.outputOrdering
}

object LogicalPlanIntegrity {
@@ -69,6 +69,7 @@ object Subquery {
case class Project(projectList: Seq[NamedExpression], child: LogicalPlan)
extends OrderPreservingUnaryNode {
override def output: Seq[Attribute] = projectList.map(_.toAttribute)
override protected def outputExpressions: Seq[NamedExpression] = projectList
override def maxRows: Option[Long] = child.maxRows
override def maxRowsPerPartition: Option[Long] = child.maxRowsPerPartition

@@ -435,6 +435,16 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val EXPRESSION_PROJECTION_CANDIDATE_LIMIT =
buildConf("spark.sql.optimizer.expressionProjectionCandidateLimit")
.doc("The maximum number of the candidate of output expressions whose alias are replaced." +
" It can preserve the output partitioning and ordering." +
" Negative value means disable this optimization.")
.internal()
.version("3.4.0")
Contributor: This PR targets master, which is 3.5.0. Is this going to be merged into branch-3.4, which is in feature freeze? If not, this line should be adjusted.

Contributor: I'd like to merge it to 3.4 as it fixes a bug in planned write, which is a new feature in 3.4.

.intConf
.createWithDefault(100)
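For reference, a hypothetical way to override the new setting at runtime (it is an internal conf, so changing it should rarely be necessary):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").appName("conf-demo").getOrCreate()
// Consider fewer alias candidates per expression than the default of 100.
spark.conf.set("spark.sql.optimizer.expressionProjectionCandidateLimit", "10")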

val COMPRESS_CACHED = buildConf("spark.sql.inMemoryColumnarStorage.compressed")
.doc("When set to true Spark SQL will automatically select a compression codec for each " +
"column based on statistics of the data.")
@@ -16,52 +16,42 @@
*/
package org.apache.spark.sql.execution

import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, NamedExpression, SortOrder}
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, PartitioningCollection, UnknownPartitioning}
import scala.collection.mutable

/**
* A trait that provides functionality to handle aliases in the `outputExpressions`.
*/
trait AliasAwareOutputExpression extends UnaryExecNode {
protected def outputExpressions: Seq[NamedExpression]

private lazy val aliasMap = outputExpressions.collect {
case a @ Alias(child, _) => child.canonicalized -> a.toAttribute
}.toMap

protected def hasAlias: Boolean = aliasMap.nonEmpty

protected def normalizeExpression(exp: Expression): Expression = {
exp.transformDown {
case e: Expression => aliasMap.getOrElse(e.canonicalized, e)
}
}
}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.{AliasAwareOutputExpression, AliasAwareQueryOutputOrdering}
import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, PartitioningCollection, UnknownPartitioning}

/**
* A trait that handles aliases in the `outputExpressions` to produce `outputPartitioning` that
* satisfies distribution requirements.
*/
trait AliasAwareOutputPartitioning extends AliasAwareOutputExpression {
trait PartitioningPreservingUnaryExecNode extends UnaryExecNode
with AliasAwareOutputExpression {
final override def outputPartitioning: Partitioning = {
val normalizedOutputPartitioning = if (hasAlias) {
child.outputPartitioning match {
if (hasAlias) {
flattenPartitioning(child.outputPartitioning).flatMap {
case e: Expression =>
normalizeExpression(e).asInstanceOf[Partitioning]
case other => other
// We need unique partitionings but if the input partitioning is
// `HashPartitioning(Seq(id + id))` and we have `id -> a` and `id -> b` aliases then after
// the projection we have 4 partitionings:
// `HashPartitioning(Seq(a + a))`, `HashPartitioning(Seq(a + b))`,
// `HashPartitioning(Seq(b + a))`, `HashPartitioning(Seq(b + b))`, but
// `HashPartitioning(Seq(a + b))` is the same as `HashPartitioning(Seq(b + a))`.
val partitioningSet = mutable.Set.empty[Expression]
projectExpression(e)
.filter(e => partitioningSet.add(e.canonicalized))
.take(aliasCandidateLimit)
Contributor (on lines +41 to +44): Scala 2.13 would let us simplify this. It's a shame...

Suggested change:
- val partitioningSet = mutable.Set.empty[Expression]
- projectExpression(e)
-   .filter(e => partitioningSet.add(e.canonicalized))
-   .take(aliasCandidateLimit)
+ projectExpression(e)
+   .distinctBy(_.canonicalized)
+   .take(aliasCandidateLimit)

Contributor: Hmm, I think we still need to support Scala 2.12 for now?

Contributor: Sure, that's the shame bit.

.asInstanceOf[Stream[Partitioning]]
Contributor: Do we need to cast to Stream? It will be pattern matched by Seq(...) immediately, which will materialize it.

peter-toth (author): This cast is required to avoid a compile error, as projectExpression returns Stream[Expression] but the flatMap requires Seq[Partitioning]. We could use .asInstanceOf[Seq[Partitioning]] here but I'm not sure it makes any difference.

case o => Seq(o)
} match {
case Seq() => UnknownPartitioning(child.outputPartitioning.numPartitions)
case Seq(p) => p
case ps => PartitioningCollection(ps)
}
} else {
child.outputPartitioning
}
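Relating to the distinctBy suggestion above, a small stand-alone sketch (plain Scala, not the Catalyst code) of the Scala 2.12-compatible pattern used here: filtering through a mutable Set de-duplicates by a derived key while keeping the Stream lazy.

import scala.collection.mutable

// Lazily drop elements whose key has already been seen (Scala 2.12 has no distinctBy).
def distinctByKey[A, K](xs: Stream[A])(key: A => K): Stream[A] = {
  val seen = mutable.Set.empty[K]
  xs.filter(x => seen.add(key(x)))
}

val deduped = distinctByKey(Stream("a + b", "b + a", "a + a"))(_.sorted)
println(deduped.toList) // List(a + b, a + a) -- "b + a" is dropped as a duplicate of "a + b"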

flattenPartitioning(normalizedOutputPartitioning).filter {
case hashPartitioning: HashPartitioning => hashPartitioning.references.subsetOf(outputSet)
case _ => true
} match {
case Seq() => UnknownPartitioning(child.outputPartitioning.numPartitions)
case Seq(singlePartitioning) => singlePartitioning
case seqWithMultiplePartitionings => PartitioningCollection(seqWithMultiplePartitionings)
}
}

private def flattenPartitioning(partitioning: Partitioning): Seq[Partitioning] = {
@@ -74,18 +64,5 @@ trait AliasAwareOutputPartitioning extends AliasAwareOutputExpression {
}
}

/**
* A trait that handles aliases in the `orderingExpressions` to produce `outputOrdering` that
* satisfies ordering requirements.
*/
trait AliasAwareOutputOrdering extends AliasAwareOutputExpression {
protected def orderingExpressions: Seq[SortOrder]

final override def outputOrdering: Seq[SortOrder] = {
if (hasAlias) {
orderingExpressions.map(normalizeExpression(_).asInstanceOf[SortOrder])
} else {
orderingExpressions
}
}
}
trait OrderPreservingUnaryExecNode
extends UnaryExecNode with AliasAwareQueryOutputOrdering[SparkPlan]
@@ -179,9 +179,6 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
def requiredChildDistribution: Seq[Distribution] =
Seq.fill(children.size)(UnspecifiedDistribution)

/** Specifies how data is ordered in each partition. */
def outputOrdering: Seq[SortOrder] = Nil

/** Specifies sort order for each partition requirements on the input data for this operator. */
def requiredChildOrdering: Seq[Seq[SortOrder]] = Seq.fill(children.size)(Nil)

@@ -20,13 +20,13 @@ package org.apache.spark.sql.execution.aggregate
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, Expression, NamedExpression}
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Final, PartialMerge}
import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, UnspecifiedDistribution}
import org.apache.spark.sql.execution.{AliasAwareOutputPartitioning, ExplainUtils, UnaryExecNode}
import org.apache.spark.sql.execution.{ExplainUtils, PartitioningPreservingUnaryExecNode, UnaryExecNode}
import org.apache.spark.sql.execution.streaming.StatefulOperatorPartitioning

/**
* Holds common logic for aggregate operators
*/
trait BaseAggregateExec extends UnaryExecNode with AliasAwareOutputPartitioning {
trait BaseAggregateExec extends UnaryExecNode with PartitioningPreservingUnaryExecNode {
def requiredChildDistributionExpressions: Option[Seq[Expression]]
def isStreaming: Boolean
def numShufflePartitions: Option[Int]
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.execution.{AliasAwareOutputOrdering, SparkPlan}
import org.apache.spark.sql.execution.{OrderPreservingUnaryExecNode, SparkPlan}
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.internal.SQLConf

Expand All @@ -41,7 +41,7 @@ case class SortAggregateExec(
resultExpressions: Seq[NamedExpression],
child: SparkPlan)
extends AggregateCodegenSupport
with AliasAwareOutputOrdering {
with OrderPreservingUnaryExecNode {

override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
@@ -42,8 +42,8 @@ import org.apache.spark.util.random.{BernoulliCellSampler, PoissonSampler}
case class ProjectExec(projectList: Seq[NamedExpression], child: SparkPlan)
extends UnaryExecNode
with CodegenSupport
with AliasAwareOutputPartitioning
with AliasAwareOutputOrdering {
with PartitioningPreservingUnaryExecNode
with OrderPreservingUnaryExecNode {

override def output: Seq[Attribute] = projectList.map(_.toAttribute)

@@ -155,10 +155,9 @@ object FileFormatWriter extends Logging {
}

// the sort order doesn't matter
// Use the output ordering from the original plan before adding the empty2null projection.
val actualOrdering = writeFilesOpt.map(_.child)
.getOrElse(materializeAdaptiveSparkPlan(plan))
.outputOrdering.map(_.child)
.outputOrdering
val orderingMatched = V1WritesUtils.isOrderingMatched(requiredOrdering, actualOrdering)

SQLExecution.checkSQLExecutionId(sparkSession)
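For intuition, a rough string-based sketch of what "ordering matched" means here (not the real V1WritesUtils API, which compares expressions semantically): the required write ordering has to be a prefix of the plan's output ordering, so the writer can skip adding its own sort.

// The write-side sort can be skipped iff the required keys form a prefix of the
// keys the plan already orders by.
def orderingMatched(required: Seq[String], actual: Seq[String]): Boolean =
  required.length <= actual.length &&
    required.zip(actual).forall { case (r, a) => r == a }

println(orderingMatched(Seq("p"), Seq("p", "c")))      // true  -- already sorted by p first
println(orderingMatched(Seq("p", "c"), Seq("c", "p"))) // false -- keys are in a different order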