From 16e8f5b13659295ce4662099f109bff0d86bf638 Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Wed, 6 Nov 2019 16:08:31 +0800
Subject: [PATCH 01/10] Enable adaptive execution should not add ShuffleExchange

---
 .../exchange/EnsureRequirements.scala         | 19 ++++++++++-
 .../adaptive/AdaptiveQueryExecSuite.scala     | 32 +++++++++++++++++--
 2 files changed, 48 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
index c56a5c015f32d..ad5be3fe35be7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
@@ -83,7 +83,24 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
       numPartitionsSet.headOption
     }

-    val targetNumPartitions = requiredNumPartitions.getOrElse(childrenNumPartitions.max)
+    // maxNumPostShufflePartitions is usually larger than numShufflePartitions,
+    // which causes some bucket map joins to lose efficacy after enabling adaptive execution.
+    // Please see SPARK-29655 for more details.
+    val selectedChildrenNumPartitions = if (conf.adaptiveExecutionEnabled) {
+      val withoutShuffleChildrenNumPartitions =
+        childrenIndexes.filterNot(children(_).isInstanceOf[ShuffleExchangeExec])
+          .map(children(_).outputPartitioning.numPartitions).toSet
+      if (withoutShuffleChildrenNumPartitions.nonEmpty) {
+        math.min(math.max(withoutShuffleChildrenNumPartitions.max, conf.numShufflePartitions),
+          conf.maxNumPostShufflePartitions)
+      } else {
+        childrenNumPartitions.max
+      }
+    } else {
+      childrenNumPartitions.max
+    }
+
+    val targetNumPartitions = requiredNumPartitions.getOrElse(selectedChildrenNumPartitions)

     children = children.zip(requiredChildDistributions).zipWithIndex.map {
       case ((child, distribution), index) if childrenIndexes.contains(index) =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index b140b08950db4..eff38012fc618 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -17,9 +17,9 @@

 package org.apache.spark.sql.execution.adaptive

-import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.{DataFrame, QueryTest}
 import org.apache.spark.sql.execution.{ReusedSubqueryExec, SparkPlan}
-import org.apache.spark.sql.execution.exchange.Exchange
+import org.apache.spark.sql.execution.exchange.{Exchange, ShuffleExchangeExec}
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BuildRight, SortMergeJoinExec}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
@@ -474,4 +474,32 @@ class AdaptiveQueryExecSuite
       }
     }
   }
+
+  test("Enable adaptive execution should not add ShuffleExchange") {
+    def findTopLevelShuffleExchangeExec(df: DataFrame): Seq[ShuffleExchangeExec] = {
+      collect(df.queryExecution.executedPlan) {
+        case s: ShuffleExchangeExec => s
+      }
+    }
+
+    val bucketedTableName = "bucketed_table"
+    withTable(bucketedTableName) {
+      withSQLConf(
+        SQLConf.SHUFFLE_PARTITIONS.key -> "4",
+        SQLConf.SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS.key -> "5",
+        SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+        spark.range(10).write.bucketBy(4, "id").sortBy("id").saveAsTable(bucketedTableName)
+        val bucketedTable = spark.table(bucketedTableName)
+
+        Seq(false, true).foreach { isAdaptive =>
+          withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> s"$isAdaptive") {
+            assert(
+              findTopLevelShuffleExchangeExec(bucketedTable.join(spark.range(8), "id")).size === 1)
+            assert(
+              findTopLevelShuffleExchangeExec(bucketedTable.join(bucketedTable, "id")).size === 0)
+          }
+        }
+      }
+    }
+  }
 }

From 1ba9edf459416e5b1805833d059980f13bf44bea Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Wed, 6 Nov 2019 20:03:15 +0800
Subject: [PATCH 02/10] Fix test error

---
 .../spark/sql/execution/exchange/EnsureRequirements.scala | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
index ad5be3fe35be7..6f7929a2e74c2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
@@ -86,11 +86,12 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
     // maxNumPostShufflePartitions is usually larger than numShufflePartitions,
     // which causes some bucket map joins to lose efficacy after enabling adaptive execution.
     // Please see SPARK-29655 for more details.
-    val selectedChildrenNumPartitions = if (conf.adaptiveExecutionEnabled) {
+    val expectedChildrenNumPartitions = if (conf.adaptiveExecutionEnabled) {
       val withoutShuffleChildrenNumPartitions =
         childrenIndexes.filterNot(children(_).isInstanceOf[ShuffleExchangeExec])
           .map(children(_).outputPartitioning.numPartitions).toSet
-      if (withoutShuffleChildrenNumPartitions.nonEmpty) {
+      if (withoutShuffleChildrenNumPartitions.nonEmpty &&
+          conf.maxNumPostShufflePartitions > conf.numShufflePartitions) {
         math.min(math.max(withoutShuffleChildrenNumPartitions.max, conf.numShufflePartitions),
           conf.maxNumPostShufflePartitions)
       } else {
@@ -100,7 +101,7 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
       childrenNumPartitions.max
     }

-    val targetNumPartitions = requiredNumPartitions.getOrElse(selectedChildrenNumPartitions)
+    val targetNumPartitions = requiredNumPartitions.getOrElse(expectedChildrenNumPartitions)

     children = children.zip(requiredChildDistributions).zipWithIndex.map {
       case ((child, distribution), index) if childrenIndexes.contains(index) =>

From 45109c71c3efc31f0d5aba6588a811faeea2a8f2 Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Tue, 12 Nov 2019 18:32:36 +0800
Subject: [PATCH 03/10] Add spark.sql.shuffle.withoutShuffleSideRatio config

---
 .../apache/spark/sql/internal/SQLConf.scala   | 10 +++++
 .../exchange/EnsureRequirements.scala         | 23 +++++------
 .../spark/sql/sources/BucketedReadSuite.scala | 39 +++++++++++++++++++
 3 files changed, 59 insertions(+), 13 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index a228d9f064a1e..9d37f75badb29 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -349,6 +349,14 @@ object SQLConf {
     .checkValue(_ > 0, "The value of spark.sql.shuffle.partitions must be positive")
     .createWithDefault(200)

+  val SHUFFLE_WITHOUT_SHUFFLE_SIDE_RATIO =
buildConf("spark.sql.shuffle.withoutShuffleSideRatio") + .doc("The maximum number of without shuffle partition ratio lower than this config " + + "will not add shuffle exchange for it.") + .doubleConf + .checkValue(ratio => ratio > 0 && ratio <= 1, "The ratio value must be in [0, 1].") + .createWithDefault(1.0) + val ADAPTIVE_EXECUTION_ENABLED = buildConf("spark.sql.adaptive.enabled") .doc("When true, enable adaptive query execution.") .booleanConf @@ -2162,6 +2170,8 @@ class SQLConf extends Serializable with Logging { def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS) + def withoutShuffleSideRatio: Double = getConf(SHUFFLE_WITHOUT_SHUFFLE_SIDE_RATIO) + def adaptiveExecutionEnabled: Boolean = getConf(ADAPTIVE_EXECUTION_ENABLED) def targetPostShuffleInputSize: Long = getConf(SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala index 6f7929a2e74c2..3aeaf6d74b168 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala @@ -83,22 +83,19 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] { numPartitionsSet.headOption } - // maxNumPostShufflePartitions is usually larger than numShufflePartitions, - // which causes some bucket map join lose efficacy after enabling adaptive execution. - // Please see SPARK-29655 for more details. - val expectedChildrenNumPartitions = if (conf.adaptiveExecutionEnabled) { - val withoutShuffleChildrenNumPartitions = - childrenIndexes.filterNot(children(_).isInstanceOf[ShuffleExchangeExec]) - .map(children(_).outputPartitioning.numPartitions).toSet - if (withoutShuffleChildrenNumPartitions.nonEmpty && - conf.maxNumPostShufflePartitions > conf.numShufflePartitions) { - math.min(math.max(withoutShuffleChildrenNumPartitions.max, conf.numShufflePartitions), - conf.maxNumPostShufflePartitions) + val maxNumPartition = childrenNumPartitions.max + val withoutShuffleChildrenNumPartitions = + childrenIndexes.filterNot(children(_).isInstanceOf[ShuffleExchangeExec]) + .map(children(_).outputPartitioning.numPartitions).toSet + val expectedChildrenNumPartitions = if (withoutShuffleChildrenNumPartitions.nonEmpty) { + val withoutShuffleMaxNumPartition = withoutShuffleChildrenNumPartitions.max + if (withoutShuffleMaxNumPartition * 1.0 / maxNumPartition >= conf.withoutShuffleSideRatio) { + withoutShuffleMaxNumPartition } else { - childrenNumPartitions.max + maxNumPartition } } else { - childrenNumPartitions.max + maxNumPartition } val targetNumPartitions = requiredNumPartitions.getOrElse(expectedChildrenNumPartitions) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala index 7043b6d396977..667dad640df4e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala @@ -795,4 +795,43 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils { } } + test("Support spark.sql.shuffle.withoutShuffleSideRatio") { + // numBuckets >= spark.sql.shuffle.partitions + withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "5") { + val bucketSpec = Some(BucketSpec(6, Seq("i", "j"), Nil)) + val bucketedTableTestSpecLeft = 
BucketedTableTestSpec(bucketSpec, expectedShuffle = false) + val bucketedTableTestSpecRight = BucketedTableTestSpec(None, expectedShuffle = true) + testBucketing( + bucketedTableTestSpecLeft = bucketedTableTestSpecLeft, + bucketedTableTestSpecRight = bucketedTableTestSpecRight, + joinCondition = joinCondition(Seq("i", "j")) + ) + } + + // numBuckets < spark.sql.shuffle.partitions + withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "5") { + val bucketSpec = Some(BucketSpec(4, Seq("i", "j"), Nil)) + val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpec, expectedShuffle = true) + val bucketedTableTestSpecRight = BucketedTableTestSpec(None, expectedShuffle = true) + testBucketing( + bucketedTableTestSpecLeft = bucketedTableTestSpecLeft, + bucketedTableTestSpecRight = bucketedTableTestSpecRight, + joinCondition = joinCondition(Seq("i", "j")) + ) + } + + // numBuckets < spark.sql.shuffle.partitions and withoutShuffleSideRatio = 0.1 + withSQLConf( + SQLConf.SHUFFLE_PARTITIONS.key -> "5", + SQLConf.SHUFFLE_WITHOUT_SHUFFLE_SIDE_RATIO.key -> "0.1") { + val bucketSpec = Some(BucketSpec(4, Seq("i", "j"), Nil)) + val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpec, expectedShuffle = false) + val bucketedTableTestSpecRight = BucketedTableTestSpec(None, expectedShuffle = true) + testBucketing( + bucketedTableTestSpecLeft = bucketedTableTestSpecLeft, + bucketedTableTestSpecRight = bucketedTableTestSpecRight, + joinCondition = joinCondition(Seq("i", "j")) + ) + } + } } From 73a4943cc21f5fc3a225ce36ebe1d51328abdee4 Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Tue, 12 Nov 2019 23:03:23 +0800 Subject: [PATCH 04/10] Read bucketed tables obeys numShufflePartitions --- .../apache/spark/sql/internal/SQLConf.scala | 10 --- .../exchange/EnsureRequirements.scala | 14 ++--- .../adaptive/AdaptiveQueryExecSuite.scala | 32 +--------- .../spark/sql/sources/BucketedReadSuite.scala | 62 ++++++++----------- 4 files changed, 31 insertions(+), 87 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 9d37f75badb29..a228d9f064a1e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -349,14 +349,6 @@ object SQLConf { .checkValue(_ > 0, "The value of spark.sql.shuffle.partitions must be positive") .createWithDefault(200) - val SHUFFLE_WITHOUT_SHUFFLE_SIDE_RATIO = - buildConf("spark.sql.shuffle.withoutShuffleSideRatio") - .doc("The maximum number of without shuffle partition ratio lower than this config " + - "will not add shuffle exchange for it.") - .doubleConf - .checkValue(ratio => ratio > 0 && ratio <= 1, "The ratio value must be in [0, 1].") - .createWithDefault(1.0) - val ADAPTIVE_EXECUTION_ENABLED = buildConf("spark.sql.adaptive.enabled") .doc("When true, enable adaptive query execution.") .booleanConf @@ -2170,8 +2162,6 @@ class SQLConf extends Serializable with Logging { def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS) - def withoutShuffleSideRatio: Double = getConf(SHUFFLE_WITHOUT_SHUFFLE_SIDE_RATIO) - def adaptiveExecutionEnabled: Boolean = getConf(ADAPTIVE_EXECUTION_ENABLED) def targetPostShuffleInputSize: Long = getConf(SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala 
index 3aeaf6d74b168..f79e08b95025f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
@@ -83,19 +83,13 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
       numPartitionsSet.headOption
     }

-    val maxNumPartition = childrenNumPartitions.max
-    val withoutShuffleChildrenNumPartitions =
+    val nonShuffleChildrenNumPartitions =
       childrenIndexes.filterNot(children(_).isInstanceOf[ShuffleExchangeExec])
         .map(children(_).outputPartitioning.numPartitions).toSet
-    val expectedChildrenNumPartitions = if (withoutShuffleChildrenNumPartitions.nonEmpty) {
-      val withoutShuffleMaxNumPartition = withoutShuffleChildrenNumPartitions.max
-      if (withoutShuffleMaxNumPartition * 1.0 / maxNumPartition >= conf.withoutShuffleSideRatio) {
-        withoutShuffleMaxNumPartition
-      } else {
-        maxNumPartition
-      }
+    val expectedChildrenNumPartitions = if (nonShuffleChildrenNumPartitions.nonEmpty) {
+      math.max(nonShuffleChildrenNumPartitions.max, conf.numShufflePartitions)
     } else {
-      maxNumPartition
+      childrenNumPartitions.max
     }

     val targetNumPartitions = requiredNumPartitions.getOrElse(expectedChildrenNumPartitions)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index eff38012fc618..b140b08950db4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -17,9 +17,9 @@

 package org.apache.spark.sql.execution.adaptive

-import org.apache.spark.sql.{DataFrame, QueryTest}
+import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.execution.{ReusedSubqueryExec, SparkPlan}
-import org.apache.spark.sql.execution.exchange.{Exchange, ShuffleExchangeExec}
+import org.apache.spark.sql.execution.exchange.Exchange
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BuildRight, SortMergeJoinExec}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
@@ -474,32 +474,4 @@ class AdaptiveQueryExecSuite
       }
     }
   }
-
-  test("Enable adaptive execution should not add ShuffleExchange") {
-    def findTopLevelShuffleExchangeExec(df: DataFrame): Seq[ShuffleExchangeExec] = {
-      collect(df.queryExecution.executedPlan) {
-        case s: ShuffleExchangeExec => s
-      }
-    }
-
-    val bucketedTableName = "bucketed_table"
-    withTable(bucketedTableName) {
-      withSQLConf(
-        SQLConf.SHUFFLE_PARTITIONS.key -> "4",
-        SQLConf.SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS.key -> "5",
-        SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
-        spark.range(10).write.bucketBy(4, "id").sortBy("id").saveAsTable(bucketedTableName)
-        val bucketedTable = spark.table(bucketedTableName)
-
-        Seq(false, true).foreach { isAdaptive =>
-          withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> s"$isAdaptive") {
-            assert(
-              findTopLevelShuffleExchangeExec(bucketedTable.join(spark.range(8), "id")).size === 1)
-            assert(
-              findTopLevelShuffleExchangeExec(bucketedTable.join(bucketedTable, "id")).size === 0)
-          }
-        }
-      }
-    }
-  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index 667dad640df4e..a09f546e72a90 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.execution.{DataSourceScanExec, SortExec}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
 import org.apache.spark.sql.execution.datasources.BucketingUtils
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 import org.apache.spark.sql.execution.joins.SortMergeJoinExec
@@ -382,8 +383,16 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
       joined.sort("bucketed_table1.k", "bucketed_table2.k"),
       df1.join(df2, joinCondition(df1, df2), joinType).sort("df1.k", "df2.k"))

-    assert(joined.queryExecution.executedPlan.isInstanceOf[SortMergeJoinExec])
-    val joinOperator = joined.queryExecution.executedPlan.asInstanceOf[SortMergeJoinExec]
+    val joinOperator = if (joined.sqlContext.conf.adaptiveExecutionEnabled) {
+      val executedPlan =
+        joined.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
+      assert(executedPlan.isInstanceOf[SortMergeJoinExec])
+      executedPlan.asInstanceOf[SortMergeJoinExec]
+    } else {
+      val executedPlan = joined.queryExecution.executedPlan
+      assert(executedPlan.isInstanceOf[SortMergeJoinExec])
+      executedPlan.asInstanceOf[SortMergeJoinExec]
+    }

     // check existence of shuffle
     assert(
@@ -795,43 +804,22 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
     }
   }

-  test("Support spark.sql.shuffle.withoutShuffleSideRatio") {
-    // numBuckets >= spark.sql.shuffle.partitions
-    withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "5") {
-      val bucketSpec = Some(BucketSpec(6, Seq("i", "j"), Nil))
-      val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpec, expectedShuffle = false)
-      val bucketedTableTestSpecRight = BucketedTableTestSpec(None, expectedShuffle = true)
-      testBucketing(
-        bucketedTableTestSpecLeft = bucketedTableTestSpecLeft,
-        bucketedTableTestSpecRight = bucketedTableTestSpecRight,
-        joinCondition = joinCondition(Seq("i", "j"))
-      )
-    }
-
-    // numBuckets < spark.sql.shuffle.partitions
-    withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "5") {
-      val bucketSpec = Some(BucketSpec(4, Seq("i", "j"), Nil))
-      val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpec, expectedShuffle = true)
-      val bucketedTableTestSpecRight = BucketedTableTestSpec(None, expectedShuffle = true)
-      testBucketing(
-        bucketedTableTestSpecLeft = bucketedTableTestSpecLeft,
-        bucketedTableTestSpecRight = bucketedTableTestSpecRight,
-        joinCondition = joinCondition(Seq("i", "j"))
-      )
-    }
-
-    // numBuckets < spark.sql.shuffle.partitions and withoutShuffleSideRatio = 0.1
-    withSQLConf(
-      SQLConf.SHUFFLE_PARTITIONS.key -> "5",
-      SQLConf.SHUFFLE_WITHOUT_SHUFFLE_SIDE_RATIO.key -> "0.1") {
-      val bucketSpec = Some(BucketSpec(4, Seq("i", "j"), Nil))
-      val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpec, expectedShuffle = false)
-      val bucketedTableTestSpecRight = BucketedTableTestSpec(None, expectedShuffle = true)
-      testBucketing(
-        bucketedTableTestSpecLeft = bucketedTableTestSpecLeft,
-        bucketedTableTestSpecRight = bucketedTableTestSpecRight,
-        joinCondition = joinCondition(Seq("i", "j"))
-      )
+  test("Read bucketed tables obeys numShufflePartitions") {
+    withSQLConf(
+      SQLConf.SHUFFLE_PARTITIONS.key -> "5",
+      SQLConf.SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS.key -> "7") {
+      val bucketSpec = Some(BucketSpec(6, Seq("i", "j"), Nil))
+      Seq(false, true).foreach { enableAdaptive =>
+        withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> s"$enableAdaptive") {
+          val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpec, expectedShuffle = false)
+          val bucketedTableTestSpecRight = BucketedTableTestSpec(None, expectedShuffle = true)
+          testBucketing(
+            bucketedTableTestSpecLeft = bucketedTableTestSpecLeft,
+            bucketedTableTestSpecRight = bucketedTableTestSpecRight,
+            joinCondition = joinCondition(Seq("i", "j"))
+          )
+        }
+      }
     }
   }
 }

From 59afeab420edf5bf01654279ce242a443b7c6668 Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Wed, 13 Nov 2019 00:30:04 +0800
Subject: [PATCH 05/10] Address comment

---
 .../spark/sql/execution/exchange/EnsureRequirements.scala | 4 ++--
 .../org/apache/spark/sql/sources/BucketedReadSuite.scala  | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
index f79e08b95025f..83eab1afb0ade 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
@@ -84,8 +84,8 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
     }

     val nonShuffleChildrenNumPartitions =
-      childrenIndexes.filterNot(children(_).isInstanceOf[ShuffleExchangeExec])
-        .map(children(_).outputPartitioning.numPartitions).toSet
+      childrenIndexes.map(children).filterNot(_.isInstanceOf[ShuffleExchangeExec])
+        .map(_.outputPartitioning.numPartitions).toSet
     val expectedChildrenNumPartitions = if (nonShuffleChildrenNumPartitions.nonEmpty) {
       math.max(nonShuffleChildrenNumPartitions.max, conf.numShufflePartitions)
     } else {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index a09f546e72a90..a585f215ad681 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -804,7 +804,7 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
     }
   }

-  test("Read bucketed tables obeys numShufflePartitions") {
+  test("SPARK-29655 Read bucketed tables obeys spark.sql.shuffle.partitions") {
     withSQLConf(
       SQLConf.SHUFFLE_PARTITIONS.key -> "5",
       SQLConf.SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS.key -> "7") {

From 8ec1518cf25fa5005f41d19de13e16f786bac45a Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Wed, 13 Nov 2019 00:46:47 +0800
Subject: [PATCH 06/10] Remove toSet

---
 .../spark/sql/execution/exchange/EnsureRequirements.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
index 83eab1afb0ade..5ca6e0f9783bf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
@@ -85,7 +85,7 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {

     val nonShuffleChildrenNumPartitions =
       childrenIndexes.map(children).filterNot(_.isInstanceOf[ShuffleExchangeExec])
-        .map(_.outputPartitioning.numPartitions).toSet
+        .map(_.outputPartitioning.numPartitions)
     val expectedChildrenNumPartitions = if (nonShuffleChildrenNumPartitions.nonEmpty) {
       math.max(nonShuffleChildrenNumPartitions.max, conf.numShufflePartitions)
     } else {

From a50122b0acfe36cd5e397d5b03a58acdc7c05dd3 Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Wed, 13 Nov 2019 13:53:30 +0800
Subject: [PATCH 07/10] Add comment

---
 .../spark/sql/execution/exchange/EnsureRequirements.scala | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
index 5ca6e0f9783bf..6a4e62d15f8f7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
@@ -83,6 +83,9 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
       numPartitionsSet.headOption
     }

+    // Reading bucketed tables always obeys numShufflePartitions because maxNumPostShufflePartitions
+    // is usually much larger than numShufflePartitions,
+    // which causes some bucket map joins to lose efficacy after enabling adaptive execution.
     val nonShuffleChildrenNumPartitions =
       childrenIndexes.map(children).filterNot(_.isInstanceOf[ShuffleExchangeExec])
         .map(_.outputPartitioning.numPartitions)

From 57c50b85de6a23babe11f802b44f62c07df4d14d Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Wed, 13 Nov 2019 19:37:15 +0800
Subject: [PATCH 08/10] Fix test error:
 org.apache.spark.sql.execution.ReduceNumShufflePartitionsSuite

---
 .../spark/sql/execution/exchange/EnsureRequirements.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
index 6a4e62d15f8f7..c1c837e2173cd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
@@ -89,7 +89,8 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
     val nonShuffleChildrenNumPartitions =
       childrenIndexes.map(children).filterNot(_.isInstanceOf[ShuffleExchangeExec])
         .map(_.outputPartitioning.numPartitions)
-    val expectedChildrenNumPartitions = if (nonShuffleChildrenNumPartitions.nonEmpty) {
+    val expectedChildrenNumPartitions = if (nonShuffleChildrenNumPartitions.nonEmpty &&
+      conf.maxNumPostShufflePartitions > conf.numShufflePartitions) {
       math.max(nonShuffleChildrenNumPartitions.max, conf.numShufflePartitions)
     } else {
       childrenNumPartitions.max

From a4f76113b7c7d1ac692ec504f4f624c1f25f828f Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Thu, 14 Nov 2019 16:43:42 +0800
Subject: [PATCH 09/10] Fix test error

---
 .../spark/sql/execution/exchange/EnsureRequirements.scala     | 3 +--
 .../spark/sql/execution/ReduceNumShufflePartitionsSuite.scala | 3 ++-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
index c1c837e2173cd..6a4e62d15f8f7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
@@ -89,8 +89,7 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
     val nonShuffleChildrenNumPartitions =
       childrenIndexes.map(children).filterNot(_.isInstanceOf[ShuffleExchangeExec])
         .map(_.outputPartitioning.numPartitions)
-    val expectedChildrenNumPartitions = if (nonShuffleChildrenNumPartitions.nonEmpty &&
-      conf.maxNumPostShufflePartitions > conf.numShufflePartitions) {
+    val expectedChildrenNumPartitions = if (nonShuffleChildrenNumPartitions.nonEmpty) {
       math.max(nonShuffleChildrenNumPartitions.max, conf.numShufflePartitions)
     } else {
       childrenNumPartitions.max
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ReduceNumShufflePartitionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ReduceNumShufflePartitionsSuite.scala
index 4d408cd8ebd70..21ec1ac9bda08 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ReduceNumShufflePartitionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ReduceNumShufflePartitionsSuite.scala
@@ -274,6 +274,7 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterA
       .setMaster("local[*]")
       .setAppName("test")
       .set(UI_ENABLED, false)
+      .set(SQLConf.SHUFFLE_PARTITIONS.key, "5")
       .set(SQLConf.SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS.key, "5")
       .set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true")
       .set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "-1")
@@ -507,7 +508,7 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterA
       join,
       expectedAnswer.collect())

-    // Then, let's make sure we do not reduce number of ppst shuffle partitions.
+    // Then, let's make sure we do not reduce number of post shuffle partitions.
     val finalPlan = join.queryExecution.executedPlan
       .asInstanceOf[AdaptiveSparkPlanExec].executedPlan
     val shuffleReaders = finalPlan.collect {

From eb4f65fcea31f269f3addaa00a622f2459c5064d Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Thu, 14 Nov 2019 23:48:39 +0800
Subject: [PATCH 10/10] Update comment

---
 .../sql/execution/exchange/EnsureRequirements.scala | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
index 6a4e62d15f8f7..866b382a1d808 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
@@ -83,13 +83,18 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
       numPartitionsSet.headOption
     }

-    // Reading bucketed tables always obeys numShufflePartitions because maxNumPostShufflePartitions
-    // is usually much larger than numShufflePartitions,
-    // which causes some bucket map joins to lose efficacy after enabling adaptive execution.
+    // If there are non-shuffle children that satisfy the required distribution, we have
+    // some tradeoffs when picking the expected number of shuffle partitions:
+    // 1. We should avoid shuffling these children.
+    // 2. We should have a reasonable parallelism.
     val nonShuffleChildrenNumPartitions =
       childrenIndexes.map(children).filterNot(_.isInstanceOf[ShuffleExchangeExec])
         .map(_.outputPartitioning.numPartitions)
     val expectedChildrenNumPartitions = if (nonShuffleChildrenNumPartitions.nonEmpty) {
+      // Here we pick the max number of partitions among these non-shuffle children as the
+      // expected number of shuffle partitions. However, if it's smaller than
+      // `conf.numShufflePartitions`, we pick `conf.numShufflePartitions` as the
+      // expected number of shuffle partitions.
       math.max(nonShuffleChildrenNumPartitions.max, conf.numShufflePartitions)
     } else {
       childrenNumPartitions.max
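
A minimal standalone sketch of the partition-count selection that the final revision above converges on (the object and method names here are illustrative, not part of the patch): when at least one child already satisfies the required distribution without a shuffle, the target partition count is the maximum among those non-shuffle children, floored at spark.sql.shuffle.partitions; otherwise it is the maximum among all children.

object ExpectedPartitionsSketch {
  // nonShufflePartitionCounts: partition counts of children that need no exchange
  // allPartitionCounts: partition counts of all children requiring the distribution
  def expectedNumPartitions(
      nonShufflePartitionCounts: Seq[Int],
      allPartitionCounts: Seq[Int],
      numShufflePartitions: Int): Int = {
    if (nonShufflePartitionCounts.nonEmpty) {
      // Prefer the largest non-shuffle child (e.g. a bucketed scan), but keep
      // at least the configured shuffle parallelism.
      math.max(nonShufflePartitionCounts.max, numShufflePartitions)
    } else {
      // Every child is already a shuffle: just align them on the largest one.
      allPartitionCounts.max
    }
  }

  def main(args: Array[String]): Unit = {
    // A 6-bucket table joined to a plain table under spark.sql.shuffle.partitions=5:
    // the bucketed side keeps its 6 partitions, so only the other side is shuffled.
    println(expectedNumPartitions(Seq(6), Seq(6, 5), numShufflePartitions = 5)) // 6
    // No non-shuffle children: fall back to the max among the shuffled children.
    println(expectedNumPartitions(Seq.empty, Seq(5, 5), numShufflePartitions = 5)) // 5
  }
}

This mirrors the expectation in the BucketedReadSuite test above: with numBuckets = 6 and spark.sql.shuffle.partitions = 5, the bucketed side reports expectedShuffle = false while the non-bucketed side is shuffled to match it.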