[SPARK-40588] FileFormatWriter materializes AQE plan before accessing outputOrdering #38358

Closed
AdaptiveSparkPlanExec.scala:

@@ -222,6 +222,8 @@ case class AdaptiveSparkPlanExec(
       .map(_.toLong).filter(SQLExecution.getQueryExecution(_) eq context.qe)
   }
 
+  def finalPhysicalPlan: SparkPlan = withFinalPlanUpdate(identity)
+
   private def getFinalPhysicalPlan(): SparkPlan = lock.synchronized {
     if (isFinalPlan) return currentPhysicalPlan

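The new public accessor lets code outside AQE force the adaptive plan to finish re-optimizing before reading plan properties. A minimal sketch of how a caller could use it, assuming a DataFrame `df` built with adaptive execution enabled (the variable names here are illustrative, not part of the PR):

```scala
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec

// With AQE on, the root of the executed plan may be an AdaptiveSparkPlanExec,
// whose outputOrdering is unknown until all query stages have run.
df.queryExecution.executedPlan match {
  case a: AdaptiveSparkPlanExec =>
    // finalPhysicalPlan runs the adaptive re-optimization to completion and
    // returns the plan whose properties (like outputOrdering) are final.
    val finalPlan = a.finalPhysicalPlan
    println(finalPlan.outputOrdering)
  case p =>
    println(p.outputOrdering)
}
```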
FileFormatWriter.scala:

@@ -40,6 +40,7 @@ import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.{ProjectExec, SortExec, SparkPlan, SQLExecution, UnsafeExternalRowSorter}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StringType
 import org.apache.spark.unsafe.types.UTF8String

@@ -187,8 +188,17 @@ object FileFormatWriter extends Logging {
     // We should first sort by partition columns, then bucket id, and finally sorting columns.
     val requiredOrdering =
       partitionColumns ++ writerBucketSpec.map(_.bucketIdExpression) ++ sortColumns
+
+    // SPARK-40588: plan may contain an AdaptiveSparkPlanExec, which does not know
+    // its final plan's ordering, so we have to materialize that plan first
+    def materializeAdaptiveSparkPlan(plan: SparkPlan): SparkPlan = plan match {

[Inline review thread on this line]

@EnricoMi (Contributor, Author), Oct 30, 2022 (comment marked as resolved):

I am not sure what you mean with "is brought" in "the missing ordering field of Sort in the top layer is also brought"?

It is also not clear to me what you are referring to with "removing AdaptiveSparkPlanExec". Do you mean the change in this PR? The AdaptiveSparkPlanExec is not removed but replaced with the final plan. It is the same plan that is otherwise used when calling .execute, so AQE should work as before.

@EnricoMi (Contributor, Author):

V1WriteCommand is a concept introduced after Spark 3.3. The issue addressed in this PR does not exist after Spark 3.3, because FileFormatWriter no longer sees the AdaptiveSparkPlanExec but the final plan (exactly what this change is trying to achieve).

A contributor replied:

I understand what you mean. The problem in #38356 is also caused by enforcing the ordering at V1WriteCommand, but it is not the same case.

+      case a: AdaptiveSparkPlanExec => a.finalPhysicalPlan
+      case p: SparkPlan => p.withNewChildren(p.children.map(materializeAdaptiveSparkPlan))
+    }
+    val materializedPlan = materializeAdaptiveSparkPlan(empty2NullPlan)
+
     // the sort order doesn't matter
-    val actualOrdering = empty2NullPlan.outputOrdering.map(_.child)
+    val actualOrdering = materializedPlan.outputOrdering.map(_.child)
     val orderingMatched = if (requiredOrdering.length > actualOrdering.length) {
       false
     } else {
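The collapsed lines above complete the `orderingMatched` check: the required ordering must be a semantic prefix of the ordering the plan actually produces. A simplified, generic sketch of that logic (in the real code the comparison is a semantic equality check on Catalyst expressions, not plain equality):

```scala
// Simplified illustration of the prefix check: every required expression
// must match the plan's output ordering position by position, and the
// required list may not be longer than the actual one.
def orderingSatisfied[A](required: Seq[A], actual: Seq[A])(matches: (A, A) => Boolean): Boolean =
  required.length <= actual.length &&
    required.zip(actual).forall { case (r, a) => matches(r, a) }
```

This is why materializing the AQE plan matters: with an empty `actualOrdering`, any non-empty `requiredOrdering` fails the check and forces an extra sort.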
@@ -210,7 +220,7 @@
 
     try {
       val (rdd, concurrentOutputWriterSpec) = if (orderingMatched) {
-        (empty2NullPlan.execute(), None)
+        (materializedPlan.execute(), None)
       } else {
         // SPARK-21165: the `requiredOrdering` is based on the attributes from analyzed plan, and
         // the physical plan may have different attribute ids due to optimizer removing some
@@ -220,12 +230,12 @@ object FileFormatWriter extends Logging {
         val sortPlan = SortExec(
           orderingExpr,
           global = false,
-          child = empty2NullPlan)
+          child = materializedPlan)
 
         val maxWriters = sparkSession.sessionState.conf.maxConcurrentOutputFileWriters
         val concurrentWritersEnabled = maxWriters > 0 && sortColumns.isEmpty
         if (concurrentWritersEnabled) {
-          (empty2NullPlan.execute(),
+          (materializedPlan.execute(),
            Some(ConcurrentOutputWriterSpec(maxWriters, () => sortPlan.createSorter())))
         } else {
           (sortPlan.execute(), None)
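To make the fixed scenario concrete, here is a hedged sketch of the kind of write that can exercise this code path: with AQE enabled, FileFormatWriter previously consulted the ordering of the AdaptiveSparkPlanExec itself, could not see the final plan's ordering, and so wrapped the plan in an extra SortExec on the partition columns. All names, paths, and values below are illustrative only:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("spark-40588-sketch")
  .config("spark.sql.adaptive.enabled", "true")
  .getOrCreate()
import spark.implicits._

// partitionBy requires rows sorted by the partition columns, so
// FileFormatWriter consults the plan's outputOrdering to decide
// whether it must wrap the plan in its own SortExec.
spark.range(0, 1000)
  .withColumn("part", $"id" % 10)
  .sortWithinPartitions("part") // an ordering AQE used to hide
  .write
  .partitionBy("part")
  .mode("overwrite")
  .parquet("/tmp/spark-40588-demo") // illustrative output path
```

With the patch, the AdaptiveSparkPlanExec is materialized before the ordering check, so an ordering that the final plan already provides is visible and the redundant SortExec is skipped.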