diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index 31b19a57936ae..a3fc39e7c7e70 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -222,6 +222,8 @@ case class AdaptiveSparkPlanExec(
       .map(_.toLong).filter(SQLExecution.getQueryExecution(_) eq context.qe)
   }
 
+  def finalPhysicalPlan: SparkPlan = withFinalPlanUpdate(identity)
+
   private def getFinalPhysicalPlan(): SparkPlan = lock.synchronized {
     if (isFinalPlan) return currentPhysicalPlan
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 643902e7cbcb2..02af79ea2bbb4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -40,6 +40,7 @@ import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.{ProjectExec, SortExec, SparkPlan, SQLExecution, UnsafeExternalRowSorter}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StringType
 import org.apache.spark.unsafe.types.UTF8String
@@ -187,8 +188,17 @@ object FileFormatWriter extends Logging {
     // We should first sort by partition columns, then bucket id, and finally sorting columns.
     val requiredOrdering =
       partitionColumns ++ writerBucketSpec.map(_.bucketIdExpression) ++ sortColumns
+
+    // SPARK-40588: plan may contain an AdaptiveSparkPlanExec, which does not know
+    // its final plan's ordering, so we have to materialize that plan first
+    def materializeAdaptiveSparkPlan(plan: SparkPlan): SparkPlan = plan match {
+      case a: AdaptiveSparkPlanExec => a.finalPhysicalPlan
+      case p: SparkPlan => p.withNewChildren(p.children.map(materializeAdaptiveSparkPlan))
+    }
+    val materializedPlan = materializeAdaptiveSparkPlan(empty2NullPlan)
+
     // the sort order doesn't matter
-    val actualOrdering = empty2NullPlan.outputOrdering.map(_.child)
+    val actualOrdering = materializedPlan.outputOrdering.map(_.child)
     val orderingMatched = if (requiredOrdering.length > actualOrdering.length) {
       false
     } else {
@@ -210,7 +220,7 @@ object FileFormatWriter extends Logging {
 
     try {
       val (rdd, concurrentOutputWriterSpec) = if (orderingMatched) {
-        (empty2NullPlan.execute(), None)
+        (materializedPlan.execute(), None)
       } else {
         // SPARK-21165: the `requiredOrdering` is based on the attributes from analyzed plan, and
         // the physical plan may have different attribute ids due to optimizer removing some
@@ -220,12 +230,12 @@ object FileFormatWriter extends Logging {
         val sortPlan = SortExec(
           orderingExpr,
           global = false,
-          child = empty2NullPlan)
+          child = materializedPlan)
 
         val maxWriters = sparkSession.sessionState.conf.maxConcurrentOutputFileWriters
         val concurrentWritersEnabled = maxWriters > 0 && sortColumns.isEmpty
         if (concurrentWritersEnabled) {
-          (empty2NullPlan.execute(),
+          (materializedPlan.execute(),
             Some(ConcurrentOutputWriterSpec(maxWriters, () => sortPlan.createSorter())))
         } else {
           (sortPlan.execute(), None)
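As a minimal illustration of the scenario this patch targets (not part of the patch itself): with adaptive query execution enabled, the child plan handed to FileFormatWriter.write is an AdaptiveSparkPlanExec, and per the SPARK-40588 comment above, that node does not know its final plan's ordering until the plan is materialized, so the writer's ordering check could not see the final plan's actual output ordering before this change. The sketch below, with made-up application name, column names, and output path, is one example of a partitioned write with AQE enabled that goes through this code path.

import org.apache.spark.sql.SparkSession

object Spark40588Sketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("spark-40588-sketch")                 // hypothetical app name
      .config("spark.sql.adaptive.enabled", "true")  // AQE on: the write's child plan is adaptive
      .getOrCreate()

    spark.range(0, 1000)
      .selectExpr("id", "id % 10 AS part")
      .groupBy("part").count()                       // shuffle, so AQE plans query stages adaptively
      .write
      .mode("overwrite")
      .partitionBy("part")                           // writer requires ordering by partition columns
      .parquet("/tmp/spark-40588-demo")              // hypothetical output path

    spark.stop()
  }
}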