From f38c92b7e065e4dba0a2ed10546eb7122532602e Mon Sep 17 00:00:00 2001 From: Will Raschkowski Date: Tue, 28 Apr 2020 14:51:11 +0200 Subject: [PATCH 1/6] Make estimatePartitionStartIndices static and pass advisory size and min partitions as parameters --- .../adaptive/ReduceNumShufflePartitions.scala | 19 ++-- .../ReduceNumShufflePartitionsSuite.scala | 90 +++++++++---------- 2 files changed, 55 insertions(+), 54 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala index 1a85d5c02075b..f34ce52b1d57f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala @@ -21,6 +21,7 @@ import scala.collection.mutable.ArrayBuffer import scala.concurrent.duration.Duration import org.apache.spark.MapOutputStatistics +import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute @@ -88,7 +89,10 @@ case class ReduceNumShufflePartitions(conf: SQLConf) extends Rule[SparkPlan] { val distinctNumPreShufflePartitions = validMetrics.map(stats => stats.bytesByPartitionId.length).distinct if (validMetrics.nonEmpty && distinctNumPreShufflePartitions.length == 1) { - val partitionStartIndices = estimatePartitionStartIndices(validMetrics.toArray) + val partitionStartIndices = estimatePartitionStartIndices( + validMetrics.toArray, + conf.targetPostShuffleInputSize, + conf.minNumPostShufflePartitions) // This transformation adds new nodes, so we must use `transformUp` here. plan.transformUp { // even for shuffle exchange whose input RDD has 0 partition, we should still update its @@ -102,19 +106,20 @@ case class ReduceNumShufflePartitions(conf: SQLConf) extends Rule[SparkPlan] { } } } +} +object ReduceNumShufflePartitions extends Logging { /** * Estimates partition start indices for post-shuffle partitions based on * mapOutputStatistics provided by all pre-shuffle stages. */ // visible for testing. private[sql] def estimatePartitionStartIndices( - mapOutputStatistics: Array[MapOutputStatistics]): Array[Int] = { - val minNumPostShufflePartitions = conf.minNumPostShufflePartitions - val advisoryTargetPostShuffleInputSize = conf.targetPostShuffleInputSize - // If minNumPostShufflePartitions is defined, it is possible that we need to use a - // value less than advisoryTargetPostShuffleInputSize as the target input size of - // a post shuffle task. + mapOutputStatistics: Array[MapOutputStatistics], + advisoryTargetPostShuffleInputSize: Long, + minNumPostShufflePartitions: Int): Array[Int] = { + // If `minNumPostShufflePartitions` is very large, it is possible that we need to use a value + // less than `advisoryTargetPostShuffleInputSize` as the target size of a coalesced task. val totalPostShuffleInputSize = mapOutputStatistics.map(_.bytesByPartitionId.sum).sum // The max at here is to make sure that when we have an empty table, we // only have a single post-shuffle partition. 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ReduceNumShufflePartitionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ReduceNumShufflePartitionsSuite.scala index 7f2211063f974..c2c0e84399d6f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ReduceNumShufflePartitionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ReduceNumShufflePartitionsSuite.scala @@ -52,35 +52,29 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterA } private def checkEstimation( - rule: ReduceNumShufflePartitions, bytesByPartitionIdArray: Array[Array[Long]], - expectedPartitionStartIndices: Array[Int]): Unit = { + expectedPartitionStartIndices: Seq[Int], + targetSize: Long, + mixNumPartitions: Int = 1): Unit = { val mapOutputStatistics = bytesByPartitionIdArray.zipWithIndex.map { case (bytesByPartitionId, index) => new MapOutputStatistics(index, bytesByPartitionId) } - val estimatedPartitionStartIndices = - rule.estimatePartitionStartIndices(mapOutputStatistics) + val estimatedPartitionStartIndices = ReduceNumShufflePartitions.estimatePartitionStartIndices( + mapOutputStatistics, + targetSize, + mixNumPartitions) assert(estimatedPartitionStartIndices === expectedPartitionStartIndices) } - private def createReduceNumShufflePartitionsRule( - advisoryTargetPostShuffleInputSize: Long, - minNumPostShufflePartitions: Int = 1): ReduceNumShufflePartitions = { - val conf = new SQLConf().copy( - SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE -> advisoryTargetPostShuffleInputSize, - SQLConf.SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS -> minNumPostShufflePartitions) - ReduceNumShufflePartitions(conf) - } - test("test estimatePartitionStartIndices - 1 Exchange") { - val rule = createReduceNumShufflePartitionsRule(100L) + val targetSize = 100 { // All bytes per partition are 0. val bytesByPartitionId = Array[Long](0, 0, 0, 0, 0) val expectedPartitionStartIndices = Array[Int](0) - checkEstimation(rule, Array(bytesByPartitionId), expectedPartitionStartIndices) + checkEstimation(Array(bytesByPartitionId), expectedPartitionStartIndices, targetSize) } { @@ -88,51 +82,49 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterA // 1 post-shuffle partition is needed. val bytesByPartitionId = Array[Long](10, 0, 20, 0, 0) val expectedPartitionStartIndices = Array[Int](0) - checkEstimation(rule, Array(bytesByPartitionId), expectedPartitionStartIndices) + checkEstimation(Array(bytesByPartitionId), expectedPartitionStartIndices, targetSize) } { // 2 post-shuffle partitions are needed. val bytesByPartitionId = Array[Long](10, 0, 90, 20, 0) val expectedPartitionStartIndices = Array[Int](0, 3) - checkEstimation(rule, Array(bytesByPartitionId), expectedPartitionStartIndices) + checkEstimation(Array(bytesByPartitionId), expectedPartitionStartIndices, targetSize) } { // There are a few large pre-shuffle partitions. val bytesByPartitionId = Array[Long](110, 10, 100, 110, 0) val expectedPartitionStartIndices = Array[Int](0, 1, 2, 3, 4) - checkEstimation(rule, Array(bytesByPartitionId), expectedPartitionStartIndices) + checkEstimation(Array(bytesByPartitionId), expectedPartitionStartIndices, targetSize) } { // All pre-shuffle partitions are larger than the targeted size. 
val bytesByPartitionId = Array[Long](100, 110, 100, 110, 110) val expectedPartitionStartIndices = Array[Int](0, 1, 2, 3, 4) - checkEstimation(rule, Array(bytesByPartitionId), expectedPartitionStartIndices) + checkEstimation(Array(bytesByPartitionId), expectedPartitionStartIndices, targetSize) } { // The last pre-shuffle partition is in a single post-shuffle partition. val bytesByPartitionId = Array[Long](30, 30, 0, 40, 110) val expectedPartitionStartIndices = Array[Int](0, 4) - checkEstimation(rule, Array(bytesByPartitionId), expectedPartitionStartIndices) + checkEstimation(Array(bytesByPartitionId), expectedPartitionStartIndices, targetSize) } } test("test estimatePartitionStartIndices - 2 Exchanges") { - val rule = createReduceNumShufflePartitionsRule(100L) + val targetSize = 100 { // If there are multiple values of the number of pre-shuffle partitions, // we should see an assertion error. val bytesByPartitionId1 = Array[Long](0, 0, 0, 0, 0) val bytesByPartitionId2 = Array[Long](0, 0, 0, 0, 0, 0) - val mapOutputStatistics = - Array( - new MapOutputStatistics(0, bytesByPartitionId1), - new MapOutputStatistics(1, bytesByPartitionId2)) - intercept[AssertionError](rule.estimatePartitionStartIndices(mapOutputStatistics)) + intercept[AssertionError]{ + checkEstimation(Array(bytesByPartitionId1, bytesByPartitionId2), Seq.empty, targetSize) + } } { @@ -141,9 +133,9 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterA val bytesByPartitionId2 = Array[Long](0, 0, 0, 0, 0) val expectedPartitionStartIndices = Array[Int](0) checkEstimation( - rule, Array(bytesByPartitionId1, bytesByPartitionId2), - expectedPartitionStartIndices) + expectedPartitionStartIndices, + targetSize) } { @@ -153,9 +145,9 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterA val bytesByPartitionId2 = Array[Long](30, 0, 20, 0, 20) val expectedPartitionStartIndices = Array[Int](0) checkEstimation( - rule, Array(bytesByPartitionId1, bytesByPartitionId2), - expectedPartitionStartIndices) + expectedPartitionStartIndices, + targetSize) } { @@ -164,9 +156,9 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterA val bytesByPartitionId2 = Array[Long](30, 0, 70, 0, 30) val expectedPartitionStartIndices = Array[Int](0, 2, 4) checkEstimation( - rule, Array(bytesByPartitionId1, bytesByPartitionId2), - expectedPartitionStartIndices) + expectedPartitionStartIndices, + targetSize) } { @@ -175,9 +167,9 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterA val bytesByPartitionId2 = Array[Long](30, 0, 70, 0, 30) val expectedPartitionStartIndices = Array[Int](0, 1, 2, 4) checkEstimation( - rule, Array(bytesByPartitionId1, bytesByPartitionId2), - expectedPartitionStartIndices) + expectedPartitionStartIndices, + targetSize) } { @@ -186,9 +178,9 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterA val bytesByPartitionId2 = Array[Long](30, 0, 70, 0, 30) val expectedPartitionStartIndices = Array[Int](0, 1, 2, 4) checkEstimation( - rule, Array(bytesByPartitionId1, bytesByPartitionId2), - expectedPartitionStartIndices) + expectedPartitionStartIndices, + targetSize) } { @@ -197,9 +189,9 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterA val bytesByPartitionId2 = Array[Long](30, 0, 60, 0, 110) val expectedPartitionStartIndices = Array[Int](0, 1, 2, 3, 4) checkEstimation( - rule, Array(bytesByPartitionId1, bytesByPartitionId2), - expectedPartitionStartIndices) + 
expectedPartitionStartIndices, + targetSize) } { @@ -208,14 +200,15 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterA val bytesByPartitionId2 = Array[Long](30, 0, 60, 70, 110) val expectedPartitionStartIndices = Array[Int](0, 1, 2, 3, 4) checkEstimation( - rule, Array(bytesByPartitionId1, bytesByPartitionId2), - expectedPartitionStartIndices) + expectedPartitionStartIndices, + targetSize) } } test("test estimatePartitionStartIndices and enforce minimal number of reducers") { - val rule = createReduceNumShufflePartitionsRule(100L, 2) + val targetSize = 100 + val minNumPartitions = 2 { // The minimal number of post-shuffle partitions is not enforced because @@ -224,9 +217,10 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterA val bytesByPartitionId2 = Array[Long](0, 0, 0, 0, 0) val expectedPartitionStartIndices = Array[Int](0) checkEstimation( - rule, Array(bytesByPartitionId1, bytesByPartitionId2), - expectedPartitionStartIndices) + expectedPartitionStartIndices, + targetSize, + minNumPartitions) } { @@ -235,9 +229,10 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterA val bytesByPartitionId2 = Array[Long](5, 10, 0, 10, 5) val expectedPartitionStartIndices = Array[Int](0, 3) checkEstimation( - rule, Array(bytesByPartitionId1, bytesByPartitionId2), - expectedPartitionStartIndices) + expectedPartitionStartIndices, + targetSize, + minNumPartitions) } { @@ -246,9 +241,10 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterA val bytesByPartitionId2 = Array[Long](40, 10, 0, 10, 30) val expectedPartitionStartIndices = Array[Int](0, 1, 3, 4) checkEstimation( - rule, Array(bytesByPartitionId1, bytesByPartitionId2), - expectedPartitionStartIndices) + expectedPartitionStartIndices, + targetSize, + minNumPartitions) } } From 95af0ee7c4aea140fd6b6000dada5c27a4e2e13c Mon Sep 17 00:00:00 2001 From: Will Raschkowski Date: Wed, 29 Apr 2020 19:38:36 +0200 Subject: [PATCH 2/6] Make SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS optional and default to defaultParallelism --- .../org/apache/spark/sql/internal/SQLConf.scala | 8 +++----- .../execution/adaptive/AdaptiveSparkPlanExec.scala | 2 +- .../adaptive/ReduceNumShufflePartitions.scala | 12 +++++++++--- 3 files changed, 13 insertions(+), 9 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 64e05d1b29c6c..e7005174291d9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -299,11 +299,12 @@ object SQLConf { val SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS = buildConf("spark.sql.adaptive.minNumPostShufflePartitions") - .doc("The advisory minimum number of post-shuffle partitions used in adaptive execution.") + .doc("The advisory minimum number of post-shuffle partitions used in adaptive execution. 
" + + "If not set, the default value is the default parallelism of the Spark cluster.") .intConf .checkValue(_ > 0, "The minimum shuffle partition number " + "must be a positive integer.") - .createWithDefault(1) + .createOptional val SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS = buildConf("spark.sql.adaptive.maxNumPostShufflePartitions") @@ -1830,9 +1831,6 @@ class SQLConf extends Serializable with Logging { def reducePostShufflePartitionsEnabled: Boolean = getConf(REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED) - def minNumPostShufflePartitions: Int = - getConf(SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS) - def maxNumPostShufflePartitions: Int = getConf(SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS).getOrElse(numShufflePartitions) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala index 83e6b7286b4e7..e0256ac28c917 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala @@ -90,7 +90,7 @@ case class AdaptiveSparkPlanExec( // optimizations should be stage-independent. @transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq( ReuseAdaptiveSubquery(conf, subqueryCache), - ReduceNumShufflePartitions(conf), + ReduceNumShufflePartitions(queryExecution.sparkSession), CollapseCodegenStages(conf) ) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala index f34ce52b1d57f..105f22ec2d8a5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala @@ -23,6 +23,7 @@ import scala.concurrent.duration.Duration import org.apache.spark.MapOutputStatistics import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD +import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning} @@ -52,7 +53,8 @@ import org.apache.spark.util.ThreadUtils * - post-shuffle partition 2: pre-shuffle partition 2 (size 170 MiB) * - post-shuffle partition 3: pre-shuffle partition 3 and 4 (size 50 MiB) */ -case class ReduceNumShufflePartitions(conf: SQLConf) extends Rule[SparkPlan] { +case class ReduceNumShufflePartitions(session: SparkSession) extends Rule[SparkPlan] { + private def conf = session.sessionState.conf override def apply(plan: SparkPlan): SparkPlan = { if (!conf.reducePostShufflePartitionsEnabled) { @@ -89,10 +91,14 @@ case class ReduceNumShufflePartitions(conf: SQLConf) extends Rule[SparkPlan] { val distinctNumPreShufflePartitions = validMetrics.map(stats => stats.bytesByPartitionId.length).distinct if (validMetrics.nonEmpty && distinctNumPreShufflePartitions.length == 1) { - val partitionStartIndices = estimatePartitionStartIndices( + // We fall back to Spark default parallelism if the minimum number of coalesced partitions + // is not set, so to avoid perf regressions compared to no coalescing. 
+ val minPartitionNum = conf.getConf(SQLConf.SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS) .getOrElse(session.sparkContext.defaultParallelism) + val partitionStartIndices = ReduceNumShufflePartitions.estimatePartitionStartIndices( validMetrics.toArray, conf.targetPostShuffleInputSize, - conf.minNumPostShufflePartitions) + minPartitionNum) // This transformation adds new nodes, so we must use `transformUp` here. plan.transformUp { // even for shuffle exchange whose input RDD has 0 partition, we should still update its From fd30d52f84cfd574f94046af37d184b11e5ba708 Mon Sep 17 00:00:00 2001 From: Will Raschkowski Date: Wed, 29 Apr 2020 19:39:01 +0200 Subject: [PATCH 3/6] Test that defaultParallelism is used as minimum --- .../ReduceNumShufflePartitionsSuite.scala | 40 ++++++++++++++++++- 1 file changed, 39 insertions(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ReduceNumShufflePartitionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ReduceNumShufflePartitionsSuite.scala index c2c0e84399d6f..d4e8826ce0998 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ReduceNumShufflePartitionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ReduceNumShufflePartitionsSuite.scala @@ -264,7 +264,8 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterA def withSparkSession( f: SparkSession => Unit, targetPostShuffleInputSize: Int, - minNumPostShufflePartitions: Option[Int]): Unit = { + minNumPostShufflePartitions: Option[Int], + unsetMinMax: Boolean = false): Unit = { val sparkConf = new SparkConf(false) .setMaster("local[*]") @@ -284,6 +285,11 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterA sparkConf.set(SQLConf.SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS.key, "1") } + if (unsetMinMax) { + sparkConf.remove(SQLConf.SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS.key) + sparkConf.remove(SQLConf.SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS.key) + } + val spark = SparkSession.builder() .config(sparkConf) .getOrCreate() @@ -519,6 +525,38 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterA } } + // TODO(rshkv): Remove after taking SPARK-31124 + test("number of reducers is lower-bound by default parallelism without configured minimum") { + val test = { spark: SparkSession => + val df = + spark + .range(0, 1000, 1, numInputPartitions) + .selectExpr("id % 20 as key", "id as value") + val agg = df.groupBy("key").count() + + agg.collect() + + val finalPlan = agg.queryExecution.executedPlan + .asInstanceOf[AdaptiveSparkPlanExec].executedPlan + val shuffleReaders = finalPlan.collect { + case reader: CoalescedShuffleReaderExec => reader + } + + assert(shuffleReaders.length === 1) + shuffleReaders.foreach { reader => + // Assert that there is no configured minimum + assert(!spark.conf.contains(SQLConf.SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS.key)) + // The number of output partitions will be slightly larger than defaultParallelism because + // input partitions don't exactly fit. The key here is that we have more than one partition.
+ assert(reader.outputPartitioning.numPartitions >= spark.sparkContext.defaultParallelism) + } + } + // Pick an advisory partition size such that we'd have one partition + // if min = defaultParallelism didn't work + val targetSizeForOnePartition = 1000000000 + withSparkSession(test, targetSizeForOnePartition, None, unsetMinMax = true) + } + test("SPARK-24705 adaptive query execution works correctly when exchange reuse enabled") { val test: SparkSession => Unit = { spark: SparkSession => spark.sql("SET spark.sql.exchange.reuse=true") From 1c64f03ba408cadbd31f4ce1719bd4bf9efdc8b7 Mon Sep 17 00:00:00 2001 From: Will Raschkowski Date: Wed, 29 Apr 2020 20:50:17 +0200 Subject: [PATCH 4/6] Fix 'Change merge join to broadcast join and reduce number of shuffle partitions' --- .../spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index 7b76b36b0a216..aa986a18fb2a5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -98,6 +98,7 @@ class AdaptiveQueryExecSuite SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", SQLConf.REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED.key -> "true", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80", + SQLConf.SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS.key -> "1", SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key -> "150") { val (plan, adaptivePlan) = runAdaptiveAndVerifyResult( "SELECT * FROM testData join testData2 ON key = a where value = '1'") From 6a1a54332ed75ee3cdc7324b3ed24f0a820c6f72 Mon Sep 17 00:00:00 2001 From: Will Raschkowski Date: Thu, 30 Apr 2020 14:30:36 +0200 Subject: [PATCH 5/6] Fix the mix --- .../spark/sql/execution/ReduceNumShufflePartitionsSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ReduceNumShufflePartitionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ReduceNumShufflePartitionsSuite.scala index d4e8826ce0998..a6cc6b6d151b5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ReduceNumShufflePartitionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ReduceNumShufflePartitionsSuite.scala @@ -55,7 +55,7 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterA bytesByPartitionIdArray: Array[Array[Long]], expectedPartitionStartIndices: Seq[Int], targetSize: Long, - mixNumPartitions: Int = 1): Unit = { + minNumPartitions: Int = 1): Unit = { val mapOutputStatistics = bytesByPartitionIdArray.zipWithIndex.map { case (bytesByPartitionId, index) => new MapOutputStatistics(index, bytesByPartitionId) @@ -63,7 +63,7 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterA val estimatedPartitionStartIndices = ReduceNumShufflePartitions.estimatePartitionStartIndices( mapOutputStatistics, targetSize, - mixNumPartitions) + minNumPartitions) assert(estimatedPartitionStartIndices === expectedPartitionStartIndices) } From b8166bd4969ea68347157df2f07e411499bef01e Mon Sep 17 00:00:00 2001 From: Will Raschkowski Date: Thu, 30 Apr 2020 17:53:04 +0200 Subject: [PATCH 6/6] Add issue reference for revert todo --- .../spark/sql/execution/ReduceNumShufflePartitionsSuite.scala | 2 +- 1 file 
changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ReduceNumShufflePartitionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ReduceNumShufflePartitionsSuite.scala index a6cc6b6d151b5..01d618edc4978 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ReduceNumShufflePartitionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ReduceNumShufflePartitionsSuite.scala @@ -525,7 +525,7 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterA } } - // TODO(rshkv): Remove after taking SPARK-31124 + // TODO(rshkv): Remove after taking SPARK-31124 (#676) test("number of reducers is lower-bound by default parallelism without configured minimum") { val test = { spark: SparkSession => val df =
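
Note (editorial, not part of the patches): the sketch below is a minimal, self-contained Scala illustration of the coalescing logic that estimatePartitionStartIndices implements, assuming the hypothetical names CoalesceSketch and the simplified signature shown; the division-based target-size adjustment and the 16-byte floor are approximations of Spark's behaviour, not the actual source. It shows why the advisory target size and the minimum partition count are the only values the rule needs (which is what PATCH 1/6 makes explicit parameters), and how a larger minimum, such as the defaultParallelism fallback added in PATCH 2/6, shrinks the effective target size so fewer reduce partitions get merged.

// Illustrative sketch only (hypothetical names, not the Spark source): coalesce
// contiguous reduce partitions until adding the next one would exceed the target size.
object CoalesceSketch {
  // bytesByPartitionId(i)(j) = bytes that shuffle i wrote for reduce partition j.
  def estimatePartitionStartIndices(
      bytesByPartitionId: Array[Array[Long]],
      advisoryTargetSize: Long,
      minNumPartitions: Int): Array[Int] = {
    // A large minimum partition count lowers the effective target size so that
    // roughly at least `minNumPartitions` coalesced partitions are produced.
    val totalInputSize = bytesByPartitionId.map(_.sum).sum
    val targetSize = math.min(advisoryTargetSize, math.max(totalInputSize / minNumPartitions, 16L))

    val numPartitions = bytesByPartitionId.head.length
    val startIndices = scala.collection.mutable.ArrayBuffer(0)
    var currentSize = 0L
    for (i <- 0 until numPartitions) {
      // Size of reduce partition i summed over all participating shuffles.
      val partitionSize = bytesByPartitionId.map(_(i)).sum
      if (i > 0 && currentSize + partitionSize > targetSize) {
        startIndices += i           // close the current coalesced partition
        currentSize = partitionSize
      } else {
        currentSize += partitionSize
      }
    }
    startIndices.toArray
  }

  def main(args: Array[String]): Unit = {
    // Mirrors one of the suite's "2 Exchanges" cases with advisory target size 100.
    val stats = Array(Array[Long](0, 20, 0, 20, 0), Array[Long](30, 0, 70, 0, 30))
    println(estimatePartitionStartIndices(stats, 100L, 1).mkString(", "))  // 0, 2, 4
    // A larger minimum (say a defaultParallelism of 8) shrinks the target size,
    // so fewer partitions are merged:
    println(estimatePartitionStartIndices(stats, 100L, 8).mkString(", "))  // 0, 1, 2, 3, 4
  }
}

The second call illustrates the regression the new test guards against: without a lower bound tied to the cluster's parallelism, a large advisory size could collapse the shuffle into a single reduce partition.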