From 176270b3dbddb1f8d1330709dfea2022eebb7a11 Mon Sep 17 00:00:00 2001
From: sujithjay
Date: Sat, 16 Dec 2017 17:46:13 +0530
Subject: [PATCH 1/8] [SPARK-22465][Core][WIP] Add a safety-check to RDD defaultPartitioner that ignores existing Partitioners, if they are more than a single order of magnitude smaller than the max number of upstream partitions

---
 .../scala/org/apache/spark/Partitioner.scala  | 17 ++++++++++++--
 .../org/apache/spark/PartitioningSuite.scala  | 21 ++++++++++++++++++
 .../spark/rdd/PairRDDFunctionsSuite.scala     | 22 +++++++++++++++++++
 3 files changed, 58 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index debbd8d7c26c9..c852723bd4b49 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -21,6 +21,7 @@ import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
 
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
+import scala.math.log10
 import scala.reflect.ClassTag
 import scala.util.hashing.byteswap32
 
@@ -42,7 +43,9 @@ object Partitioner {
   /**
    * Choose a partitioner to use for a cogroup-like operation between a number of RDDs.
    *
-   * If any of the RDDs already has a partitioner, choose that one.
+   * If any of the RDDs already has a partitioner, and the number of partitions of the
+   * partitioner is either greater than or is less than and within a single order of
+   * magnitude of the max number of upstream partitions, choose that one.
    *
    * Otherwise, we use a default HashPartitioner. For the number of partitions, if
    * spark.default.parallelism is set, then we'll use the value from SparkContext
@@ -57,7 +60,7 @@ object Partitioner {
   def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
     val rdds = (Seq(rdd) ++ others)
     val hasPartitioner = rdds.filter(_.partitioner.exists(_.numPartitions > 0))
-    if (hasPartitioner.nonEmpty) {
+    if (hasPartitioner.nonEmpty && isEligiblePartitioner(hasPartitioner.maxBy(_.partitions.length), rdds)) {
       hasPartitioner.maxBy(_.partitions.length).partitioner.get
     } else {
       if (rdd.context.conf.contains("spark.default.parallelism")) {
@@ -67,6 +70,16 @@ object Partitioner {
       }
     }
   }
+
+  /**
+   * Returns true if the number of partitions of the RDD is either greater than or is
+   * less than and within a single order of magnitude of the max number of upstream partitions;
+   * otherwise, returns false
+   */
+  private def isEligiblePartitioner(hasMaxPartitioner: RDD[_], rdds: Seq[RDD[_]]): Boolean = {
+    val maxPartitions = rdds.map(_.partitions.length).max
+    log10(maxPartitions).floor - log10(hasMaxPartitioner.getNumPartitions).floor < 1
+  }
 }
 
 /**
diff --git a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
index dfe4c25670ce0..3b98bb7ee3caa 100644
--- a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
+++ b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
@@ -259,6 +259,27 @@ class PartitioningSuite extends SparkFunSuite with SharedSparkContext with Priva
     val partitioner = new RangePartitioner(22, rdd)
     assert(partitioner.numPartitions === 3)
   }
+
+  test("defaultPartitioner") {
+    val rdd1 = sc.parallelize((1 to 1000).map(x => (x, x)), 150)
+    val rdd2 = sc
+      .parallelize(Array((1, 2), (2, 3), (2, 4), (3, 4)))
+      .partitionBy(new HashPartitioner(10))
+    val rdd3 = sc
+      .parallelize(Array((1, 6), (7, 8), (3, 10), (5, 12), (13, 14)))
+      .partitionBy(new HashPartitioner(100))
+
+    val partitioner1 = Partitioner.defaultPartitioner(rdd1, rdd2)
+    val partitioner2 = Partitioner.defaultPartitioner(rdd2, rdd3)
+    val partitioner3 = Partitioner.defaultPartitioner(rdd3, rdd1)
+    val partitioner4 = Partitioner.defaultPartitioner(rdd1, rdd2, rdd3)
+
+    assert(partitioner1.numPartitions == rdd1.getNumPartitions)
+    assert(partitioner2.numPartitions == rdd3.getNumPartitions)
+    assert(partitioner3.numPartitions == rdd3.getNumPartitions)
+    assert(partitioner4.numPartitions == rdd3.getNumPartitions)
+
+  }
 }
diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
index 65d35264dc108..a39e0469272fe 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -310,6 +310,28 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
     assert(joined.size > 0)
   }
 
+  // See SPARK-22465
+  test("cogroup between multiple RDD " +
+    "with an order of magnitude difference in number of partitions") {
+    val rdd1 = sc.parallelize((1 to 1000).map(x => (x, x)), 1000)
+    val rdd2 = sc
+      .parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
+      .partitionBy(new HashPartitioner(10))
+    val joined = rdd1.cogroup(rdd2)
+    assert(joined.getNumPartitions == rdd1.getNumPartitions)
+  }
+
+  // See SPARK-22465
+  test("cogroup between multiple RDD" +
+    " with number of partitions similar in order of magnitude") {
+    val rdd1 = sc.parallelize((1 to 1000).map(x => (x, x)), 20)
+    val rdd2 = sc
+      .parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
+      .partitionBy(new HashPartitioner(10))
+    val joined = rdd1.cogroup(rdd2)
+    assert(joined.getNumPartitions == rdd2.getNumPartitions)
+  }
+
   test("rightOuterJoin") {
     val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
     val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))

From ca6aa08e3d2f6a053992fb31faed35baa46fb5a6 Mon Sep 17 00:00:00 2001
From: sujithjay
Date: Wed, 20 Dec 2017 20:34:30 +0530
Subject: [PATCH 2/8] [SPARK-22465][Core][WIP] Fix Scala style checks

---
 core/src/main/scala/org/apache/spark/Partitioner.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index c852723bd4b49..1d882764c40cc 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -60,7 +60,8 @@ object Partitioner {
   def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
     val rdds = (Seq(rdd) ++ others)
     val hasPartitioner = rdds.filter(_.partitioner.exists(_.numPartitions > 0))
-    if (hasPartitioner.nonEmpty && isEligiblePartitioner(hasPartitioner.maxBy(_.partitions.length), rdds)) {
+    if (hasPartitioner.nonEmpty
+      && isEligiblePartitioner(hasPartitioner.maxBy(_.partitions.length), rdds)) {
       hasPartitioner.maxBy(_.partitions.length).partitioner.get
     } else {
       if (rdd.context.conf.contains("spark.default.parallelism")) {

From 961e3848cea1dc1b6568c1612eef7bedba4270d5 Mon Sep 17 00:00:00 2001
From: sujithjay
Date: Sat, 23 Dec 2017 20:54:13 +0530
Subject: [PATCH 3/8] [SPARK-22465][Core][WIP] Changes after code review

---
 .../scala/org/apache/spark/Partitioner.scala | 19 ++++++++++++++-----
 1 file changed, 14 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index 1d882764c40cc..ef6a7f097c50e 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -60,9 +60,15 @@ object Partitioner {
   def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
     val rdds = (Seq(rdd) ++ others)
     val hasPartitioner = rdds.filter(_.partitioner.exists(_.numPartitions > 0))
-    if (hasPartitioner.nonEmpty
-      && isEligiblePartitioner(hasPartitioner.maxBy(_.partitions.length), rdds)) {
-      hasPartitioner.maxBy(_.partitions.length).partitioner.get
+
+    val hasMaxPartitioner = if(hasPartitioner.nonEmpty){
+      Some(hasPartitioner.maxBy(_.partitions.length))
+    } else {
+      None
+    }
+
+    if(isEligiblePartitioner(hasMaxPartitioner, rdds)) {
+      hasMaxPartitioner.get.partitioner.get
     } else {
       if (rdd.context.conf.contains("spark.default.parallelism")) {
         new HashPartitioner(rdd.context.defaultParallelism)
@@ -77,9 +83,12 @@ object Partitioner {
    * less than and within a single order of magnitude of the max number of upstream partitions;
    * otherwise, returns false
    */
-  private def isEligiblePartitioner(hasMaxPartitioner: RDD[_], rdds: Seq[RDD[_]]): Boolean = {
+  private def isEligiblePartitioner(hasMaxPartitioner: Option[RDD[_]], rdds: Seq[RDD[_]]): Boolean = {
+    if(hasMaxPartitioner.isEmpty){
+      return false
+    }
     val maxPartitions = rdds.map(_.partitions.length).max
-    log10(maxPartitions).floor - log10(hasMaxPartitioner.getNumPartitions).floor < 1
+    log10(maxPartitions) - log10(hasMaxPartitioner.get.getNumPartitions) < 1
   }
 }

From 8b3545265b534e511ac947071e416360184d740e Mon Sep 17 00:00:00 2001
From: sujithjay
Date: Sat, 23 Dec 2017 20:59:45 +0530
Subject: [PATCH 4/8] [SPARK-22465][Core][WIP] Scala style checks

---
 core/src/main/scala/org/apache/spark/Partitioner.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index ef6a7f097c50e..0bb387d537df3 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -83,7 +83,8 @@ object Partitioner {
    * less than and within a single order of magnitude of the max number of upstream partitions;
    * otherwise, returns false
    */
-  private def isEligiblePartitioner(hasMaxPartitioner: Option[RDD[_]], rdds: Seq[RDD[_]]): Boolean = {
+  private def isEligiblePartitioner(hasMaxPartitioner: Option[RDD[_]],
+    rdds: Seq[RDD[_]]): Boolean = {
     if(hasMaxPartitioner.isEmpty){
       return false
     }

From 4729d8036e984ecb7e8143f9f1cd7a3d84ec1754 Mon Sep 17 00:00:00 2001
From: sujithjay
Date: Sat, 23 Dec 2017 21:18:48 +0530
Subject: [PATCH 5/8] [SPARK-22465][Core][WIP] More Scala style checks

---
 .../scala/org/apache/spark/Partitioner.scala | 16 +++++++++-------
 1 file changed, 9 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index 0bb387d537df3..ae264fe16e39f 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -61,13 +61,13 @@ object Partitioner {
     val rdds = (Seq(rdd) ++ others)
     val hasPartitioner = rdds.filter(_.partitioner.exists(_.numPartitions > 0))
 
-    val hasMaxPartitioner = if(hasPartitioner.nonEmpty){
+    val hasMaxPartitioner = if (hasPartitioner.nonEmpty) {
       Some(hasPartitioner.maxBy(_.partitions.length))
     } else {
       None
     }
 
-    if(isEligiblePartitioner(hasMaxPartitioner, rdds)) {
+    if (isEligiblePartitioner(hasMaxPartitioner, rdds)) {
       hasMaxPartitioner.get.partitioner.get
     } else {
       if (rdd.context.conf.contains("spark.default.parallelism")) {
@@ -79,13 +79,15 @@ object Partitioner {
   }
 
   /**
-   * Returns true if the number of partitions of the RDD is either greater than or is
-   * less than and within a single order of magnitude of the max number of upstream partitions;
+   * Returns true if the number of partitions of the RDD is either greater
+   * than or is less than and within a single order of magnitude of the
+   * max number of upstream partitions;
    * otherwise, returns false
    */
-  private def isEligiblePartitioner(hasMaxPartitioner: Option[RDD[_]],
-    rdds: Seq[RDD[_]]): Boolean = {
-    if(hasMaxPartitioner.isEmpty){
+  private def isEligiblePartitioner(
+      hasMaxPartitioner: Option[RDD[_]],
+      rdds: Seq[RDD[_]]): Boolean = {
+    if (hasMaxPartitioner.isEmpty) {
       return false
     }
     val maxPartitions = rdds.map(_.partitions.length).max

From 3dd1ad8e25b7c23b58d33cc422570f4cb133fd4b Mon Sep 17 00:00:00 2001
From: sujithjay
Date: Sat, 23 Dec 2017 22:27:07 +0530
Subject: [PATCH 6/8] [SPARK-22465][Core][WIP] Import scala.language.existentials to fix compiler warnings

---
 core/src/main/scala/org/apache/spark/Partitioner.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index ae264fe16e39f..8d83b70794314 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -21,6 +21,7 @@ import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
 
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
+import scala.language.existentials
 import scala.math.log10
 import scala.reflect.ClassTag
 import scala.util.hashing.byteswap32

From 62b17e98914ad743205f56deb168ce780511276b Mon Sep 17 00:00:00 2001
From: sujithjay
Date: Sun, 24 Dec 2017 10:53:41 +0530
Subject: [PATCH 7/8] [SPARK-22465][Core][WIP] Remove import scala.language.existentials; explicitly mention type of existential type

---
 core/src/main/scala/org/apache/spark/Partitioner.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index 8d83b70794314..437bbaae1968b 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -21,7 +21,6 @@ import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
 
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
-import scala.language.existentials
 import scala.math.log10
 import scala.reflect.ClassTag
 import scala.util.hashing.byteswap32
@@ -62,7 +61,7 @@ object Partitioner {
     val rdds = (Seq(rdd) ++ others)
     val hasPartitioner = rdds.filter(_.partitioner.exists(_.numPartitions > 0))
 
-    val hasMaxPartitioner = if (hasPartitioner.nonEmpty) {
+    val hasMaxPartitioner: Option[RDD[_]] = if (hasPartitioner.nonEmpty) {
       Some(hasPartitioner.maxBy(_.partitions.length))
     } else {
       None

From 3b089518e66bc4facf7bc07db1d12663dd567393 Mon Sep 17 00:00:00 2001
From: sujithjay
Date: Sun, 24 Dec 2017 10:55:32 +0530
Subject: [PATCH 8/8] [SPARK-22465][Core][WIP] Add test for 9 vs 11 numPartitions case

---
 .../src/test/scala/org/apache/spark/PartitioningSuite.scala | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
index 3b98bb7ee3caa..155ca17db726b 100644
--- a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
+++ b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
@@ -268,16 +268,22 @@ class PartitioningSuite extends SparkFunSuite with SharedSparkContext with Priva
     val rdd3 = sc
       .parallelize(Array((1, 6), (7, 8), (3, 10), (5, 12), (13, 14)))
       .partitionBy(new HashPartitioner(100))
+    val rdd4 = sc
+      .parallelize(Array((1, 2), (2, 3), (2, 4), (3, 4)))
+      .partitionBy(new HashPartitioner(9))
+    val rdd5 = sc.parallelize((1 to 10).map(x => (x, x)), 11)
 
     val partitioner1 = Partitioner.defaultPartitioner(rdd1, rdd2)
     val partitioner2 = Partitioner.defaultPartitioner(rdd2, rdd3)
     val partitioner3 = Partitioner.defaultPartitioner(rdd3, rdd1)
     val partitioner4 = Partitioner.defaultPartitioner(rdd1, rdd2, rdd3)
+    val partitioner5 = Partitioner.defaultPartitioner(rdd4, rdd5)
 
     assert(partitioner1.numPartitions == rdd1.getNumPartitions)
     assert(partitioner2.numPartitions == rdd3.getNumPartitions)
     assert(partitioner3.numPartitions == rdd3.getNumPartitions)
     assert(partitioner4.numPartitions == rdd3.getNumPartitions)
+    assert(partitioner5.numPartitions == rdd4.getNumPartitions)
 
   }
 }
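Note: the sketch below is not part of the patch series; the object and method names are illustrative, not Spark API. It restates the eligibility rule that PATCH 3/8 onwards converge on, with partition counts passed as plain Ints so the boundary cases can be checked without a SparkContext. It also shows why PATCH 3/8 drops the .floor calls introduced in PATCH 1/8: with floors, 9 vs 11 partitions straddles a log10 bucket boundary (floor(log10(11)) - floor(log10(9)) = 1 - 0 = 1, which is not < 1), so an existing 9-partition partitioner would be wrongly ignored; without floors the difference is about 0.09, the case PATCH 8/8 pins down in a test.

import scala.math.log10

// Standalone restatement of the final isEligiblePartitioner logic: an existing
// partitioner is kept only if its partition count is within one order of
// magnitude (in log10 terms) of the maximum upstream partition count.
// "EligibilitySketch" and "isEligible" are illustrative names, not Spark API.
object EligibilitySketch {
  def isEligible(partitionerPartitions: Int, upstreamPartitionCounts: Seq[Int]): Boolean = {
    val maxPartitions = upstreamPartitionCounts.max
    log10(maxPartitions) - log10(partitionerPartitions) < 1
  }

  def main(args: Array[String]): Unit = {
    println(isEligible(10, Seq(1000, 10)))  // false: two orders of magnitude apart
    println(isEligible(10, Seq(20, 10)))    // true: within one order of magnitude
    println(isEligible(9, Seq(11, 9)))      // true: log10(11) - log10(9) is about 0.09
    println(isEligible(100, Seq(100, 10)))  // true: the partitioner itself has the max
  }
}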