
[SPARK-29655][SQL] Read bucketed tables obeys spark.sql.shuffle.partitions #26409

Closed
wants to merge 10 commits into from

Conversation

wangyum (Member) commented Nov 6, 2019

What changes were proposed in this pull request?

In order to avoid frequently changing the value of spark.sql.adaptive.shuffle.maxNumPostShufflePartitions, we usually set it much larger than spark.sql.shuffle.partitions after enabling adaptive execution. This causes some bucket map joins to lose efficacy and adds more ShuffleExchange nodes.

How to reproduce:

val bucketedTableName = "bucketed_table"
spark.range(10000).write.bucketBy(500, "id").sortBy("id").mode(org.apache.spark.sql.SaveMode.Overwrite).saveAsTable(bucketedTableName)
val bucketedTable = spark.table(bucketedTableName)
val df = spark.range(8)

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
// Spark 2.4. spark.sql.adaptive.enabled=false
// We set spark.sql.shuffle.partitions <= 500 every time based on our data in this case.
spark.conf.set("spark.sql.shuffle.partitions", 500)
bucketedTable.join(df, "id").explain()
// Since 3.0. We enabled adaptive execution and set spark.sql.adaptive.shuffle.maxNumPostShufflePartitions to a larger value to cover more cases.
spark.conf.set("spark.sql.adaptive.enabled", true)
spark.conf.set("spark.sql.adaptive.shuffle.maxNumPostShufflePartitions", 1000)
bucketedTable.join(df, "id").explain()
scala> bucketedTable.join(df, "id").explain()
== Physical Plan ==
*(4) Project [id#5L]
+- *(4) SortMergeJoin [id#5L], [id#7L], Inner
   :- *(1) Sort [id#5L ASC NULLS FIRST], false, 0
   :  +- *(1) Project [id#5L]
   :     +- *(1) Filter isnotnull(id#5L)
   :        +- *(1) ColumnarToRow
   :           +- FileScan parquet default.bucketed_table[id#5L] Batched: true, DataFilters: [isnotnull(id#5L)], Format: Parquet, Location: InMemoryFileIndex[file:/root/opensource/apache-spark/spark-3.0.0-SNAPSHOT-bin-3.2.0/spark-warehou..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 500 out of 500
   +- *(3) Sort [id#7L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(id#7L, 500), true, [id=#49]
         +- *(2) Range (0, 8, step=1, splits=16)

vs

scala> bucketedTable.join(df, "id").explain()
== Physical Plan ==
AdaptiveSparkPlan(isFinalPlan=false)
+- Project [id#5L]
   +- SortMergeJoin [id#5L], [id#7L], Inner
      :- Sort [id#5L ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(id#5L, 1000), true, [id=#93]
      :     +- Project [id#5L]
      :        +- Filter isnotnull(id#5L)
      :           +- FileScan parquet default.bucketed_table[id#5L] Batched: true, DataFilters: [isnotnull(id#5L)], Format: Parquet, Location: InMemoryFileIndex[file:/root/opensource/apache-spark/spark-3.0.0-SNAPSHOT-bin-3.2.0/spark-warehou..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 500 out of 500
      +- Sort [id#7L ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(id#7L, 1000), true, [id=#92]
            +- Range (0, 8, step=1, splits=16)

This PR makes reading bucketed tables always obey spark.sql.shuffle.partitions, even when adaptive execution is enabled and spark.sql.adaptive.shuffle.maxNumPostShufflePartitions is set, to avoid adding more ShuffleExchange nodes.

Why are the changes needed?

To avoid degrading performance after enabling adaptive execution.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Unit test.

SparkQA commented Nov 6, 2019

Test build #113307 has finished for PR 26409 at commit 16e8f5b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Nov 6, 2019

Test build #113316 has finished for PR 26409 at commit 1ba9edf.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

wangyum (Member Author) commented Nov 6, 2019

cc @cloud-fan

cloud-fan (Contributor):

Can you give a step-by-step explanation of how this happens? I can't get it from either the PR description or the code changes.

cloud-fan (Contributor):

This is not an AQE problem. IIUC, we will add an extra shuffle when spark.sql.shuffle.partitions is larger than the number of buckets.

Can we make EnsureRequirements smarter and fix the underlying problem?

cloud-fan (Contributor):

And this is not a trivial problem. Imagine a table with only 1 bucket: it may be faster to add an extra shuffle to increase the parallelism.

cloud-fan (Contributor) commented Nov 11, 2019

Let's think about the expected behavior. This is truly a cost problem, but we should figure out a simple rule, as estimating cost is not realistic in Spark for now.

  1. We should not blindly avoid shuffles: very few buckets can lead to poor performance because parallelism is low.
  2. We should avoid shuffles if the number of buckets is reasonable.

Imagine we join a bucketed table with a big table. We can avoid shuffling the bucketed table if the number of buckets is reasonable, using it as the number of partitions to shuffle the big table. It's hard to define "reasonable" here, and I think it's OK to take the value of spark.sql.shuffle.partitions as the reasonable number; see the sketch below.
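
A minimal Scala sketch of this rule (the helper name and example values are illustrative, not from the patch):

// Use the bucket count as the shuffle partition number when it already gives
// at least as much parallelism as spark.sql.shuffle.partitions; otherwise
// fall back to the config value and shuffle both sides.
def expectedShufflePartitions(numBuckets: Int, numShufflePartitions: Int): Int =
  math.max(numBuckets, numShufflePartitions)

expectedShufflePartitions(500, 200) // 500: shuffle only the big table, keep the bucketing
expectedShufflePartitions(1, 200)   // 200: too few buckets, shuffle for parallelism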

// maxNumPostShufflePartitions is usually larger than numShufflePartitions,
// which causes some bucket map join lose efficacy after enabling adaptive execution.
// Please see SPARK-29655 for more details.
val expectedChildrenNumPartitions = if (conf.adaptiveExecutionEnabled) {
Contributor:

The logic is convoluted here, as we've already added the shuffles with maxNumPostShufflePartitions and then need to revert them.

Can we make the implementation clearer? Basically, we are picking the targetNumPartitions as follows (see the sketch after this list):

  1. If there are no non-shuffle children, keep the previous behavior.
  2. For non-shuffle children, get the max num partitions among them.
    2.1. If the max num partitions is larger than conf.numShufflePartitions, pick it as targetNumPartitions.
    2.2. Otherwise, pick conf.numShufflePartitions as targetNumPartitions.
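
A Scala sketch of that selection, assuming the names (children, childrenIndexes, requiredNumPartitions, childrenNumPartitions) from the surrounding EnsureRequirements code in this PR; it follows the list above, not necessarily the exact final patch:

// 1. Collect the partition counts of children that are not ShuffleExchangeExec.
val nonShuffleChildrenNumPartitions =
  childrenIndexes.map(children).filterNot(_.isInstanceOf[ShuffleExchangeExec])
    .map(_.outputPartitioning.numPartitions)
// 2. If every child is a shuffle, keep the previous behavior; otherwise take
//    the max among the non-shuffle children (2.1), bounded below by
//    conf.numShufflePartitions (2.2).
val expectedChildrenNumPartitions = if (nonShuffleChildrenNumPartitions.isEmpty) {
  childrenNumPartitions.max
} else {
  math.max(nonShuffleChildrenNumPartitions.max, conf.numShufflePartitions)
}
val targetNumPartitions = requiredNumPartitions.getOrElse(expectedChildrenNumPartitions)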

Member Author:

Yes. This implementation could make it clearer.

Member Author:

What do you think of another implementation: 45109c7? It may be more reasonable, but personally, I do not like adding a new conf.

Contributor:

It's indeed clearer to have a dedicated config, but I'd like a better solution that makes decisions automatically instead of relying on configs. We can think about it later.

@wangyum wangyum changed the title [SPARK-29655][SQL] Enable adaptive execution should not add more ShuffleExchange [SPARK-29655][SQL] Read bucketed tables obeys spark.sql.shuffle.partitions Nov 12, 2019
val targetNumPartitions = requiredNumPartitions.getOrElse(childrenNumPartitions.max)
val nonShuffleChildrenNumPartitions =
  childrenIndexes.filterNot(children(_).isInstanceOf[ShuffleExchangeExec])
    .map(children(_).outputPartitioning.numPartitions).toSet
cloud-fan (Contributor) commented Nov 12, 2019:

nit: childrenIndexes.map(children).filterNot(_.isInstanceOf[ShuffleExchangeExec])...

@@ -795,4 +804,22 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
}
}

test("Read bucketed tables obeys numShufflePartitions") {
Contributor:

Let's add the JIRA ID in the test name, as this is a regression.

SparkQA commented Nov 12, 2019

Test build #113634 has started for PR 26409 at commit 73a4943.

val targetNumPartitions = requiredNumPartitions.getOrElse(childrenNumPartitions.max)
val nonShuffleChildrenNumPartitions =
  childrenIndexes.map(children).filterNot(_.isInstanceOf[ShuffleExchangeExec])
    .map(_.outputPartitioning.numPartitions).toSet
Contributor:

nit: in practice there will be at most 2 children, so toSet is not really needed.

SparkQA commented Nov 12, 2019

Test build #113638 has started for PR 26409 at commit 8ec1518.

SparkQA commented Nov 13, 2019

Test build #113670 has finished for PR 26409 at commit a50122b.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

wangyum (Member Author) commented Nov 13, 2019

retest this please

SparkQA commented Nov 13, 2019

Test build #113679 has finished for PR 26409 at commit a50122b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

childrenIndexes.map(children).filterNot(_.isInstanceOf[ShuffleExchangeExec])
  .map(_.outputPartitioning.numPartitions)
val expectedChildrenNumPartitions = if (nonShuffleChildrenNumPartitions.nonEmpty &&
    conf.maxNumPostShufflePartitions > conf.numShufflePartitions) {
Member Author:

@cloud-fan @viirya I added conf.maxNumPostShufflePartitions > conf.numShufflePartitions to fix these test failures:

org.apache.spark.sql.execution.ReduceNumShufflePartitionsSuite.determining the number of reducers: plan already partitioned(minNumPostShufflePartitions: 5)
org.apache.spark.sql.execution.ReduceNumShufflePartitionsSuite.determining the number of reducers: plan already partitioned

https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/113679/testReport/

SparkQA commented Nov 13, 2019

Test build #113701 has finished for PR 26409 at commit 57c50b8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Nov 14, 2019

Test build #113768 has finished for PR 26409 at commit a4f7611.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

val targetNumPartitions = requiredNumPartitions.getOrElse(childrenNumPartitions.max)
// Read bucketed tables always obeys numShufflePartitions because maxNumPostShufflePartitions
// is usually much larger than numShufflePartitions,
// which causes some bucket map join lose efficacy after enabling adaptive execution.
cloud-fan (Contributor) commented Nov 14, 2019:

The comment is hard to understand. How about

If there are non-shuffle children that satisfy the required distribution, we have some tradeoffs
when picking the expected number of shuffle partitions:
1. we should avoid shuffling these children
2. we should have a reasonable parallelism

Here we pick the max number of partitions among these non-shuffle children as the expected number
of shuffle partitions. However, if it's smaller than `conf.numShufflePartitions`, we pick 
`conf.numShufflePartitions` as the expected number of shuffle partitions.

Member Author:

Done.

SparkQA commented Nov 14, 2019

Test build #113795 has finished for PR 26409 at commit eb4f65f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

cloud-fan (Contributor):

thanks, merging to master!

@cloud-fan cloud-fan closed this in 4f10e54 Nov 15, 2019
@wangyum wangyum deleted the SPARK-29655 branch November 15, 2019 08:10