From 961e3848cea1dc1b6568c1612eef7bedba4270d5 Mon Sep 17 00:00:00 2001
From: sujithjay
Date: Sat, 23 Dec 2017 20:54:13 +0530
Subject: [PATCH] [SPARK-22465][Core][WIP] Changes after code review

---
 .../scala/org/apache/spark/Partitioner.scala | 23 ++++++++++++++++-------
 1 file changed, 17 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index 1d882764c40cc..ef6a7f097c50e 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -60,9 +60,16 @@ object Partitioner {
   def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
     val rdds = (Seq(rdd) ++ others)
     val hasPartitioner = rdds.filter(_.partitioner.exists(_.numPartitions > 0))
-    if (hasPartitioner.nonEmpty
-      && isEligiblePartitioner(hasPartitioner.maxBy(_.partitions.length), rdds)) {
-      hasPartitioner.maxBy(_.partitions.length).partitioner.get
+
+    // Compute the RDD with the most partitions once, instead of calling maxBy twice.
+    val hasMaxPartitioner = if (hasPartitioner.nonEmpty) {
+      Some(hasPartitioner.maxBy(_.partitions.length))
+    } else {
+      None
+    }
+
+    // isEligiblePartitioner returns false for None, so the .get calls below are safe.
+    if (isEligiblePartitioner(hasMaxPartitioner, rdds)) {
+      hasMaxPartitioner.get.partitioner.get
     } else {
       if (rdd.context.conf.contains("spark.default.parallelism")) {
         new HashPartitioner(rdd.context.defaultParallelism)
@@ -77,9 +84,13 @@
    * less than and within a single order of magnitude of the max number of upstream partitions;
    * otherwise, returns false
    */
-  private def isEligiblePartitioner(hasMaxPartitioner: RDD[_], rdds: Seq[RDD[_]]): Boolean = {
-    val maxPartitions = rdds.map(_.partitions.length).max
-    log10(maxPartitions).floor - log10(hasMaxPartitioner.getNumPartitions).floor < 1
+  private def isEligiblePartitioner(
+      hasMaxPartitioner: Option[RDD[_]],
+      rdds: Seq[RDD[_]]): Boolean = {
+    // None means no upstream RDD has a usable partitioner, so it is never eligible.
+    hasMaxPartitioner.exists { maxPartitioner =>
+      val maxPartitions = rdds.map(_.partitions.length).max
+      log10(maxPartitions) - log10(maxPartitioner.getNumPartitions) < 1
+    }
   }
 }
 