Skip to content

Commit

Permalink
SPARK-1329: Create pid2vid with correct number of partitions
Browse files Browse the repository at this point in the history
Each vertex partition is co-located with a pid2vid array created in
RoutingTable.scala. This array maps edge partition IDs to the list of
vertices in the current vertex partition that are mentioned by edges in
that partition. Therefore the pid2vid array should have one entry per
edge partition.

GraphX currently creates one entry per vertex partition, which is a bug
that leads to an ArrayIndexOutOfBoundsException when there are more edge
partitions than vertex partitions. This commit fixes the bug and adds a
test for this case.

Resolves SPARK-1329. Thanks to Daniel Darabos for reporting this bug.
  • Loading branch information
ankurdave committed Apr 9, 2014
1 parent b9e0c93 commit 5a5c52a
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,9 @@ class RoutingTable(edges: EdgeRDD[_], vertices: VertexRDD[_]) {
vSet.iterator.map { vid => (vid, pid) }
}

val numPartitions = vertices.partitions.size
val numEdgePartitions = edges.partitions.size
vid2pid.partitionBy(vertices.partitioner.get).mapPartitions { iter =>
val pid2vid = Array.fill(numPartitions)(new PrimitiveVector[VertexId])
val pid2vid = Array.fill(numEdgePartitions)(new PrimitiveVector[VertexId])
for ((vid, pid) <- iter) {
pid2vid(pid) += vid
}
Expand Down
12 changes: 12 additions & 0 deletions graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -287,4 +287,16 @@ class GraphSuite extends FunSuite with LocalSparkContext {
}
}

test("more edge partitions than vertex partitions") {
withSpark { sc =>
val verts = sc.parallelize(List((1: VertexId, "a"), (2: VertexId, "b")), 1)
val edges = sc.parallelize(List(Edge(1, 2, 0), Edge(2, 1, 0)), 2)
val graph = Graph(verts, edges)
val triplets = graph.triplets.map(et => (et.srcId, et.dstId, et.srcAttr, et.dstAttr))
.collect.toSet
assert(triplets ===
Set((1: VertexId, 2: VertexId, "a", "b"), (2: VertexId, 1: VertexId, "b", "a")))
}
}

}

0 comments on commit 5a5c52a

Please sign in to comment.