[HOTFIX] [SPARK-3400] Revert 9b225ac "fix GraphX EdgeRDD zipPartitions"
9b225ac has been causing GraphX tests
to fail nondeterministically, which is blocking development for others.

Author: Ankur Dave <[email protected]>

Closes #2271 from ankurdave/SPARK-3400 and squashes the following commits:

10c2a97 [Ankur Dave] [HOTFIX] [SPARK-3400] Revert 9b225ac "fix GraphX EdgeRDD zipPartitions"
ankurdave committed Sep 4, 2014
1 parent 1bed0a3 commit 00362da
Showing 2 changed files with 2 additions and 18 deletions.
4 changes: 2 additions & 2 deletions graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
@@ -19,7 +19,7 @@ package org.apache.spark.graphx
 
 import scala.reflect.{classTag, ClassTag}
 
-import org.apache.spark._
+import org.apache.spark.{OneToOneDependency, Partition, Partitioner, TaskContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 
@@ -55,7 +55,7 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
    * partitioner that allows co-partitioning with `partitionsRDD`.
    */
   override val partitioner =
-    partitionsRDD.partitioner.orElse(Some(new HashPartitioner(partitionsRDD.partitions.size)))
+    partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD)))
 
   override def compute(part: Partition, context: TaskContext): Iterator[Edge[ED]] = {
     val p = firstParent[(PartitionID, EdgePartition[ED, VD])].iterator(part, context)
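For context (not part of this commit): Partitioner.defaultPartitioner returns a HashPartitioner sized from spark.default.parallelism whenever that property is set, so the partitioner restored above can disagree with the actual partition count of partitionsRDD. That mismatch is what 9b225ac tried to fix. A minimal sketch of the disagreement, assuming a local Spark 1.x build; the object name is illustrative:

import org.apache.spark.{HashPartitioner, Partitioner, SparkConf, SparkContext}

// Illustrative sketch only; not part of this commit.
object PartitionerMismatchSketch {
  def main(args: Array[String]): Unit = {
    // spark.default.parallelism (3) deliberately differs from the RDD's
    // actual partition count (4).
    val conf = new SparkConf().set("spark.default.parallelism", "3")
    val sc = new SparkContext("local", "sketch", conf)
    val partitionsRDD = sc.parallelize(1 to 10, 4)

    // What 9b225ac used: a HashPartitioner pinned to the RDD's own size.
    val pinned = new HashPartitioner(partitionsRDD.partitions.size)
    // What this revert restores: defaultPartitioner consults
    // spark.default.parallelism when that property is set.
    val restored = Partitioner.defaultPartitioner(partitionsRDD)

    println(partitionsRDD.partitions.size) // 4
    println(pinned.numPartitions)          // 4
    println(restored.numPartitions)        // 3

    sc.stop()
  }
}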
16 changes: 0 additions & 16 deletions graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -19,7 +19,6 @@ package org.apache.spark.graphx
 
 import org.scalatest.FunSuite
 
-import org.apache.spark.SparkConf
 import org.apache.spark.SparkContext
 import org.apache.spark.graphx.Graph._
 import org.apache.spark.graphx.PartitionStrategy._
@@ -351,19 +350,4 @@ class GraphSuite extends FunSuite with LocalSparkContext {
     }
   }
 
-  test("non-default number of edge partitions") {
-    val n = 10
-    val defaultParallelism = 3
-    val numEdgePartitions = 4
-    assert(defaultParallelism != numEdgePartitions)
-    val conf = new SparkConf()
-      .set("spark.default.parallelism", defaultParallelism.toString)
-    val sc = new SparkContext("local", "test", conf)
-    val edges = sc.parallelize((1 to n).map(x => (x: VertexId, 0: VertexId)),
-      numEdgePartitions)
-    val graph = Graph.fromEdgeTuples(edges, 1)
-    val neighborAttrSums = graph.mapReduceTriplets[Int](
-      et => Iterator((et.dstId, et.srcAttr)), _ + _)
-    assert(neighborAttrSums.collect.toSet === Set((0: VertexId, n)))
-  }
 }
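The deleted test pinned down the failure 9b225ac addressed: when an EdgeRDD reports a partitioner whose numPartitions differs from its real partition count, downstream zipPartitions calls reject the RDDs. A hedged sketch of that underlying error in plain Spark, using illustrative names not taken from this commit:

import org.apache.spark.{SparkConf, SparkContext}

// Illustrative sketch of the error class the deleted test guarded against.
object ZipPartitionsMismatchSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "sketch", new SparkConf())
    val a = sc.parallelize(1 to 10, 4) // 4 partitions
    val b = sc.parallelize(1 to 10, 3) // 3 partitions
    try {
      a.zipPartitions(b) { (ia, ib) => ia.zip(ib) }.collect()
    } catch {
      // Spark refuses to zip RDDs whose partition counts differ.
      case e: IllegalArgumentException =>
        println("zipPartitions failed: " + e.getMessage)
    }
    sc.stop()
  }
}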
