Skip to content

Commit

Permalink
[SPARK-22465][Core][WIP] Changes after code review
Browse files Browse the repository at this point in the history
  • Loading branch information
sujithjay committed Dec 23, 2017
1 parent ca6aa08 commit 961e384
Showing 1 changed file with 14 additions and 5 deletions.
19 changes: 14 additions & 5 deletions core/src/main/scala/org/apache/spark/Partitioner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,15 @@ object Partitioner {
def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
val rdds = (Seq(rdd) ++ others)
val hasPartitioner = rdds.filter(_.partitioner.exists(_.numPartitions > 0))
if (hasPartitioner.nonEmpty
&& isEligiblePartitioner(hasPartitioner.maxBy(_.partitions.length), rdds)) {
hasPartitioner.maxBy(_.partitions.length).partitioner.get

val hasMaxPartitioner = if(hasPartitioner.nonEmpty){
Some(hasPartitioner.maxBy(_.partitions.length))
} else {
None
}

if(isEligiblePartitioner(hasMaxPartitioner, rdds)) {
hasMaxPartitioner.get.partitioner.get
} else {
if (rdd.context.conf.contains("spark.default.parallelism")) {
new HashPartitioner(rdd.context.defaultParallelism)
Expand All @@ -77,9 +83,12 @@ object Partitioner {
* less than and within a single order of magnitude of the max number of upstream partitions;
* otherwise, returns false
*/
private def isEligiblePartitioner(hasMaxPartitioner: RDD[_], rdds: Seq[RDD[_]]): Boolean = {
private def isEligiblePartitioner(hasMaxPartitioner: Option[RDD[_]], rdds: Seq[RDD[_]]): Boolean = {
if(hasMaxPartitioner.isEmpty){
return false
}
val maxPartitions = rdds.map(_.partitions.length).max
log10(maxPartitions).floor - log10(hasMaxPartitioner.getNumPartitions).floor < 1
log10(maxPartitions) - log10(hasMaxPartitioner.get.getNumPartitions) < 1
}
}

Expand Down

0 comments on commit 961e384

Please sign in to comment.