diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index e53a78ead2c0e..214b9be8c7d81 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -55,14 +55,23 @@ object Partitioner {
* We use two method parameters (rdd, others) to enforce callers passing at least 1 RDD.
*/
def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
+ val shortPartitionerNames = Map(
+ "hash" -> "org.apache.spark.HashPartitioner",
+ "byteswap" -> "org.apache.spark.ByteswapPartitioner"
+ )
+ val defaultPartitionerName = rdd.conf.get("spark.default.partitioner", "hash")
+ val className =
+ shortPartitionerNames.getOrElse(defaultPartitionerName.toLowerCase, defaultPartitionerName)
+ val ctor = Class.forName(className, true, Utils.getContextOrSparkClassLoader)
+ .getConstructor(classOf[Int])
val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse
for (r <- bySize if r.partitioner.isDefined) {
return r.partitioner.get
}
if (rdd.context.conf.contains("spark.default.parallelism")) {
- new HashPartitioner(rdd.context.defaultParallelism)
+ ctor.newInstance(rdd.context.defaultParallelism: java.lang.Integer).asInstanceOf[Partitioner]
} else {
- new HashPartitioner(bySize.head.partitions.size)
+ ctor.newInstance(bySize.head.partitions.size: java.lang.Integer).asInstanceOf[Partitioner]
}
}
}
@@ -93,6 +102,18 @@ class HashPartitioner(partitions: Int) extends Partitioner {
override def hashCode: Int = numPartitions
}
+/**
+ * A [[org.apache.spark.Partitioner]] that implements hash-based partitioning using
+ * Java's `Object.hashCode`. In order to spread out hashCodes that are divisible by
+ * `numPartitions`, `byteswap32` is applied to the hashCodes before modding by `numPartitions`.
+ */
+class ByteswapPartitioner(partitions: Int) extends HashPartitioner(partitions) {
+ override def getPartition(key: Any): Int = key match {
+ case null => 0
+ case _ => Utils.nonNegativeMod(byteswap32(key.hashCode), numPartitions)
+ }
+}
+
/**
* A [[org.apache.spark.Partitioner]] that partitions sortable records by range into roughly
* equal ranges. The ranges are determined by sampling the content of the RDD passed in.
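
For illustration only (not part of the patch): a minimal, self-contained sketch of why applying `byteswap32` before the mod spreads out keys whose hash codes are all divisible by the partition count. The object and helper names below are hypothetical; `nonNegativeMod` simply mirrors the behavior of Spark's `Utils.nonNegativeMod`.

```scala
import scala.util.hashing.byteswap32

// Hypothetical demo (not part of the patch). Int.hashCode is the Int itself, so
// keys 0, 8, 16, ... all hash to multiples of 8 and collapse into partition 0
// under plain hash partitioning; byteswap32 scrambles the low bits first.
object ByteswapSpreadDemo {
  // Mirrors Utils.nonNegativeMod: a modulus result that is never negative.
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val rawMod = x % mod
    rawMod + (if (rawMod < 0) mod else 0)
  }

  def main(args: Array[String]): Unit = {
    val numPartitions = 8
    val keys = 0 until 64 by 8
    val hashParts = keys.map(k => nonNegativeMod(k.hashCode, numPartitions)).toSet
    val byteswapParts = keys.map(k => nonNegativeMod(byteswap32(k.hashCode), numPartitions)).toSet
    println(s"hash uses partitions:     $hashParts")     // Set(0)
    println(s"byteswap uses partitions: $byteswapParts") // typically several distinct partitions
  }
}
```
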
diff --git a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
index be972c5e97a7e..9f11db88f92b2 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
@@ -32,6 +32,16 @@ import scala.util.Try
class PipedRDDSuite extends FunSuite with SharedSparkContext {
+ override def beforeAll() {
+ System.setProperty("spark.default.partitioner", "hash")
+ super.beforeAll()
+ }
+
+ override def afterAll() {
+ System.clearProperty("spark.default.partitioner")
+ super.afterAll()
+ }
+
test("basic pipe") {
if (testCommandAvailable("cat")) {
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
diff --git a/docs/configuration.md b/docs/configuration.md
index 13fc251c1733f..029213558fcd4 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -488,6 +488,17 @@ Apart from these, the following properties are also available, and may be useful
    (<code>groupByKey</code>, <code>reduceByKey</code>, etc) when not set by user.
  </td>
</tr>
+<tr>
+  <td><code>spark.default.partitioner</code></td>
+  <td>hash</td>
+  <td>
+    Which partitioner implementation to use by default. The options are <code>hash</code>
+    and <code>byteswap</code>. Both are based on the <code>hashCode</code> of
+    the keys mod the number of partitions, but the <code>byteswap</code> partitioner also
+    applies <code>byteswap32</code> to the hash codes, which helps guarantee that all partitions are used
+    even when the hash codes are divisible by a factor of the number of partitions.
+  </td>
+</tr>
<tr>
  <td><code>spark.broadcast.factory</code></td>
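
Again for illustration only, and assuming a build that includes this patch: a sketch of how a job could opt in to the byteswap partitioner through the new `spark.default.partitioner` property. The app name and master below are placeholders.

```scala
import org.apache.spark.{SparkConf, SparkContext}
// Needed for the PairRDDFunctions implicits on older Spark versions.
import org.apache.spark.SparkContext._

// Hypothetical usage sketch, assuming this patch is applied.
val conf = new SparkConf()
  .setAppName("byteswap-partitioner-demo") // placeholder app name
  .setMaster("local[2]")                   // placeholder master
  .set("spark.default.partitioner", "byteswap")
val sc = new SparkContext(conf)

// groupByKey with no explicit partitioner goes through Partitioner.defaultPartitioner,
// which now instantiates ByteswapPartitioner instead of HashPartitioner.
val grouped = sc.parallelize(0 until 64 by 8).map(k => (k, 1)).groupByKey()
println(grouped.partitioner) // Some(org.apache.spark.ByteswapPartitioner@...)

sc.stop()
```
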