From 1d1eacde9d1b6fb75a20e4b909d221e70ad737db Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Tue, 9 Jun 2020 16:07:22 +0000
Subject: [PATCH] [SPARK-31220][SQL] repartition obeys initialPartitionNum when adaptiveExecutionEnabled

### What changes were proposed in this pull request?

This PR makes `repartition`/`DISTRIBUTE BY` obey [initialPartitionNum](https://github.com/apache/spark/blob/af4248b2d661d04fec89b37857a47713246d9465/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L446-L455) when adaptive execution is enabled.

### Why are the changes needed?

To make `DISTRIBUTE BY` and `GROUP BY` use the same number of shuffle partitions.

How to reproduce:
```scala
spark.sql("CREATE TABLE spark_31220(id int)")
spark.sql("set spark.sql.adaptive.enabled=true")
spark.sql("set spark.sql.adaptive.coalescePartitions.initialPartitionNum=1000")
```

Before this PR:
```
scala> spark.sql("SELECT id from spark_31220 GROUP BY id").explain
== Physical Plan ==
AdaptiveSparkPlan(isFinalPlan=false)
+- HashAggregate(keys=[id#5], functions=[])
   +- Exchange hashpartitioning(id#5, 1000), true, [id=#171]
      +- HashAggregate(keys=[id#5], functions=[])
         +- FileScan parquet default.spark_31220[id#5]

scala> spark.sql("SELECT id from spark_31220 DISTRIBUTE BY id").explain
== Physical Plan ==
AdaptiveSparkPlan(isFinalPlan=false)
+- Exchange hashpartitioning(id#5, 200), false, [id=#179]
   +- FileScan parquet default.spark_31220[id#5]
```

After this PR:
```
scala> spark.sql("SELECT id from spark_31220 GROUP BY id").explain
== Physical Plan ==
AdaptiveSparkPlan(isFinalPlan=false)
+- HashAggregate(keys=[id#5], functions=[])
   +- Exchange hashpartitioning(id#5, 1000), true, [id=#171]
      +- HashAggregate(keys=[id#5], functions=[])
         +- FileScan parquet default.spark_31220[id#5]

scala> spark.sql("SELECT id from spark_31220 DISTRIBUTE BY id").explain
== Physical Plan ==
AdaptiveSparkPlan(isFinalPlan=false)
+- Exchange hashpartitioning(id#5, 1000), false, [id=#179]
   +- FileScan parquet default.spark_31220[id#5]
```

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Unit test.

Closes #27986 from wangyum/SPARK-31220.
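As an additional end-to-end check beyond the explain plans above, a short spark-shell sketch like the following can be used (illustrative only, not part of this patch; the temp-view name `spark_31220_view` is made up, and the configs mirror the reproduction above):

```scala
// Illustrative verification sketch, assuming the settings from the reproduction.
spark.sql("set spark.sql.adaptive.enabled=true")
spark.sql("set spark.sql.adaptive.coalescePartitions.initialPartitionNum=1000")

spark.range(10).toDF("id").createOrReplaceTempView("spark_31220_view")

// With this change, the Exchange planned for DISTRIBUTE BY should show 1000
// partitions (matching GROUP BY) instead of the spark.sql.shuffle.partitions default.
spark.sql("SELECT id FROM spark_31220_view DISTRIBUTE BY id").explain()
```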
Authored-by: Yuming Wang
Signed-off-by: Wenchen Fan
---
 .../org/apache/spark/sql/internal/SQLConf.scala   | 13 +++++++++----
 .../execution/exchange/EnsureRequirements.scala   | 10 ++--------
 .../adaptive/AdaptiveQueryExecSuite.scala         | 16 ++++++++++++++++
 3 files changed, 27 insertions(+), 12 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 3a41b0553db54..189740e313207 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2784,7 +2784,15 @@ class SQLConf extends Serializable with Logging {
 
   def cacheVectorizedReaderEnabled: Boolean = getConf(CACHE_VECTORIZED_READER_ENABLED)
 
-  def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS)
+  def defaultNumShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS)
+
+  def numShufflePartitions: Int = {
+    if (adaptiveExecutionEnabled && coalesceShufflePartitionsEnabled) {
+      getConf(COALESCE_PARTITIONS_INITIAL_PARTITION_NUM).getOrElse(defaultNumShufflePartitions)
+    } else {
+      defaultNumShufflePartitions
+    }
+  }
 
   def adaptiveExecutionEnabled: Boolean = getConf(ADAPTIVE_EXECUTION_ENABLED)
 
@@ -2797,9 +2805,6 @@ class SQLConf extends Serializable with Logging {
 
   def coalesceShufflePartitionsEnabled: Boolean = getConf(COALESCE_PARTITIONS_ENABLED)
 
-  def initialShufflePartitionNum: Int =
-    getConf(COALESCE_PARTITIONS_INITIAL_PARTITION_NUM).getOrElse(numShufflePartitions)
-
   def minBatchesToRetain: Int = getConf(MIN_BATCHES_TO_RETAIN)
 
   def maxBatchesToRetainInMemory: Int = getConf(MAX_BATCHES_TO_RETAIN_IN_MEMORY)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
index 28ef793ed62db..3242ac21ab324 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
@@ -35,12 +35,6 @@ import org.apache.spark.sql.internal.SQLConf
  * the input partition ordering requirements are met.
  */
 case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
-  private def defaultNumPreShufflePartitions: Int =
-    if (conf.adaptiveExecutionEnabled && conf.coalesceShufflePartitionsEnabled) {
-      conf.initialShufflePartitionNum
-    } else {
-      conf.numShufflePartitions
-    }
 
   private def ensureDistributionAndOrdering(operator: SparkPlan): SparkPlan = {
     val requiredChildDistributions: Seq[Distribution] = operator.requiredChildDistribution
@@ -57,7 +51,7 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
         BroadcastExchangeExec(mode, child)
       case (child, distribution) =>
         val numPartitions = distribution.requiredNumPartitions
-          .getOrElse(defaultNumPreShufflePartitions)
+          .getOrElse(conf.numShufflePartitions)
         ShuffleExchangeExec(distribution.createPartitioning(numPartitions), child)
     }
 
@@ -95,7 +89,7 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
           // expected number of shuffle partitions. However, if it's smaller than
           // `conf.numShufflePartitions`, we pick `conf.numShufflePartitions` as the
           // expected number of shuffle partitions.
-          math.max(nonShuffleChildrenNumPartitions.max, conf.numShufflePartitions)
+          math.max(nonShuffleChildrenNumPartitions.max, conf.defaultNumShufflePartitions)
         } else {
           childrenNumPartitions.max
         }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 3d0ba05f76b71..9fa97bffa8910 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -1021,4 +1021,20 @@ class AdaptiveQueryExecSuite
       }
     }
   }
+
+  test("SPARK-31220 repartition obeys initialPartitionNum when adaptiveExecutionEnabled") {
+    Seq(true, false).foreach { enableAQE =>
+      withSQLConf(
+        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> enableAQE.toString,
+        SQLConf.SHUFFLE_PARTITIONS.key -> "6",
+        SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "7") {
+        val partitionsNum = spark.range(10).repartition($"id").rdd.collectPartitions().length
+        if (enableAQE) {
+          assert(partitionsNum === 7)
+        } else {
+          assert(partitionsNum === 6)
+        }
+      }
+    }
+  }
 }
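For readers who want the new precedence rule at a glance without tracing the diff, here is a minimal standalone sketch of the resolution logic this patch adds to `SQLConf` (an illustration with free-standing parameters, not the actual class; the parameter names are made up):

```scala
// Illustrative sketch of the new numShufflePartitions resolution.
// initialPartitionNum models spark.sql.adaptive.coalescePartitions.initialPartitionNum;
// defaultNum models spark.sql.shuffle.partitions.
def numShufflePartitions(
    adaptiveEnabled: Boolean,
    coalesceEnabled: Boolean,
    initialPartitionNum: Option[Int],
    defaultNum: Int): Int = {
  if (adaptiveEnabled && coalesceEnabled) {
    // AQE plans the shuffle with the (typically larger) initial number,
    // then coalesces partitions at runtime.
    initialPartitionNum.getOrElse(defaultNum)
  } else {
    defaultNum
  }
}

// The two scenarios exercised by the new test above:
assert(numShufflePartitions(true, true, Some(7), 6) == 7)  // AQE on: initial number wins
assert(numShufflePartitions(false, true, Some(7), 6) == 6) // AQE off: default wins
```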