From f0f83b518d0bbfccd4ff6414d372bc2ad236f0ff Mon Sep 17 00:00:00 2001
From: Enrico Minack
Date: Wed, 9 Nov 2022 15:59:54 +0800
Subject: [PATCH] [SPARK-40588] FileFormatWriter materializes AQE plan before
 accessing outputOrdering

`FileFormatWriter` now materializes an `AdaptiveSparkPlanExec` before accessing
the plan's `outputOrdering`. This is required for Spark 3.0 to 3.3; Spark 3.4
does not need this because there `FileFormatWriter` already receives the final
plan.

`FileFormatWriter` enforces an ordering if the written plan does not provide
that ordering. An `AdaptiveSparkPlanExec` does not know its final ordering
(Spark 3.0 to 3.3), in which case `FileFormatWriter` enforces its own ordering
(e.g. by column `"a"`) even if the plan provides a compatible ordering (e.g. by
columns `"a", "b"`). In case of spilling, that compatible order (by columns
`"a", "b"`) gets broken (see SPARK-40588).

This fixes SPARK-40588, which was introduced in 3.0, and restores the
behaviour of Spark 2.4.

The final plan that is written to files cannot be extracted from
`FileFormatWriter`, and the bug explained in
[SPARK-40588](https://issues.apache.org/jira/browse/SPARK-40588) can only be
asserted on the result files when spilling occurs, which is very hard to
control in a unit test scenario. Therefore, this was tested manually (see the
reproduction sketch below). The
[example to reproduce this issue](https://issues.apache.org/jira/browse/SPARK-40588?focusedCommentId=17621032&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17621032)
given in SPARK-40588 now produces sorted files.

The actual plan written into the files changed from

```
Sort [input[0, bigint, false] ASC NULLS FIRST], false, 0
+- AdaptiveSparkPlan isFinalPlan=false
   +- Sort [day#2L ASC NULLS FIRST, id#4L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(day#2L, 2), REPARTITION_BY_NUM, [id=#30]
         +- BroadcastNestedLoopJoin BuildLeft, Inner
            :- BroadcastExchange IdentityBroadcastMode, [id=#28]
            :  +- Project [id#0L AS day#2L]
            :     +- Range (0, 2, step=1, splits=2)
            +- Range (0, 10000000, step=1, splits=2)
```

where `FileFormatWriter` enforces order with `Sort [input[0, bigint, false] ASC NULLS FIRST], false, 0`, to

```
*(3) Sort [day#2L ASC NULLS FIRST, id#4L ASC NULLS FIRST], false, 0
+- AQEShuffleRead coalesced
   +- ShuffleQueryStage 1
      +- Exchange hashpartitioning(day#2L, 200), REPARTITION_BY_COL, [id=#68]
         +- *(2) BroadcastNestedLoopJoin BuildLeft, Inner
            :- BroadcastQueryStage 0
            :  +- BroadcastExchange IdentityBroadcastMode, [id=#42]
            :     +- *(1) Project [id#0L AS day#2L]
            :        +- *(1) Range (0, 2, step=1, splits=2)
            +- *(2) Range (0, 1000000, step=1, splits=2)
```

where the sort given by the user is now the outermost sort.
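For reference, a minimal sketch in the spirit of that reproduction (the session
setup, sizes, and output path are illustrative; the exact code is in the JIRA
comment linked above):

```
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").getOrCreate()
import spark.implicits._

// Two "days" cross-joined with many ids, repartitioned by day and sorted by
// (day, id) within partitions. With AQE enabled, Spark 3.0 to 3.3 inserted
// an extra Sort by `day` only on write, which broke the (day, id) order
// whenever that sort spilled.
val days = spark.range(0, 2, 1, 2).withColumnRenamed("id", "day")
val ids = spark.range(0, 10000000, 1, 2)

days.join(ids)
  .repartition($"day")
  .sortWithinPartitions($"day", $"id")
  .write
  .partitionBy("day")
  .csv("/tmp/spark-40588")  // illustrative output path
```

Spilling has to be forced (e.g. by giving the sort very little memory) for the
broken ordering to become visible in the written files.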
Closes #38358 from EnricoMi/branch-3.3-materialize-aqe-plan.

Authored-by: Enrico Minack
Signed-off-by: Wenchen Fan
(cherry picked from commit f0cad7ad6c2618d2d0d8c8598bbd54c2ca366b6b)
Signed-off-by: Wenchen Fan
---
 .../adaptive/AdaptiveSparkPlanExec.scala      |  2 ++
 .../datasources/FileFormatWriter.scala        | 18 ++++++++++++++----
 2 files changed, 16 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index 5157c169ef923..1e7cfc474c1fb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -217,6 +217,8 @@ case class AdaptiveSparkPlanExec(
       .map(_.toLong).filter(SQLExecution.getQueryExecution(_) eq context.qe)
   }
 
+  def finalPhysicalPlan: SparkPlan = withFinalPlanUpdate(identity)
+
   private def getFinalPhysicalPlan(): SparkPlan = lock.synchronized {
     if (isFinalPlan) return currentPhysicalPlan
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index cd3d101ac26e5..a9d4d4208f34c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -40,6 +40,7 @@ import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.{ProjectExec, SortExec, SparkPlan, SQLExecution, UnsafeExternalRowSorter}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StringType
 import org.apache.spark.unsafe.types.UTF8String
@@ -160,8 +161,17 @@ object FileFormatWriter extends Logging {
     // We should first sort by partition columns, then bucket id, and finally sorting columns.
     val requiredOrdering = partitionColumns ++ bucketIdExpression ++ sortColumns
+
+    // SPARK-40588: plan may contain an AdaptiveSparkPlanExec, which does not know
+    // its final plan's ordering, so we have to materialize that plan first
+    def materializeAdaptiveSparkPlan(plan: SparkPlan): SparkPlan = plan match {
+      case a: AdaptiveSparkPlanExec => a.finalPhysicalPlan
+      case p: SparkPlan => p.withNewChildren(p.children.map(materializeAdaptiveSparkPlan))
+    }
+    val materializedPlan = materializeAdaptiveSparkPlan(empty2NullPlan)
+
     // the sort order doesn't matter
-    val actualOrdering = empty2NullPlan.outputOrdering.map(_.child)
+    val actualOrdering = materializedPlan.outputOrdering.map(_.child)
     val orderingMatched = if (requiredOrdering.length > actualOrdering.length) {
       false
     } else {
       requiredOrdering.zip(actualOrdering).forall {
@@ -183,7 +193,7 @@ object FileFormatWriter extends Logging {
 
     try {
       val (rdd, concurrentOutputWriterSpec) = if (orderingMatched) {
-        (empty2NullPlan.execute(), None)
+        (materializedPlan.execute(), None)
       } else {
         // SPARK-21165: the `requiredOrdering` is based on the attributes from analyzed plan, and
         // the physical plan may have different attribute ids due to optimizer removing some
@@ -193,12 +203,12 @@ object FileFormatWriter extends Logging {
         val sortPlan = SortExec(
           orderingExpr,
           global = false,
-          child = empty2NullPlan)
+          child = materializedPlan)
 
         val maxWriters = sparkSession.sessionState.conf.maxConcurrentOutputFileWriters
         val concurrentWritersEnabled = maxWriters > 0 && sortColumns.isEmpty
         if (concurrentWritersEnabled) {
-          (empty2NullPlan.execute(),
+          (materializedPlan.execute(),
             Some(ConcurrentOutputWriterSpec(maxWriters, () => sortPlan.createSorter())))
         } else {
           (sortPlan.execute(), None)
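Note on the rewrite above: the new `finalPhysicalPlan` method is simply
`withFinalPlanUpdate(identity)`, i.e. it forces the adaptive plan to compute
its final physical plan and returns it. `materializeAdaptiveSparkPlan` then
replaces every `AdaptiveSparkPlanExec` in the tree, which is needed because
the adaptive node is not necessarily the root (`empty2NullPlan` may, for
example, be a `ProjectExec` wrapping it). A self-contained toy sketch of the
same rewrite pattern, with hypothetical node types (`Adaptive`, `Node`)
standing in for Spark's:

```
// Toy stand-ins for SparkPlan (names are hypothetical, not Spark classes).
sealed trait Plan {
  def children: Seq[Plan]
  def withNewChildren(newChildren: Seq[Plan]): Plan
}

// Stands in for AdaptiveSparkPlanExec, which hides its final plan.
case class Adaptive(finalPlan: Plan) extends Plan {
  def children: Seq[Plan] = Seq(finalPlan)
  def withNewChildren(c: Seq[Plan]): Plan = Adaptive(c.head)
}

// Stands in for any other operator (Project, Sort, ...).
case class Node(name: String, children: Seq[Plan] = Nil) extends Plan {
  def withNewChildren(c: Seq[Plan]): Plan = copy(children = c)
}

// Same shape as materializeAdaptiveSparkPlan in the patch: unwrap adaptive
// nodes, and rebuild every other node with materialized children.
def materialize(plan: Plan): Plan = plan match {
  case a: Adaptive => a.finalPlan
  case p => p.withNewChildren(p.children.map(materialize))
}

// A Project on top of an adaptive subplan becomes a Project on the final
// plan, whose outputOrdering can now be inspected.
assert(materialize(Node("Project", Seq(Adaptive(Node("Sort")))))
  == Node("Project", Seq(Node("Sort"))))
```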