From 5a5c52a575bceb1286c38146f81f81c244bb3beb Mon Sep 17 00:00:00 2001 From: Ankur Dave Date: Tue, 8 Apr 2014 23:30:53 -0700 Subject: [PATCH] SPARK-1329: Create pid2vid with correct number of partitions Each vertex partition is co-located with a pid2vid array created in RoutingTable.scala. This array maps edge partition IDs to the list of vertices in the current vertex partition that are mentioned by edges in that partition. Therefore the pid2vid array should have one entry per edge partition. GraphX currently creates one entry per vertex partition, which is a bug that leads to an ArrayIndexOutOfBoundsException when there are more edge partitions than vertex partitions. This commit fixes the bug and adds a test for this case. Resolves SPARK-1329. Thanks to Daniel Darabos for reporting this bug. --- .../org/apache/spark/graphx/impl/RoutingTable.scala | 4 ++-- .../scala/org/apache/spark/graphx/GraphSuite.scala | 12 ++++++++++++ 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala index fe44e1ee0c391..022d5668e2942 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala @@ -69,9 +69,9 @@ class RoutingTable(edges: EdgeRDD[_], vertices: VertexRDD[_]) { vSet.iterator.map { vid => (vid, pid) } } - val numPartitions = vertices.partitions.size + val numEdgePartitions = edges.partitions.size vid2pid.partitionBy(vertices.partitioner.get).mapPartitions { iter => - val pid2vid = Array.fill(numPartitions)(new PrimitiveVector[VertexId]) + val pid2vid = Array.fill(numEdgePartitions)(new PrimitiveVector[VertexId]) for ((vid, pid) <- iter) { pid2vid(pid) += vid } diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala index 28d34dd9a1a41..8e944c5a7f34e 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala @@ -287,4 +287,16 @@ class GraphSuite extends FunSuite with LocalSparkContext { } } + test("more edge partitions than vertex partitions") { + withSpark { sc => + val verts = sc.parallelize(List((1: VertexId, "a"), (2: VertexId, "b")), 1) + val edges = sc.parallelize(List(Edge(1, 2, 0), Edge(2, 1, 0)), 2) + val graph = Graph(verts, edges) + val triplets = graph.triplets.map(et => (et.srcId, et.dstId, et.srcAttr, et.dstAttr)) + .collect.toSet + assert(triplets === + Set((1: VertexId, 2: VertexId, "a", "b"), (2: VertexId, 1: VertexId, "b", "a"))) + } + } + }