[SPARK-22465][Core] Add a safety-check to RDD defaultPartitioner #20002
Conversation
It adds a check that ignores an existing Partitioner if it has more than a single order of magnitude fewer partitions than the maximum number of partitions across the upstream RDDs.
The SparkR test failure seems unrelated to this PR. Any ideas what's wrong?
Hi @HyukjinKwon, can you please help me with these SparkR test failures? They seem unrelated to me.
cc: @tgravescs @codlife Could you please review this PR?
Yup, the AppVeyor test failure seems unrelated. I took a quick look, and it seems related to the latest
It was fixed in #20003. Rebasing should make the tests pass.
Thank you, @HyukjinKwon. The tests passed after rebasing.
@tgravescs, could you please take a look when you have some time?
Jenkins, test this please |
Test build #85192 has finished for PR 20002 at commit
ok to test |
@sujithjay, thanks for working on this. I will review, but I'm not sure I will get to it for a bit; I'm out for the holidays and not sure I can give this the time it needs for a full review today.
@tgravescs, thank you for keeping me informed. I look forward to receiving your review. Happy holidays!
@@ -57,7 +60,8 @@ object Partitioner {
   def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
     val rdds = (Seq(rdd) ++ others)
     val hasPartitioner = rdds.filter(_.partitioner.exists(_.numPartitions > 0))
-    if (hasPartitioner.nonEmpty) {
+    if (hasPartitioner.nonEmpty
+        && isEligiblePartitioner(hasPartitioner.maxBy(_.partitions.length), rdds)) {
`hasPartitioner.maxBy(_.partitions.length)` is used repeatedly; pull that into a variable?
   */
  private def isEligiblePartitioner(hasMaxPartitioner: RDD[_], rdds: Seq[RDD[_]]): Boolean = {
    val maxPartitions = rdds.map(_.partitions.length).max
    log10(maxPartitions).floor - log10(hasMaxPartitioner.getNumPartitions).floor < 1
Why `.floor`? It causes unnecessary discontinuity, imo; for example, (9, 11) will not satisfy the check, but it should.
Hi @mridulm, I suppose I was trying to ensure a strict order-of-magnitude check, but I agree it leads to a discontinuity. I will change this and the corresponding test cases.
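For illustration, here is a minimal, Spark-free sketch of the check with and without `.floor` (the function names are invented for this example, not the PR's code):

```scala
import scala.math.log10

// Eligibility with .floor: a strict order-of-magnitude comparison.
def eligibleWithFloor(partitionerParts: Int, maxUpstreamParts: Int): Boolean =
  log10(maxUpstreamParts).floor - log10(partitionerParts).floor < 1

// Eligibility without .floor: the continuous comparison suggested above.
def eligibleWithoutFloor(partitionerParts: Int, maxUpstreamParts: Int): Boolean =
  log10(maxUpstreamParts) - log10(partitionerParts) < 1

// (9, 11): floor(log10(11)) = 1 and floor(log10(9)) = 0, so the floored
// check rejects the pair, while log10(11) - log10(9) ≈ 0.09 accepts it.
```

The floored version only accepts pairs whose decimal magnitudes match exactly, which is what makes (9, 11) fail despite the counts being nearly equal.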
Test build #85200 has finished for PR 20002 at commit
Test build #85342 has finished for PR 20002 at commit
Test build #85344 has finished for PR 20002 at commit
Test build #85346 has finished for PR 20002 at commit
Scala style tests are failing on a file 'SparkHiveExample.scala', which is unrelated to this PR. I will rebase to master and try again.
@sujithjay, I opened a hotfix. It should be fine soon (maybe after a few hours).
Thank you, @HyukjinKwon. I will try again after the hotfix is merged to master.
Test build #85348 has finished for PR 20002 at commit
@@ -21,6 +21,8 @@ import java.io.{IOException, ObjectInputStream, ObjectOutputStream}

 import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.language.existentials
Curious, why was this required?
Without this import, there was a compiler warning:

    Warning:(66, 29) inferred existential type Option[org.apache.spark.rdd.RDD[_$2]]( forSome { type _$2 }), which cannot be expressed by wildcards, should be enabled
    by making the implicit value scala.language.existentials visible.
    This can be achieved by adding the import clause 'import scala.language.existentials'
    or by setting the compiler option -language:existentials.
    See the Scaladoc for value scala.language.existentials for a discussion
    why the feature should be explicitly enabled.

The build on Jenkins failed because of this warning.
If we explicitly set the type, is it still required? For example, with `val hasMaxPartitioner: Option[RDD[_]] = ...`?
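A self-contained illustration of that suggestion (the `Box` class stands in for `RDD` and is invented for this example):

```scala
// With an invariant type parameter, the inferred least upper bound of
// Box[Int] and Box[String] is an existential type, which triggers a feature
// warning unless scala.language.existentials is imported.
class Box[T](val value: T)

val flag = true

// Ascribing the wildcard type explicitly sidesteps the existential
// inference, so the feature import is no longer needed:
val boxed: Option[Box[_]] = if (flag) Some(new Box(1)) else Some(new Box("a"))
```

The explicit `Option[Box[_]]` annotation is the analogue of the reviewer's `Option[RDD[_]]` suggestion.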
Test build #85349 has finished for PR 20002 at commit
assert(partitioner1.numPartitions == rdd1.getNumPartitions)
assert(partitioner2.numPartitions == rdd3.getNumPartitions)
assert(partitioner3.numPartitions == rdd3.getNumPartitions)
assert(partitioner4.numPartitions == rdd3.getNumPartitions)
Can you add a test case such that numPartitions 9 vs 11 is not treated as an order-of-magnitude jump (to prevent future changes from breaking this)?
I left a couple of comments, @sujithjay. Overall it is looking good; thanks for working on it!
…xplicitly mention type of existential type
Thank you, @mridulm, for reviewing this PR. I have addressed the latest review comments.
Test build #85354 has finished for PR 20002 at commit
retest this please
Looks good @sujithjay ... once we have a successful build, I will merge it in.
Test build #85357 has finished for PR 20002 at commit
The failed unit test (at HistoryServerSuite.scala:350) seems unrelated to this PR.
retest this please
Test build #85360 has finished for PR 20002 at commit
Merged, thanks for fixing this @sujithjay!
…rtitioner when defaultParallelism is set

## What changes were proposed in this pull request?

#20002 proposed a way to safety-check the default partitioner; however, if `spark.default.parallelism` is set, the defaultParallelism could still be smaller than the proper number of partitions for the upstream RDDs. This PR extends the approach to address the condition when `spark.default.parallelism` is set. The requirements where the PR helps are:

- The max partitioner is not eligible, since it is at least an order of magnitude smaller, and
- the user has explicitly set `spark.default.parallelism`, and
- the value of `spark.default.parallelism` is lower than the max partitioner's partition count. Since the max partitioner was discarded for being at least an order of magnitude smaller, the default parallelism is even worse, even though user-specified.

In all other cases, the changes are a no-op.

## How was this patch tested?

Corresponding test cases were added in `PairRDDFunctionsSuite` and `PartitioningSuite`.

Author: Xingbo Jiang <[email protected]>
Closes #20091 from jiangxb1987/partitioner.
(cherry picked from commit 96cb60b)
Signed-off-by: Mridul Muralidharan <[email protected]>
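A minimal, Spark-free sketch of the reuse decision this follow-up describes (the function and parameter names are invented here; the actual Spark code differs in detail):

```scala
import scala.math.log10

// Reuse an existing partitioner if it is within one order of magnitude of
// the maximum upstream partition count, or if it already provides at least
// as many partitions as the (possibly user-set) default parallelism.
def reuseExistingPartitioner(existingParts: Int,
                             maxUpstreamParts: Int,
                             defaultParallelism: Int): Boolean =
  log10(maxUpstreamParts) - log10(existingParts) < 1 ||
    existingParts >= defaultParallelism
```

Under this reading, a small user-set `spark.default.parallelism` cannot force a shuffle into fewer partitions than an existing partitioner already provides, while an ineligible partitioner is still discarded when the default parallelism is larger.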
What changes were proposed in this pull request?
In choosing a Partitioner for a cogroup-like operation between a number of RDDs, the default behaviour was: if some of the RDDs already have a partitioner, choose the one amongst them with the maximum number of partitions.
This behaviour could, in some cases, hit the 2G limit (SPARK-6235). To illustrate one such scenario, consider two RDDs:
rdd1: with smaller data and a smaller number of partitions, along with a Partitioner.
rdd2: with much larger data and a larger number of partitions, without a Partitioner.
The cogroup of these two RDDs could hit the 2G limit, as a large amount of data is shuffled into a small number of partitions.
This PR introduces a safety-check wherein the Partitioner is chosen only if either of the following conditions is met:
How was this patch tested?
Unit tests in PartitioningSuite and PairRDDFunctionsSuite
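Putting the pieces together, here is a Spark-free sketch of the safety-check described above (`FakeRDD` and `choosePartitions` are invented for illustration, not Spark classes):

```scala
import scala.math.log10

// Stand-in for an RDD in this sketch: a partition count plus whether the
// RDD already carries a partitioner.
case class FakeRDD(numPartitions: Int, hasPartitioner: Boolean)

// Choose the partition count for a cogroup-like operation: reuse the largest
// existing partitioner only when it is within one order of magnitude of the
// maximum upstream partition count; otherwise fall back to that maximum.
def choosePartitions(rdds: Seq[FakeRDD]): Int = {
  val maxUpstream = rdds.map(_.numPartitions).max
  val largestExisting = rdds.filter(_.hasPartitioner)
                            .sortBy(-_.numPartitions).headOption
  largestExisting match {
    case Some(p) if log10(maxUpstream) - log10(p.numPartitions) < 1 =>
      p.numPartitions
    case _ => maxUpstream
  }
}
```

In the 2G-limit scenario above, a small partitioned RDD (say 10 partitions) cogrouped with a large unpartitioned one (say 10000 partitions) no longer shuffles all the data into 10 partitions, while near-equal counts such as 9 vs 11 still reuse the existing partitioner.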