-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Shortest-path computations to graphx.lib with unit tests. #10
Changes from all commits
7496d6b
f8f6d91
a3bdb0e
2cbfe45
6cd90a5
2d5e788
ee9d90b
5c5b197
ba6e530
d47865f
9319fac
745a7a1
9ee0d89
25fbe10
4986f80
44d19e5
47e22db
c9d1ee8
88d80da
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.graphx.lib | ||
|
||
import org.apache.spark.graphx._ | ||
|
||
object ShortestPaths { | ||
type SPMap = Map[VertexId, Int] // map of landmarks -> minimum distance to landmark | ||
def SPMap(x: (VertexId, Int)*) = Map(x: _*) | ||
def increment(spmap: SPMap): SPMap = spmap.map { case (v, d) => v -> (d + 1) } | ||
def plus(spmap1: SPMap, spmap2: SPMap): SPMap = | ||
(spmap1.keySet ++ spmap2.keySet).map{ | ||
k => k -> scala.math.min(spmap1.getOrElse(k, Int.MaxValue), spmap2.getOrElse(k, Int.MaxValue)) | ||
}.toMap | ||
|
||
/** | ||
* Compute the shortest paths to each landmark for each vertex and | ||
* return an RDD with the map of landmarks to their shortest-path | ||
* lengths. | ||
* | ||
* @tparam VD the shortest paths map for the vertex | ||
* @tparam ED the incremented shortest-paths map of the originating | ||
* vertex (discarded in the computation) | ||
* | ||
* @param graph the graph for which to compute the shortest paths | ||
* @param landmarks the list of landmark vertex ids | ||
* | ||
* @return a graph with vertex attributes containing a map of the | ||
* shortest paths to each landmark | ||
*/ | ||
def run[VD, ED](graph: Graph[VD, ED], landmarks: Seq[VertexId]) | ||
(implicit m1: Manifest[VD], m2: Manifest[ED]): Graph[SPMap, SPMap] = { | ||
|
||
val spGraph = graph | ||
.mapVertices{ (vid, attr) => | ||
if (landmarks.contains(vid)) SPMap(vid -> 0) | ||
else SPMap() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we switch to an array implementation of the map then perhaps set the distance to MaxInt (or MaxDouble if we switch to weighted edge). |
||
} | ||
.mapTriplets{ edge => edge.srcAttr } | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What does this mapTriplets call do? It doesn't seem that we access edge.attr later on. |
||
|
||
val initialMessage = SPMap() | ||
|
||
def vertexProgram(id: VertexId, attr: SPMap, msg: SPMap): SPMap = { | ||
plus(attr, msg) | ||
} | ||
|
||
def sendMessage(edge: EdgeTriplet[SPMap, SPMap]): Iterator[(VertexId, SPMap)] = { | ||
val newAttr = increment(edge.srcAttr) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It might be worth considering adding support for edge weights instead of assuming all edges are length 1. |
||
if (edge.dstAttr != plus(newAttr, edge.dstAttr)) Iterator((edge.dstId, newAttr)) | ||
else Iterator.empty | ||
} | ||
|
||
def messageCombiner(s1: SPMap, s2: SPMap): SPMap = { | ||
plus(s1, s2) | ||
} | ||
|
||
Pregel(spGraph, initialMessage)( | ||
vertexProgram, sendMessage, messageCombiner) | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.graphx.lib | ||
|
||
import org.scalatest.FunSuite | ||
|
||
import org.apache.spark.SparkContext | ||
import org.apache.spark.SparkContext._ | ||
import org.apache.spark.graphx._ | ||
import org.apache.spark.graphx.lib._ | ||
import org.apache.spark.graphx.util.GraphGenerators | ||
import org.apache.spark.rdd._ | ||
|
||
class ShortestPathsSuite extends FunSuite with LocalSparkContext { | ||
|
||
test("Shortest Path Computations") { | ||
withSpark { sc => | ||
val shortestPaths = Set((1,Map(1 -> 0, 4 -> 2)), (2,Map(1 -> 1, 4 -> 2)), (3,Map(1 -> 2, 4 -> 1)), | ||
(4,Map(1 -> 2, 4 -> 0)), (5,Map(1 -> 1, 4 -> 1)), (6,Map(1 -> 3, 4 -> 1))) | ||
val edgeSeq = Seq((1, 2), (1, 5), (2, 3), (2, 5), (3, 4), (4, 5), (4, 6)).flatMap{ case e => Seq(e, e.swap) } | ||
val edges = sc.parallelize(edgeSeq).map { case (v1, v2) => (v1.toLong, v2.toLong) } | ||
val graph = Graph.fromEdgeTuples(edges, 1) | ||
val landmarks = Seq(1, 4).map(_.toLong) | ||
val results = ShortestPaths.run(graph, landmarks).vertices.collect.map { case (v, spMap) => (v, spMap.mapValues(_.get)) } | ||
assert(results.toSet === shortestPaths) | ||
} | ||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The scala map data-structures can be pretty costly and inefficient. Instead you could use an array containing the distances and then maintain a global map (shared by broadcast variable) with the mapping from vertex id to index in the array. This should also reduce the memory overhead substantially since each vertex will not need to maintain its own locally Map data structure.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Essentially remap the "landmarks" to a consecutive landmark id set and then on the initial creation of spGraph you would require using the single broadcast map but from then on no map data structures would be required.