[SPARK-22465][FOLLOWUP] Update the number of partitions of default partitioner when defaultParallelism is set #20091
Conversation
cc @sujithjay @mridulm @cloud-fan PTAL
Test build #85436 has finished for PR 20091 at commit
This changes the existing behavior of Spark - the expectation is for RDDs without a partitioner to use spark.default.parallelism. Consider the case of a cogroup of two filtered RDDs - users set parallelism (either explicitly, or implicitly in the case of YARN) to handle these cases.
The major concern is that this changes the existing behavior. A further issue is that we should rethink whether we should rely on defaultParallelism here at all.
@jiangxb1987 I am not disagreeing with your hypothesis that default parallelism might not be optimal in all cases within an application (for example, when different RDDs in an application have widely varying cardinalities). Since spark.default.parallelism is an exposed interface, which applications depend on, changing the semantics here would be a regression in terms of functionality and would break an exposed contract in Spark (#20002 explicitly applied to a documented case where the default does not apply). This is why we have the option of explicitly overriding the number of partitions when the default does not work well.
@mridulm Actually you have a good point that we should be extremely careful in making changes related to an exposed interface, so I'll narrow down the scope of this PR to:
Does this make more sense?
@jiangxb1987 I am not sure I followed that completely.
As now used in the function
Can you code this up? I am really not able to parse that block of text :-)
Sure, will do after I arrive in SF next Monday. Thanks! :)
Force-pushed from 4751463 to 62088ca
Test build #86202 has finished for PR 20091 at commit
// If the existing max partitioner is an eligible one, or its partitions number is larger
// than the default number of partitions, use the existing partitioner.
if (hasMaxPartitioner.nonEmpty && (isEligiblePartitioner(hasMaxPartitioner.get, rdds) ||
    defaultNumPartitions < hasMaxPartitioner.get.getNumPartitions)) {
This is the core change. I think it makes sense as it fixes a regression in #20002.
If the partitioner is not eligible, but its numPartitions is larger than the default one, we should still pick this partitioner instead of creating a new one.
There are multiple cases here.
a) spark.default.parallelism is not set by the user.
For this case, the PR is a noop.
b) maxPartitions is at least an order of magnitude higher than the max partitioner.
b.1) If spark.default.parallelism is not set, the PR is a noop.
b.2) spark.default.parallelism is explicitly set by the user.
This is the change in behavior being introduced - rely on the user-specified value instead of trying to infer it when the inferred value is off by at least an order of magnitude.
If users were setting suboptimal values for "spark.default.parallelism", there will be a change in behavior - though I would argue this is the expected behavior given the documentation of 'spark.default.parallelism'.
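To make the cases above concrete, here is a minimal self-contained sketch of the decision flow. The names and the n * 10 eligibility approximation are illustrative, not the actual code; the real logic lives in Partitioner.defaultPartitioner.

object DefaultPartitionerSketch {
  // Decide how many partitions the default partitioner should use.
  // maxPartitionerSize: partitions of the largest existing partitioner, if any.
  // maxPartitions: max number of partitions across the upstream RDDs.
  // defaultParallelism: Some(n) iff spark.default.parallelism is set.
  def choosePartitions(
      maxPartitionerSize: Option[Int],
      maxPartitions: Int,
      defaultParallelism: Option[Int]): Int = {
    val defaultNumPartitions = defaultParallelism.getOrElse(maxPartitions)
    maxPartitionerSize match {
      // Reuse the existing partitioner when it is within an order of
      // magnitude of maxPartitions (eligible), or when it beats the default.
      case Some(n) if maxPartitions <= n * 10 || defaultNumPartitions < n => n
      // Otherwise fall back to the default: the user-set parallelism when
      // present (case b.2), else maxPartitions (cases a and b.1).
      case _ => defaultNumPartitions
    }
  }
}

Under this sketch, cases (a) and (b.1) pass defaultParallelism = None, so the fallback is maxPartitions and the PR is indeed a noop; case (b.2) passes Some(userValue).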
It depends on how you define "default". In this case, if we can benefit from reusing an existing partitioner, we should pick that partitioner. If we want to respect spark.default.parallelism strictly, we should not reuse a partitioner at all.
For this particular case, picking the existing partitioner is obviously the better choice, and it was the behavior before #20002, so I'm +1 on this change.
[Edited, hopefully, for clarity]
It depends on how you define "default".
I don't see an ambiguity here - am I missing something?
To rephrase my point - this proposed PR has an impact only if the user has explicitly set 'spark.default.parallelism' - else it is a noop.
If this is not the case (other than the desired behavior of SPARK-22465), I might be missing something; do let me know!
What is the concern here? That users have set incorrect values for spark.default.parallelism?
If we want to respect spark.default.parallelism strictly, we should not reuse partitioner at all.
I agree with you - we should not have - except that ship sailed a long time back; this has been the behavior in Spark since at least 0.5 - I don't have context before that.
Historically, default parallelism was added later - "largest partitioner if set, else largest number of partitions when no partitioner is set" was the behavior. When default parallelism was introduced, the behavior was continued, probably (I guess) for backward compatibility.
#20002 surgically fixed only the case where the inferred partition count was off by at least an order of magnitude.
When it is off by an order of magnitude - don't rely on the largest partitioner; it is not useful due to OOMs.
In this case, if the user has explicitly specified spark.default.parallelism, rely on the user-provided value - else preserve the existing behavior of picking the largest number of partitions.
I think we all agree that reusing a partitioner is the existing behavior and that we should not stick to spark.default.parallelism here.
#20002 is good as it fixes a bad case where reusing a partitioner slows down the query. And this PR surgically fixes one regression introduced by #20002: even if the existing partitioner is not eligible (has very few partitions), it can still be better than falling back to default parallelism.
Thanks for coding it up @jiangxb1987! So if I understand it correctly, the requirements the PR helps with are:
Does it impact any other use case or flow? I want to make sure I am not missing anything.
@mridulm Great write-up! Yeah, it's exactly as you described, and I've copied it to the PR description.
@jiangxb1987 Thanks for clarifying, looks good to me - I will merge it later this evening (assuming someone else does not merge it first :) )
@mridulm Thank you!
* If any of the RDDs already has a partitioner, and the partitioner is an eligible one (with a
* partitions number that is not less than the max number of upstream partitions by an order of
* magnitude), or the number of partitions is larger than the default one, we'll choose the
* existing partitioner.
We should rephrase this for clarity.
How about:
"When available, we choose the partitioner from the rdds with the maximum number of partitions. If this partitioner is eligible (its number of partitions is within an order of magnitude of the maximum number of partitions in rdds), or has a partition count higher than the default number of partitions, we use this partitioner."
.partitionBy(new HashPartitioner(100))
val rdd4 = sc.parallelize(Array((1, 2), (2, 3), (2, 4), (3, 4)))
  .partitionBy(new HashPartitioner(9))
val rdd5 = sc.parallelize((1 to 10).map(x => (x, x)), 11)
Can we add a case where a partitioner is not used and the default (from spark.default.parallelism) gets used?
For example, something like the following pseudo:
val rdd6 = sc.parallelize(Array((1, 2), (2, 3), (2, 4), (3, 4))).partitionBy(new HashPartitioner(3))
...
Partitioner.defaultPartitioner(rdd1, rdd6).numPartitions == sc.conf.get("spark.default.parallelism").toInt
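A runnable sketch of such a test, assuming local mode with spark.default.parallelism set; the partition counts (150 vs. 3) are hypothetical, chosen so the existing partitioner is more than an order of magnitude smaller and therefore not eligible:

import org.apache.spark.{HashPartitioner, Partitioner, SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("local")
  .setAppName("default-partitioner-fallback")
  .set("spark.default.parallelism", "4")
val sc = new SparkContext(conf)

// 150 upstream partitions vs. a 3-partition partitioner: not eligible.
val rdd1 = sc.parallelize((1 to 1000).map(x => (x, x)), 150)
val rdd6 = sc.parallelize(Array((1, 2), (2, 3), (2, 4), (3, 4)))
  .partitionBy(new HashPartitioner(3))

// The default partitioner should fall back to spark.default.parallelism (4),
// since 4 is not smaller than rdd6's 3 partitions.
assert(Partitioner.defaultPartitioner(rdd1, rdd6).numPartitions ==
  sc.getConf.get("spark.default.parallelism").toInt)

sc.stop()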
test("cogroup between multiple RDD when defaultParallelism is set with huge number of " + |
nit: "set; with huge number of partitions in upstream RDDs"
Thanks @jiangxb1987 for the great work!
Test build #86496 has finished for PR 20091 at commit
…rtitioner when defaultParallelism is set

## What changes were proposed in this pull request?

#20002 proposed a way to safety-check the default partitioner; however, if `spark.default.parallelism` is set, the defaultParallelism could still be smaller than the proper number of partitions for the upstream RDDs. This PR extends the approach to address the condition when `spark.default.parallelism` is set. The requirements the PR helps with are:
- Max partitioner is not eligible since it is at least an order of magnitude smaller, and
- User has explicitly set 'spark.default.parallelism', and
- Value of 'spark.default.parallelism' is lower than max partitioner
- Since max partitioner was discarded due to being at least an order of magnitude smaller, default parallelism is worse - even though user specified.

In all other cases, the changes are a no-op.

## How was this patch tested?

Added corresponding test cases in `PairRDDFunctionsSuite` and `PartitioningSuite`.

Author: Xingbo Jiang <[email protected]>

Closes #20091 from jiangxb1987/partitioner.

(cherry picked from commit 96cb60b)
Signed-off-by: Mridul Muralidharan <[email protected]>
…mPartitions is equal to maxPartitioner.numPartitions

## What changes were proposed in this pull request?

Followup of #20091. We could also use the existing partitioner when defaultNumPartitions is equal to the maxPartitioner's numPartitions.

## How was this patch tested?

Existing tests.

Closes #23581 from Ngone51/dev-use-existing-partitioner-when-defaultNumPartitions-equalTo-MaxPartitioner#-numPartitions.

Authored-by: Ngone51 <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
What changes were proposed in this pull request?
#20002 proposed a way to safety-check the default partitioner; however, if spark.default.parallelism is set, the defaultParallelism could still be smaller than the proper number of partitions for the upstream RDDs. This PR extends the approach to address the condition when spark.default.parallelism is set.
The requirements the PR helps with are:
- Max partitioner is not eligible since it is at least an order of magnitude smaller, and
- User has explicitly set 'spark.default.parallelism', and
- Value of 'spark.default.parallelism' is lower than max partitioner
- Since max partitioner was discarded due to being at least an order of magnitude smaller, default parallelism is worse - even though user specified.
In all other cases, the changes are a no-op.
How was this patch tested?
Added corresponding test cases in PairRDDFunctionsSuite and PartitioningSuite.
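To make the requirement list above concrete, a hypothetical end-to-end illustration (same local-mode assumptions as the earlier test sketch; the partition counts are made up):

import org.apache.spark.{HashPartitioner, Partitioner, SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("local")
  .setAppName("reuse-ineligible-partitioner")
  .set("spark.default.parallelism", "4")
val sc = new SparkContext(conf)

// 1000 upstream partitions vs. a 9-partition partitioner: not eligible
// (more than an order of magnitude apart)...
val big = sc.parallelize((1 to 1000).map(x => (x, x)), 1000)
val small = sc.parallelize((1 to 10).map(x => (x, x)))
  .partitionBy(new HashPartitioner(9))

// ...but spark.default.parallelism (4) is lower still, so falling back to it
// would be strictly worse: the existing 9-partition partitioner is reused.
assert(Partitioner.defaultPartitioner(big, small).numPartitions == 9)

sc.stop()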