From 701f1c30bbca2bfdffd121cc9c0d6b4a5250f823 Mon Sep 17 00:00:00 2001
From: jiake
Date: Tue, 11 May 2021 17:43:19 +0800
Subject: [PATCH] resolve the comments

---
 .../execution/adaptive/AdaptiveSparkPlanExec.scala     | 12 ++++++------
 .../adaptive/PlanAdaptiveDynamicPruningFilters.scala   |  5 +++--
 .../spark/sql/DynamicPartitionPruningSuite.scala       |  9 ++++-----
 3 files changed, 13 insertions(+), 13 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index 94d4ee1353956..256aacb9049d4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -91,15 +91,10 @@ case class AdaptiveSparkPlanExec(
     DisableUnnecessaryBucketedScan
   ) ++ context.session.sessionState.queryStagePrepRules
 
-  @transient private val initialPlan = context.session.withActive {
-    applyPhysicalRules(
-      inputPlan, queryStagePreparationRules, Some((planChangeLogger, "AQE Preparations")))
-  }
-
   // A list of physical optimizer rules to be applied to a new stage before its execution. These
   // optimizations should be stage-independent.
   @transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
-    PlanAdaptiveDynamicPruningFilters(initialPlan),
+    PlanAdaptiveDynamicPruningFilters(this),
     ReuseAdaptiveSubquery(context.subqueryCache),
     CoalesceShufflePartitions(context.session),
     // The following two rules need to make use of 'CustomShuffleReaderExec.partitionSpecs'
@@ -134,6 +129,11 @@ case class AdaptiveSparkPlanExec(
 
   @transient private val costEvaluator = SimpleCostEvaluator
 
+  @transient private val initialPlan = context.session.withActive {
+    applyPhysicalRules(
+      inputPlan, queryStagePreparationRules, Some((planChangeLogger, "AQE Preparations")))
+  }
+
   @volatile private var currentPhysicalPlan = initialPlan
 
   private var isFinalPlan = false
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala
index e42d53a232557..3ef18108d2b30 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala
@@ -27,7 +27,8 @@ import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, HashedRelati
 /**
  * A rule to insert dynamic pruning predicates in order to reuse the results of broadcast.
  */
-case class PlanAdaptiveDynamicPruningFilters(rootPlan: SparkPlan) extends Rule[SparkPlan] {
+case class PlanAdaptiveDynamicPruningFilters(
+    rootPlan: AdaptiveSparkPlanExec) extends Rule[SparkPlan] with AdaptiveSparkPlanHelper {
   def apply(plan: SparkPlan): SparkPlan = {
     if (!conf.dynamicPartitionPruningEnabled) {
       return plan
@@ -44,7 +45,7 @@ case class PlanAdaptiveDynamicPruningFilters(rootPlan: SparkPlan) extends Rule[S
           val exchange = BroadcastExchangeExec(mode, adaptivePlan.executedPlan)
 
           val canReuseExchange = conf.exchangeReuseEnabled && buildKeys.nonEmpty &&
-            rootPlan.find {
+            find(rootPlan) {
               case BroadcastHashJoinExec(_, _, _, BuildLeft, _, left, _, _) =>
                 left.sameResult(exchange)
               case BroadcastHashJoinExec(_, _, _, BuildRight, _, _, right, _) =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
index b69a683b5e75e..3b88bd58d925f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
@@ -1492,12 +1492,11 @@ abstract class DynamicPartitionPruningSuiteBase
     //   +- Exchange
     //     +- HashAggregate
    //       +- Filter
-    //         +- FileScan [PartitionFilters: [isnotnull(store_id#3367),
-    //              dynamicpruningexpression(store_id#3367 IN dynamicpruning#3385)]]
+    //         +- FileScan [PartitionFilters: dynamicpruning#3385]
     //           +- SubqueryBroadcast dynamicpruning#3385
-    //             +- AdaptiveSparkPlan
-    //               +- BroadcastQueryStage
-    //                 +- BroadcastExchange
+    //              +- AdaptiveSparkPlan
+    //                +- BroadcastQueryStage
+    //                  +- BroadcastExchange
     //
     // +- BroadcastQueryStage
     //   +- ReusedExchange
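
Below is a minimal, self-contained Scala sketch of the behaviour this change relies on; it is not part of the patch and does not use Spark's real classes (Node, Join, Exchange, Stage, findPlain and findAdaptive are all hypothetical). As I read the diff, PlanAdaptiveDynamicPruningFilters now receives the AdaptiveSparkPlanExec itself and searches it via AdaptiveSparkPlanHelper's find, so the lookup for a reusable broadcast exchange can look inside query-stage wrappers instead of scanning only the frozen initialPlan snapshot. The sketch contrasts a plain child-only tree search with a stage-aware one.

// Hypothetical toy model, not Spark code: a "Stage" hides its materialized subtree
// behind a leaf-like wrapper, the way AQE query stages appear to a plain tree search.
object ReuseLookupSketch extends App {
  sealed trait Node { def children: Seq[Node] }
  case class Join(left: Node, right: Node) extends Node {
    def children: Seq[Node] = Seq(left, right)
  }
  case class Exchange(id: Int) extends Node { def children: Seq[Node] = Nil }
  case class Stage(inner: Node) extends Node { def children: Seq[Node] = Nil }

  // Plain find: only follows declared children, so it never sees inside a Stage.
  def findPlain(n: Node)(p: Node => Boolean): Option[Node] =
    if (p(n)) Some(n)
    else n.children.foldLeft(Option.empty[Node])((acc, c) => acc.orElse(findPlain(c)(p)))

  // Stage-aware find: also descends into the subtree wrapped by a Stage,
  // in the spirit of the AdaptiveSparkPlanHelper.find call used by the patched rule.
  def findAdaptive(n: Node)(p: Node => Boolean): Option[Node] =
    if (p(n)) Some(n)
    else {
      val kids = n match {
        case Stage(inner) => Seq(inner)
        case other        => other.children
      }
      kids.foldLeft(Option.empty[Node])((acc, c) => acc.orElse(findAdaptive(c)(p)))
    }

  val plan = Join(Stage(Exchange(1)), Exchange(2))
  val wanted: Node => Boolean = { case Exchange(1) => true; case _ => false }
  assert(findPlain(plan)(wanted).isEmpty)     // child-only search misses the wrapped exchange
  assert(findAdaptive(plan)(wanted).nonEmpty) // stage-aware search finds it for reuse
  println("stage-aware find located the broadcast exchange candidate")
}

This reading also suggests why the initialPlan val could be moved next to currentPhysicalPlan in AdaptiveSparkPlanExec: the queryStageOptimizerRules list no longer references it at construction time.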