
[SPARK-22465][Core] Add a safety-check to RDD defaultPartitioner #20002

Closed
wants to merge 12 commits

Conversation

sujithjay

What changes were proposed in this pull request?

In choosing a Partitioner for a cogroup-like operation between a number of RDDs, the default behaviour was: if some of the RDDs already have a partitioner, choose the one among them with the maximum number of partitions.

This behaviour, in some cases, could hit the 2G limit (SPARK-6235). To illustrate one such scenario, consider two RDDs:
rdd1: with smaller data and a smaller number of partitions, along with a Partitioner.
rdd2: with much larger data and a larger number of partitions, without a Partitioner.

The cogroup of these two RDDs could hit the 2G limit, as a larger amount of data is shuffled into a smaller number of partitions.
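For a concrete picture of that scenario, here is a hypothetical illustration, assuming an existing SparkContext `sc`; the names and sizes are made up and not taken from the PR:

```scala
import org.apache.spark.HashPartitioner

// rdd1: small data, few partitions, and an existing partitioner.
val rdd1 = sc.parallelize(1 to 1000, 10).map(x => (x, x))
  .partitionBy(new HashPartitioner(10))

// rdd2: much larger data, many partitions, no partitioner.
val rdd2 = sc.parallelize(1 to 100000000, 1000).map(x => (x, x))

// With the old default behaviour, the cogroup adopts rdd1's 10-partition
// HashPartitioner, so rdd2's much larger data is shuffled into only 10
// partitions and a single shuffle block can grow past the 2G limit.
val grouped = rdd1.cogroup(rdd2)
```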

This PR introduces a safety-check wherein the existing Partitioner is chosen only if either of the following conditions is met (a minimal sketch follows the list):

  1. the number of partitions of the RDD associated with the Partitioner is greater than or equal to the max number of upstream partitions; or
  2. the number of partitions of the RDD associated with the Partitioner is less than, but within a single order of magnitude of, the max number of upstream partitions.
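A minimal sketch of that check, using the names from the PR's diff shown later in the review (the final merged version drops the `.floor` that comes up in the discussion below). A single log10 comparison covers both conditions, since a partition count greater than or equal to the max gives a non-positive difference:

```scala
import scala.math.log10
import org.apache.spark.rdd.RDD

// Inside object Partitioner (sketch):
private def isEligiblePartitioner(hasMaxPartitioner: RDD[_], rdds: Seq[RDD[_]]): Boolean = {
  val maxPartitions = rdds.map(_.partitions.length).max
  // Eligible when the existing partitioner's partition count is within a single
  // order of magnitude of the largest upstream partition count.
  log10(maxPartitions) - log10(hasMaxPartitioner.getNumPartitions) < 1
}
```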

How was this patch tested?

Unit tests in PartitioningSuite and PairRDDFunctionsSuite

sujithjay added 2 commits December 16, 2017 17:46
…that ignores existing Partitioners, if they are more than a single order of magnitude smaller than the max number of upstream partitions
@sujithjay
Author

SparkR test failure seems unrelated to this PR. Any ideas what's wrong?

@sujithjay
Author

sujithjay commented Dec 16, 2017

Hi @HyukjinKwon , can you please help me with these SparkR test failures? They seem unrelated to me.

@sujithjay
Author

sujithjay commented Dec 16, 2017

cc: @tgravescs @codlife Could you please review this PR?

@HyukjinKwon
Member

HyukjinKwon commented Dec 16, 2017

Yup, the AppVeyor test failure seems unrelated. I took a quick look and it seems the failure is related to the latest testthat (1.0.2 -> 2.0.0). Will take a look at this separately.

@HyukjinKwon
Member

HyukjinKwon commented Dec 17, 2017

It was fixed in #20003. Rebasing should make the test pass.

@sujithjay
Author

Thank you, @HyukjinKwon . The tests passed after rebasing.

@sujithjay
Author

@tgravescs, could you please take a look when you have some time?

@tgravescs
Contributor

Jenkins, test this please

@SparkQA

SparkQA commented Dec 20, 2017

Test build #85192 has finished for PR 20002 at commit 4b2dcac.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tgravescs
Contributor

ok to test

@tgravescs
Contributor

@sujithjay thanks for working on this. I will review, but I'm not sure I'll get to it for a bit; I'm out for the holidays and not sure I can give this the time it needs for a full review today.

@sujithjay
Author

@tgravescs Thank you for keeping me informed. I look forward to receiving your review. Happy holidays!

@@ -57,7 +60,8 @@ object Partitioner {
   def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
     val rdds = (Seq(rdd) ++ others)
     val hasPartitioner = rdds.filter(_.partitioner.exists(_.numPartitions > 0))
-    if (hasPartitioner.nonEmpty) {
+    if (hasPartitioner.nonEmpty
+        && isEligiblePartitioner(hasPartitioner.maxBy(_.partitions.length), rdds)) {
Contributor

hasPartitioner.maxBy(_.partitions.length) is used repeatedly; pull that into a variable?

  */
  private def isEligiblePartitioner(hasMaxPartitioner: RDD[_], rdds: Seq[RDD[_]]): Boolean = {
    val maxPartitions = rdds.map(_.partitions.length).max
    log10(maxPartitions).floor - log10(hasMaxPartitioner.getNumPartitions).floor < 1
Contributor

Why .floor?
It causes an unnecessary discontinuity, imo; for example, (9, 11) will not satisfy the check, but it should.

Author

Hi @mridulm , I suppose I was trying to ensure a strict order-of-magnitude check; but, I agree it leads to a discontinuity. I will change this, and the corresponding test cases.
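To make the discontinuity concrete, a small worked example (not part of the original review exchange):

```scala
import scala.math.log10

// With .floor: log10(11).floor - log10(9).floor = 1.0 - 0.0 = 1.0, which is not < 1,
// so a 9-partition partitioner would be rejected against 11 upstream partitions.
val withFloor = log10(11).floor - log10(9).floor   // 1.0 -> check fails

// Without .floor: log10(11) - log10(9) ≈ 0.087 < 1, so the check passes as expected.
val withoutFloor = log10(11) - log10(9)            // ≈ 0.087 -> check passes
```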

@SparkQA

SparkQA commented Dec 20, 2017

Test build #85200 has finished for PR 20002 at commit ca6aa08.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 23, 2017

Test build #85342 has finished for PR 20002 at commit 961e384.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 23, 2017

Test build #85344 has finished for PR 20002 at commit 8b35452.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 23, 2017

Test build #85346 has finished for PR 20002 at commit 4729d80.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@sujithjay
Author

Scala style tests are failing on a file 'SparkHiveExample.scala', which is unrelated to this PR. I will rebase onto master and try again.

@HyukjinKwon
Member

@sujithjay, I opened a hotfix. It should be fine soon (maybe after a few hours).

@sujithjay
Author

Thank you, @HyukjinKwon . I will try again after the hotfix is merged to master.

@SparkQA

SparkQA commented Dec 23, 2017

Test build #85348 has finished for PR 20002 at commit 6623227.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -21,6 +21,8 @@ import java.io.{IOException, ObjectInputStream, ObjectOutputStream}

 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
+import scala.language.existentials
Contributor

Curious, why was this required?

Author

sujithjay commented Dec 23, 2017

Without this import, there was a compiler warning:

Warning:(66, 29) inferred existential type Option[org.apache.spark.rdd.RDD[_$2]]( forSome { type _$2 }), which cannot be expressed by wildcards,  should be enabled
by making the implicit value scala.language.existentials visible.
This can be achieved by adding the import clause 'import scala.language.existentials'
or by setting the compiler option -language:existentials.
See the Scaladoc for value scala.language.existentials for a discussion
why the feature should be explicitly enabled.

The build on Jenkins failed because of this warning.

Contributor

If we explicitly set the type, is it still required? For example, with val hasMaxPartitioner: Option[RDD[_]] = ...?
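A sketch of that suggestion, reusing the `hasPartitioner` value from the diff above; whether the explicit annotation actually removes the need for the language import would still have to be confirmed against the compiler:

```scala
// Annotate the type explicitly so the compiler does not have to infer an
// existential type for the element picked out of hasPartitioner.
val hasMaxPartitioner: Option[RDD[_]] = if (hasPartitioner.nonEmpty) {
  Some(hasPartitioner.maxBy(_.partitions.length))
} else {
  None
}
```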

@SparkQA

SparkQA commented Dec 23, 2017

Test build #85349 has finished for PR 20002 at commit 3dd1ad8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

assert(partitioner1.numPartitions == rdd1.getNumPartitions)
assert(partitioner2.numPartitions == rdd3.getNumPartitions)
assert(partitioner3.numPartitions == rdd3.getNumPartitions)
assert(partitioner4.numPartitions == rdd3.getNumPartitions)
Contributor

Can you add a test case such that numPartitions 9 vs 11 is not treated as an order-of-magnitude jump (to prevent future changes that end up breaking this).
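One possible shape for such a test, assuming the shared SparkContext `sc` available in PartitioningSuite; the test name and data below are illustrative, not from the PR:

```scala
test("defaultPartitioner: 9 vs 11 partitions is not an order-of-magnitude jump") {
  val rdd1 = sc.parallelize(1 to 1000, 11)                  // 11 partitions, no partitioner
  val rdd2 = sc.parallelize(1 to 100, 9).map(x => (x, x))
    .partitionBy(new HashPartitioner(9))                    // 9 partitions, has a partitioner
  val partitioner = Partitioner.defaultPartitioner(rdd2, rdd1)
  // The existing 9-partition partitioner should still be chosen.
  assert(partitioner.numPartitions == rdd2.getNumPartitions)
}
```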

@mridulm
Contributor

mridulm commented Dec 24, 2017

I left a couple of comments, @sujithjay; overall it is looking good, thanks for working on it!
We can merge it once they are addressed.

@sujithjay
Author

Thank you, @mridulm for reviewing this PR. I have addressed the latest review comments.

@SparkQA

SparkQA commented Dec 24, 2017

Test build #85354 has finished for PR 20002 at commit 3b08951.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

retest this please

@mridulm
Contributor

mridulm commented Dec 24, 2017

Looks good @sujithjay ... once we have a successful build, I will merge it in.

@SparkQA

SparkQA commented Dec 24, 2017

Test build #85357 has finished for PR 20002 at commit 3b08951.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@sujithjay
Author

sujithjay commented Dec 24, 2017

The failed unit test (at HistoryServerSuite.scala:350) seems unrelated to this PR.

@HyukjinKwon
Member

retest this please

@SparkQA

SparkQA commented Dec 24, 2017

Test build #85360 has finished for PR 20002 at commit 3b08951.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

sujithjay changed the title from [SPARK-22465][Core][WIP] Add a safety-check to RDD defaultPartitioner to [SPARK-22465][Core] Add a safety-check to RDD defaultPartitioner on Dec 24, 2017
@mridulm
Contributor

mridulm commented Dec 24, 2017

Merged, thanks for fixing this, @sujithjay!

asfgit closed this in 0bf1a74 on Dec 24, 2017
ghost pushed a commit to dbtsai/spark that referenced this pull request Jan 23, 2018
…rtitioner when defaultParallelism is set

asfgit pushed a commit that referenced this pull request Jan 23, 2018
…rtitioner when defaultParallelism is set

## What changes were proposed in this pull request?

#20002 proposed a way to safety-check the default partitioner; however, if `spark.default.parallelism` is set, the defaultParallelism could still be smaller than the proper number of partitions for the upstream RDDs. This PR tries to extend the approach to address the case where `spark.default.parallelism` is set.

The requirements where the PR helps are:
- the max partitioner is not eligible, since it is at least an order of magnitude smaller, and
- the user has explicitly set `spark.default.parallelism`, and
- the value of `spark.default.parallelism` is lower than the max partitioner's number of partitions;
- since the max partitioner was discarded for being at least an order of magnitude smaller, the default parallelism would be even worse, even though it was user-specified.

In all other cases, the changes should be a no-op.
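A sketch of the extended decision described above, following the names from #20002's diff; this is only an illustration (it uses the public `getConf`), and the authoritative code is in #20091:

```scala
// Pick the fallback partition count: the user-set default parallelism if present,
// otherwise the largest upstream partition count.
val defaultNumPartitions = if (rdd.context.getConf.contains("spark.default.parallelism")) {
  rdd.context.defaultParallelism
} else {
  rdds.map(_.partitions.length).max
}

// Keep the existing max partitioner if it is eligible, or if it still offers more
// partitions than the fallback would; otherwise create a new HashPartitioner.
if (hasMaxPartitioner.nonEmpty &&
    (isEligiblePartitioner(hasMaxPartitioner.get, rdds) ||
     defaultNumPartitions < hasMaxPartitioner.get.getNumPartitions)) {
  hasMaxPartitioner.get.partitioner.get
} else {
  new HashPartitioner(defaultNumPartitions)
}
```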

## How was this patch tested?

Add corresponding test cases in `PairRDDFunctionsSuite` and `PartitioningSuite`.

Author: Xingbo Jiang <[email protected]>

Closes #20091 from jiangxb1987/partitioner.

(cherry picked from commit 96cb60b)
Signed-off-by: Mridul Muralidharan <[email protected]>