diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala index fe44e1ee0c391..022d5668e2942 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala @@ -69,9 +69,9 @@ class RoutingTable(edges: EdgeRDD[_], vertices: VertexRDD[_]) { vSet.iterator.map { vid => (vid, pid) } } - val numPartitions = vertices.partitions.size + val numEdgePartitions = edges.partitions.size vid2pid.partitionBy(vertices.partitioner.get).mapPartitions { iter => - val pid2vid = Array.fill(numPartitions)(new PrimitiveVector[VertexId]) + val pid2vid = Array.fill(numEdgePartitions)(new PrimitiveVector[VertexId]) for ((vid, pid) <- iter) { pid2vid(pid) += vid } diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala index d9ba4672ce0c5..32b5fe4813594 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala @@ -297,4 +297,16 @@ class GraphSuite extends FunSuite with LocalSparkContext { } } + test("more edge partitions than vertex partitions") { + withSpark { sc => + val verts = sc.parallelize(List((1: VertexId, "a"), (2: VertexId, "b")), 1) + val edges = sc.parallelize(List(Edge(1, 2, 0), Edge(2, 1, 0)), 2) + val graph = Graph(verts, edges) + val triplets = graph.triplets.map(et => (et.srcId, et.dstId, et.srcAttr, et.dstAttr)) + .collect.toSet + assert(triplets === + Set((1: VertexId, 2: VertexId, "a", "b"), (2: VertexId, 1: VertexId, "b", "a"))) + } + } + }