
[SPARK-22465][FOLLOWUP] Update the number of partitions of default partitioner when defaultParallelism is set #20091

Closed
wants to merge 2 commits

Conversation

jiangxb1987
Contributor

@jiangxb1987 jiangxb1987 commented Dec 27, 2017

What changes were proposed in this pull request?

#20002 proposed a way to safety-check the default partitioner; however, if spark.default.parallelism is set, defaultParallelism can still be smaller than the proper number of partitions for the upstream RDDs. This PR extends the approach to cover the case where spark.default.parallelism is set.

The requirements where the PR helps are:

  • Max partitioner is not eligible since it is at least an order smaller, and
  • User has explicitly set 'spark.default.parallelism', and
  • Value of 'spark.default.parallelism' is lower than max partitioner
    • Since max partitioner was discarded due to being at least an order smaller, default parallelism is worse - even though user specified.

In all remaining cases, the change is a no-op. A minimal sketch of the scenario follows below.
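For illustration, a minimal sketch of the scenario this targets (the local master, names, and sizes are hypothetical, not taken from the PR's tests):

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("local[4]")
  .setAppName("default-partitioner-sketch")
  .set("spark.default.parallelism", "4") // explicitly set, and smaller than the existing partitioner

val sc = new SparkContext(conf)

// 1000 upstream partitions dwarf the 10-partition partitioner, so the max
// partitioner fails the order-of-magnitude eligibility check from #20002.
val big = sc.parallelize(1 to 1000, 1000).map(x => (x, x))
val small = sc.parallelize(1 to 10).map(x => (x, x)).partitionBy(new HashPartitioner(10))

// Without this PR, the cogroup falls back to defaultParallelism = 4, which is
// even smaller than the discarded 10-partition partitioner. With this PR, the
// existing HashPartitioner(10) is reused.
val grouped = big.cogroup(small)
println(grouped.partitioner.map(_.numPartitions)) // Some(10) with this PR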

How was this patch tested?

Add corresponding test cases in PairRDDFunctionsSuite and PartitioningSuite.

@jiangxb1987 jiangxb1987 changed the title [SPARK-22465] Update the number of partitions of default partitioner when defaultParallelism is set [SPARK-22465][FOLLOWUP] Update the number of partitions of default partitioner when defaultParallelism is set Dec 27, 2017
@jiangxb1987
Contributor Author

cc @sujithjay @mridulm @cloud-fan PTAL

@SparkQA

SparkQA commented Dec 27, 2017

Test build #85436 has finished for PR 20091 at commit 4751463.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mridulm
Contributor

mridulm commented Dec 27, 2017

This changes the existing behavior of Spark - the expectation is for RDDs without a partitioner to use spark.default.parallelism for shuffle; when one or more RDDs do have a partitioner, it was more an implementation detail which one gets picked (in some cases the 'first', in others the 'biggest', etc.).

Consider the case of a cogroup of two filtered RDDs - users set parallelism (either explicitly, or implicitly in the case of YARN) to handle these cases.

@jiangxb1987
Contributor Author

The major concern is that spark.default.parallelism is usually set to a relatively small value, so when the safety check fails, the value of defaultParallelism can even be smaller than the number of partitions of the existing partitioner; this is the regression case I want to fix in this PR.

A further issue is that we should rethink whether to rely on defaultParallelism to determine the numPartitions of the default partitioner at all, or whether the number of partitions should be determined entirely dynamically from the upstream RDDs. The current same-as-defaultParallelism approach is really prone to cause OOM during the shuffle stage.

@mridulm
Contributor

mridulm commented Dec 28, 2017

@jiangxb1987 I am not disagreeing with your hypothesis that default parallelism might not be optimal in all cases within an application (for example, when different RDDs in an application have widely varying cardinalities).

Since spark.default.parallelism is an exposed interface which applications depend on, changing the semantics here would be a regression in terms of functionality and would break an exposed contract in Spark (#20002 explicitly applied to a documented case where the default does not apply).

This is why we have the option of explicitly overriding the number of partitions when the default does not work well.

@jiangxb1987
Contributor Author

jiangxb1987 commented Dec 28, 2017

@mridulm Actually you have a good point that we should be extremely careful in making changes related to an exposed interface, so I'll narrow the scope of this PR to:

  • If the safety check fails and spark.default.parallelism is set, and the value of defaultParallelism is smaller than the number of partitions of the existing partitioner, then we should still use the existing partitioner.

Does this make more sense?

@mridulm
Contributor

mridulm commented Dec 29, 2017

@jiangxb1987 I am not sure I followed that completely.
defaultParallelism will always be present - either via explicit user specification of spark.default.parallelism in the Spark conf, or implicitly obtained based on the scheduler backend (both of which are detailed in the spark.default.parallelism contract).

@jiangxb1987
Contributor Author

jiangxb1987 commented Dec 29, 2017

As now used in defaultPartitioner(), defaultParallelism only takes effect when spark.default.parallelism is explicitly set. Previously, before #20002, if any upstream RDD had an existing partitioner, we would not create a new partitioner using defaultParallelism. After that change, we may create a new partitioner whose number of partitions is defaultParallelism when the safety check fails and spark.default.parallelism is explicitly set - but defaultParallelism can be smaller than the numPartitions of the existing partitioner, so the new partitioner should itself fail the safety check. I'm proposing that in the regression case described above we should still use the existing partitioner, instead of creating a new partitioner with fewer partitions. IMO this also follows the documented spark.default.parallelism contract.

@mridulm
Contributor

mridulm commented Dec 29, 2017

Can you code this up? I am really not able to parse that block of text :-)

@jiangxb1987
Contributor Author

Sure, will do after I arrive in SF next Monday. Thanks! :)

@SparkQA

SparkQA commented Jan 17, 2018

Test build #86202 has finished for PR 20091 at commit 62088ca.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jiangxb1987
Contributor Author

// If the existing max partitioner is an eligible one, or its partitions number is larger
// than the default number of partitions, use the existing partitioner.
if (hasMaxPartitioner.nonEmpty && (isEligiblePartitioner(hasMaxPartitioner.get, rdds) ||
    defaultNumPartitions < hasMaxPartitioner.get.getNumPartitions)) {
Contributor

This is the core change. I think it makes sense as it fixes a regression in #20002.

If the partitioner is not eligible, but its numPartitions is larger than the default one, we should still pick this partitioner instead of creating a new one.

Contributor

There are multiple cases here.

a) spark.default.parallelism is not set by the user.
For this case, the PR is a noop.

b) maxPartitions is at least an order higher than the max partitioner.

b.1) If spark.default.parallelism is not set, the PR is a noop.

b.2) spark.default.parallelism is explicitly set by the user.

This is the change in behavior that has been introduced - rely on the user-specified value instead of trying to infer it when the inferred value is off by at least an order.

If users were setting suboptimal values for "spark.default.parallelism", then there will be a change in behavior - though I would argue this is the expected behavior given the documentation of 'spark.default.parallelism'.

Contributor

It depends on how you define "default". In this case, if we can benefit from reusing an existing partitioner, we should pick that partitioner. If we want to respect spark.default.parallelism strictly, we should not reuse partitioner at all.

For this particular case, picking the existing partitioner is obviously the better choice, and it was the behavior before #20002, so I'm +1 on this change.

Contributor

@mridulm mridulm Jan 19, 2018

[Edited, hopefully, for clarity]

It depends on how you define "default".

I don't see an ambiguity here - am I missing something?
To rephrase my point - this proposed PR has an impact only if the user has explicitly set 'spark.default.parallelism' - else it is a noop.
If this is not the case (other than the desired behavior of SPARK-22465), I might be missing something; do let me know!

What is the concern here? That users have set incorrect values for spark.default.parallelism?

If we want to respect spark.default.parallelism strictly, we should not reuse partitioner at all.

I agree with you - we should not have - except that ship sailed a long, long time back; this has been the behavior in Spark since at least 0.5, and I don't have context before that.
Historically, default parallelism was added later - "largest partitioner if set, or largest partition count when no partitioner is set" was the original behavior. When default parallelism was introduced, the behavior was continued, probably (I guess) for backward compatibility.

#20002 surgically fixed only the case when the inferred partition count was off by at least an order.
When it is off by an order - don't rely on the largest partitioner; it is not useful due to OOMs.
In this case, if the user has explicitly specified spark.default.parallelism, rely on the user-provided value - else preserve the existing behavior of picking the largest partition count. (A sketch of the pre-#20002 behavior follows below.)
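For reference, a paraphrase (not the verbatim Spark source) of the historical behavior described above - reuse the largest existing partitioner unconditionally, and consult spark.default.parallelism only when no partitioner exists:

import org.apache.spark.{HashPartitioner, Partitioner}
import org.apache.spark.rdd.RDD

// Sketch of Partitioner.defaultPartitioner before #20002.
def defaultPartitionerOld(rdd: RDD[_], others: RDD[_]*): Partitioner = {
  val rdds = Seq(rdd) ++ others
  val hasPartitioner = rdds.filter(_.partitioner.exists(_.numPartitions > 0))
  if (hasPartitioner.nonEmpty) {
    // Always reuse the largest existing partitioner, however small it is.
    hasPartitioner.maxBy(_.partitions.length).partitioner.get
  } else if (rdd.context.conf.contains("spark.default.parallelism")) {
    new HashPartitioner(rdd.context.defaultParallelism)
  } else {
    new HashPartitioner(rdds.map(_.partitions.length).max)
  }
}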

Contributor

I think we all agree that reusing a partitioner is existing behavior and that we should not stick strictly to spark.default.parallelism here.

#20002 is good as it fixes a bad case where reusing a partitioner slows down the query. And this PR surgically fixes one regression introduced by #20002: even if the existing partitioner is not eligible (has very few partitions), it is still better than falling back to default parallelism.
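For context, the condition under discussion lives in Partitioner.defaultPartitioner; after this PR the method reads roughly as follows (lightly paraphrased from the Spark source):

def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
  val rdds = Seq(rdd) ++ others
  val hasPartitioner = rdds.filter(_.partitioner.exists(_.numPartitions > 0))

  val hasMaxPartitioner: Option[RDD[_]] =
    if (hasPartitioner.nonEmpty) Some(hasPartitioner.maxBy(_.partitions.length)) else None

  // spark.default.parallelism feeds the default size only when explicitly set;
  // otherwise the largest upstream partition count is used.
  val defaultNumPartitions =
    if (rdd.context.conf.contains("spark.default.parallelism")) rdd.context.defaultParallelism
    else rdds.map(_.partitions.length).max

  // Reuse the existing max partitioner if it is eligible, or if it still has
  // more partitions than the default - the regression case fixed by this PR.
  if (hasMaxPartitioner.nonEmpty && (isEligiblePartitioner(hasMaxPartitioner.get, rdds) ||
      defaultNumPartitions < hasMaxPartitioner.get.getNumPartitions)) {
    hasMaxPartitioner.get.partitioner.get
  } else {
    new HashPartitioner(defaultNumPartitions)
  }
}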

@mridulm
Contributor

mridulm commented Jan 19, 2018

Thanks for coding it up @jiangxb1987 !

So if I understand it correctly, the requirements where the PR helps are:

  • Max partitioner is not eligible since it is at least an order smaller, and
  • User has explicitly set 'spark.default.parallelism', and
  • Value of 'spark.default.parallelism' is lower than max partitioner
    • Since max partitioner was discarded due to being at least an order smaller, default parallelism is worse - even though user specified.

Does it impact any other use case or flow? I want to make sure I am not missing anything.
If it is strictly this, then I agree that the PR makes sense. It is a fairly suboptimal situation which we are hopefully not worsening - even if we are ignoring the user-specified value (by relying on pre-existing behavior exposed by Spark :-) )

@jiangxb1987
Contributor Author

@mridulm Great write-up! Yeah, it's exactly as you described, and I've copied it to the PR description.

@mridulm
Contributor

mridulm commented Jan 19, 2018

@jiangxb1987 Thanks for clarifying, looks good to me - I will merge it later today evening (assuming someone else does not get to it first :) )

@jiangxb1987
Contributor Author

@mridulm Thank you!

* If any of the RDDs already has a partitioner, and the partitioner is an eligible one (with a
* partitions number that is not less than the max number of upstream partitions by an order of
* magnitude), or the number of partitions is larger than the default one, we'll choose the
* existing partitioner.
Contributor

We should rephrase this for clarity.
How about
"When available, we choose the partitioner from rdds with maximum number of partitions. If this partitioner is eligible (number of partitions within an order of maximum number of partitions in rdds), or has partition number higher than default partitions number - we use this partitioner"

.partitionBy(new HashPartitioner(100))
val rdd4 = sc.parallelize(Array((1, 2), (2, 3), (2, 4), (3, 4)))
.partitionBy(new HashPartitioner(9))
val rdd5 = sc.parallelize((1 to 10).map(x => (x, x)), 11)
Contributor

Can we add a case where a partitioner is not used and the default (from spark.default.parallelism) gets used?
For example, something like the following pseudo-code:

val rdd6 = sc.parallelize(Array((1, 2), (2, 3), (2, 4), (3, 4))).partitionBy(new HashPartitioner(3))

...
assert(Partitioner.defaultPartitioner(rdd1, rdd6).numPartitions ===
  sc.conf.get("spark.default.parallelism").toInt)

}
}

test("cogroup between multiple RDD when defaultParallelism is set with huge number of " +
Contributor

nit: "set; with huge number of partitions in upstream RDDs"

@mridulm
Contributor

mridulm commented Jan 22, 2018

Thanks @jiangxb1987 for the great work!
I will merge this in when the build successfully completes.

@SparkQA

SparkQA commented Jan 23, 2018

Test build #86496 has finished for PR 20091 at commit ccdc0e6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

asfgit pushed a commit that referenced this pull request Jan 23, 2018
…rtitioner when defaultParallelism is set

## What changes were proposed in this pull request?

#20002 proposed a way to safety-check the default partitioner; however, if `spark.default.parallelism` is set, defaultParallelism can still be smaller than the proper number of partitions for the upstream RDDs. This PR extends the approach to cover the case where `spark.default.parallelism` is set.

The requirements where the PR helps are:
- Max partitioner is not eligible since it is at least an order smaller, and
- User has explicitly set 'spark.default.parallelism', and
- Value of 'spark.default.parallelism' is lower than max partitioner
- Since max partitioner was discarded due to being at least an order smaller, default parallelism is worse - even though user specified.

In all remaining cases, the change is a no-op.

## How was this patch tested?

Add corresponding test cases in `PairRDDFunctionsSuite` and `PartitioningSuite`.

Author: Xingbo Jiang <[email protected]>

Closes #20091 from jiangxb1987/partitioner.

(cherry picked from commit 96cb60b)
Signed-off-by: Mridul Muralidharan <[email protected]>
@asfgit asfgit closed this in 96cb60b Jan 23, 2018
cloud-fan pushed a commit that referenced this pull request Jan 23, 2019
…mPartitions is equal to maxPartitioner.numPartitions

## What changes were proposed in this pull request?

Followup of #20091. We could also use the existing partitioner when defaultNumPartitions is equal to the maxPartitioner's numPartitions.

## How was this patch tested?

Existing tests.

Closes #23581 from Ngone51/dev-use-existing-partitioner-when-defaultNumPartitions-equalTo-MaxPartitioner#-numPartitions.

Authored-by: Ngone51 <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
jackylee-ch pushed a commit to jackylee-ch/spark that referenced this pull request Feb 18, 2019
…mPartitions is equal to maxPartitioner.numPartitions

## What changes were proposed in this pull request?

Followup of apache#20091. We could also use the existing partitioner when defaultNumPartitions is equal to the maxPartitioner's numPartitions.

## How was this patch tested?

Existing tests.

Closes apache#23581 from Ngone51/dev-use-existing-partitioner-when-defaultNumPartitions-equalTo-MaxPartitioner#-numPartitions.

Authored-by: Ngone51 <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>