[SPARK-29655][SQL] Read bucketed tables obeys spark.sql.shuffle.partitions #26409
Conversation
Test build #113307 has finished for PR 26409 at commit
Test build #113316 has finished for PR 26409 at commit
cc @cloud-fan
Can you give a step-by-step explanation of how this happens? I can't get it from either the PR description or the code changes.
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
This is not an AQE problem. IIUC we will add an extra shuffle when […]. Can we make […]?
And this is not a trivial problem. Imagine that a table has only 1 bucket; it may be faster to add an extra shuffle to increase the parallelism.
Let's think about the expected behavior. This is truly a cost problem, but we should figure out a simple rule, as estimating cost is not realistic in Spark for now.
Imagine we join a bucketed table with a big table. We can avoid the shuffle if the number of buckets is reasonable as the number of partitions to shuffle the big table. It's hard to define "reasonable" here, and I think it's OK to take the value of `spark.sql.shuffle.partitions`.
// maxNumPostShufflePartitions is usually larger than numShufflePartitions,
// which causes some bucket map join lose efficacy after enabling adaptive execution.
// Please see SPARK-29655 for more details.
val expectedChildrenNumPartitions = if (conf.adaptiveExecutionEnabled) {
The logic is convoluted here, as we've already added the shuffles with `maxNumPostShufflePartitions` and now we need to revert that. Can we make the implementation clearer? Basically we are picking the `targetNumPartitions` as follows (see the sketch after this list):
1. If there are no non-shuffle children, keep the previous behavior.
2. For non-shuffle children, get the max num partitions among them.
   2.1. If the max num partitions is larger than `conf.numShufflePartitions`, pick it as `targetNumPartitions`.
   2.2. Otherwise, pick `conf.numShufflePartitions` as `targetNumPartitions`.
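A minimal Scala sketch of that picking rule, reusing the `children`, `childrenIndexes`, `childrenNumPartitions` and `requiredNumPartitions` values from the surrounding `EnsureRequirements` code (this also matches the shape the PR eventually adopts):

```scala
// Gather the partition counts of children that are not ShuffleExchangeExec.
val nonShuffleChildrenNumPartitions =
  childrenIndexes.map(children).filterNot(_.isInstanceOf[ShuffleExchangeExec])
    .map(_.outputPartitioning.numPartitions)
val targetNumPartitions = if (nonShuffleChildrenNumPartitions.isEmpty) {
  // 1. No non-shuffle children: keep the previous behavior.
  requiredNumPartitions.getOrElse(childrenNumPartitions.max)
} else {
  // 2. Pick the max among the non-shuffle children, but never go below
  //    conf.numShufflePartitions (this covers both 2.1 and 2.2).
  math.max(nonShuffleChildrenNumPartitions.max, conf.numShufflePartitions)
}
```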
Yes. This implementation could make it clearer.
What do you think of another implementation: 45109c7? It may be more reasonable, but personally I do not like adding a new conf.
it's indeed clearer to have a dedicated config, but I'd like to have a better solution that can make decisions automatically instead of relying on configs. We can think about it later.
val targetNumPartitions = requiredNumPartitions.getOrElse(childrenNumPartitions.max)
val nonShuffleChildrenNumPartitions =
  childrenIndexes.filterNot(children(_).isInstanceOf[ShuffleExchangeExec])
    .map(children(_).outputPartitioning.numPartitions).toSet
nit: `childrenIndexes.map(children).filterNot(_.isInstanceOf[ShuffleExchangeExec])...`
@@ -795,4 +804,22 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
    }
  }

  test("Read bucketed tables obeys numShufflePartitions") {
Let's add the JIRA ID to the test name, as this is a regression (see the example below).
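For example, following the JIRA-ID convention used elsewhere in Spark's test suites, something along these lines:

```scala
test("SPARK-29655 Read bucketed tables obeys numShufflePartitions") {
  // ... existing test body ...
}
```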
Test build #113634 has started for PR 26409 at commit
val targetNumPartitions = requiredNumPartitions.getOrElse(childrenNumPartitions.max)
val nonShuffleChildrenNumPartitions =
  childrenIndexes.map(children).filterNot(_.isInstanceOf[ShuffleExchangeExec])
    .map(_.outputPartitioning.numPartitions).toSet
nit: in practice there will be 2 children at most, so `toSet` is not really needed.
Test build #113638 has started for PR 26409 at commit
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
Test build #113670 has finished for PR 26409 at commit
retest this please
Test build #113679 has finished for PR 26409 at commit
childrenIndexes.map(children).filterNot(_.isInstanceOf[ShuffleExchangeExec])
  .map(_.outputPartitioning.numPartitions)
val expectedChildrenNumPartitions = if (nonShuffleChildrenNumPartitions.nonEmpty &&
    conf.maxNumPostShufflePartitions > conf.numShufflePartitions) {
@cloud-fan @viirya I added `conf.maxNumPostShufflePartitions > conf.numShufflePartitions` to fix these test failures:
- org.apache.spark.sql.execution.ReduceNumShufflePartitionsSuite.determining the number of reducers: plan already partitioned(minNumPostShufflePartitions: 5)
- org.apache.spark.sql.execution.ReduceNumShufflePartitionsSuite.determining the number of reducers: plan already partitioned
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/113679/testReport/
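As a rough standalone illustration of the guard (the helper name and the numbers here are made up for illustration, not the PR's actual code or the suite's settings):

```scala
// When AQE's upper bound cannot exceed the base shuffle parallelism,
// skip the bucket-aware logic and keep the previous behavior.
def pickExpectedNumPartitions(
    maxNumPostShufflePartitions: Int,
    numShufflePartitions: Int,
    nonShuffleChildrenNumPartitions: Seq[Int],
    childrenNumPartitionsMax: Int): Int = {
  if (nonShuffleChildrenNumPartitions.nonEmpty &&
      maxNumPostShufflePartitions > numShufflePartitions) {
    // e.g. max = 500, base = 200: cap the target using the non-shuffle
    // (bucketed) children so they are not re-shuffled.
    math.max(nonShuffleChildrenNumPartitions.max, numShufflePartitions)
  } else {
    // e.g. max = 5, base = 200: keep the previous behavior, which is what
    // the two ReduceNumShufflePartitionsSuite tests above rely on.
    childrenNumPartitionsMax
  }
}
```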
Test build #113701 has finished for PR 26409 at commit
Test build #113768 has finished for PR 26409 at commit
val targetNumPartitions = requiredNumPartitions.getOrElse(childrenNumPartitions.max)
// Read bucketed tables always obeys numShufflePartitions because maxNumPostShufflePartitions
// is usually much larger than numShufflePartitions,
// which causes some bucket map join lose efficacy after enabling adaptive execution.
The comment is hard to understand. How about:

If there are non-shuffle children that satisfy the required distribution, we have some tradeoffs when picking the expected number of shuffle partitions:
1. we should avoid shuffling these children;
2. we should have a reasonable parallelism.

Here we pick the max number of partitions among these non-shuffle children as the expected number of shuffle partitions. However, if it's smaller than `conf.numShufflePartitions`, we pick `conf.numShufflePartitions` as the expected number of shuffle partitions.
Done.
Test build #113795 has finished for PR 26409 at commit
thanks, merging to master!
What changes were proposed in this pull request?
In order to avoid frequently changing the value of `spark.sql.adaptive.shuffle.maxNumPostShufflePartitions`, we usually set it much larger than `spark.sql.shuffle.partitions` after enabling adaptive execution, which causes some bucket map joins to lose efficacy and adds more `ShuffleExchange`s.
How to reproduce:
(the two reproduction snippets being compared are omitted)
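A minimal sketch of this kind of reproduction (hypothetical table names and example config values; the PR's original snippets are not reproduced here):

```scala
// Make AQE's upper bound much larger than the base shuffle parallelism.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.shuffle.maxNumPostShufflePartitions", "500")
spark.conf.set("spark.sql.shuffle.partitions", "200")

// A table bucketed to match spark.sql.shuffle.partitions, and a plain one.
spark.range(1000).write.bucketBy(200, "id").saveAsTable("bucketed_t")
spark.range(1000).write.saveAsTable("plain_t")

// Before this fix, the bucketed side could get an extra ShuffleExchange to
// match the larger post-shuffle partition number; with the fix, its 200
// bucket partitions are respected.
spark.table("bucketed_t").join(spark.table("plain_t"), "id").explain()
```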
This PR makes reading bucketed tables always obey `spark.sql.shuffle.partitions`, even when adaptive execution is enabled and `spark.sql.adaptive.shuffle.maxNumPostShufflePartitions` is set, to avoid adding more `ShuffleExchange`s.
Why are the changes needed?
Do not degrade performance after enabling adaptive execution.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Unit test.