From 7496d6b7181ef9eecfdee5c84f154696302821df Mon Sep 17 00:00:00 2001 From: Andres Perez Date: Mon, 17 Feb 2014 13:32:09 -0500 Subject: [PATCH 1/3] Add Shortest-path computations to graphx.lib with unit tests. --- .../spark/graphx/lib/ShortestPaths.scala | 73 +++++++++++++++++++ .../spark/graphx/lib/ShortestPathsSuite.scala | 44 +++++++++++ project/SparkBuild.scala | 5 +- 3 files changed, 120 insertions(+), 2 deletions(-) create mode 100644 graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala create mode 100644 graphx/src/test/scala/org/apache/spark/graphx/lib/ShortestPathsSuite.scala diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala new file mode 100644 index 0000000000000..ae605ae970b42 --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.graphx.lib + +import org.apache.spark.graphx._ +import com.twitter.algebird.{ Min, Monoid } + +object ShortestPaths { + type SPMap = Map[VertexId, Min[Int]] // map of landmarks -> minimum distance to landmark + def SPMap(x: (VertexId, Min[Int])*) = Map(x: _*) + def increment(spmap: SPMap): SPMap = spmap.map { case (v, Min(d)) => v -> Min(d + 1) } + + /** + * Compute the shortest paths to each landmark for each vertex and + * return an RDD with the map of landmarks to their shortest-path + * lengths. + * + * @tparam VD the shortest paths map for the vertex + * @tparam ED the incremented shortest-paths map of the originating + * vertex (discarded in the computation) + * + * @param graph the graph for which to compute the shortest paths + * @param landmarks the list of landmark vertex ids + * + * @return a graph with vertex attributes containing a map of the + * shortest paths to each landmark + */ + def run[VD, ED](graph: Graph[VD, ED], landmarks: Seq[VertexId]) + (implicit m1: Manifest[VD], m2: Manifest[ED], spMapMonoid: Monoid[SPMap]): Graph[SPMap, SPMap] = { + + val spGraph = graph + .mapVertices{ (vid, attr) => + if (landmarks.contains(vid)) SPMap(vid -> Min(0)) + else SPMap() + } + .mapTriplets{ edge => edge.srcAttr } + + val initialMessage = SPMap() + + def vertexProgram(id: VertexId, attr: SPMap, msg: SPMap): SPMap = { + spMapMonoid.plus(attr, msg) + } + + def sendMessage(edge: EdgeTriplet[SPMap, SPMap]): Iterator[(VertexId, SPMap)] = { + val newAttr = increment(edge.srcAttr) + if (edge.dstAttr != spMapMonoid.plus(newAttr, edge.dstAttr)) Iterator((edge.dstId, newAttr)) + else Iterator.empty + } + + def messageCombiner(s1: SPMap, s2: SPMap): SPMap = { + spMapMonoid.plus(s1, s2) + } + + Pregel(spGraph, initialMessage)( + vertexProgram, sendMessage, messageCombiner) + } + +} diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/ShortestPathsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/ShortestPathsSuite.scala new file mode 100644 index 0000000000000..d095d3e791b5b --- /dev/null +++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/ShortestPathsSuite.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.graphx.lib + +import org.scalatest.FunSuite + +import org.apache.spark.SparkContext +import org.apache.spark.SparkContext._ +import org.apache.spark.graphx._ +import org.apache.spark.graphx.lib._ +import org.apache.spark.graphx.util.GraphGenerators +import org.apache.spark.rdd._ + +class ShortestPathsSuite extends FunSuite with LocalSparkContext { + + test("Shortest Path Computations") { + withSpark { sc => + val shortestPaths = Set((1,Map(1 -> 0, 4 -> 2)), (2,Map(1 -> 1, 4 -> 2)), (3,Map(1 -> 2, 4 -> 1)), + (4,Map(1 -> 2, 4 -> 0)), (5,Map(1 -> 1, 4 -> 1)), (6,Map(1 -> 3, 4 -> 1))) + val edgeSeq = Seq((1, 2), (1, 5), (2, 3), (2, 5), (3, 4), (4, 5), (4, 6)).flatMap{ case e => Seq(e, e.swap) } + val edges = sc.parallelize(edgeSeq).map { case (v1, v2) => (v1.toLong, v2.toLong) } + val graph = Graph.fromEdgeTuples(edges, 1) + val landmarks = Seq(1, 4).map(_.toLong) + val results = ShortestPaths.run(graph, landmarks).vertices.collect.map { case (v, spMap) => (v, spMap.mapValues(_.get)) } + assert(results.toSet === shortestPaths) + } + } + +} diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 74bad66cfd018..f8a690af65f7d 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -86,7 +86,7 @@ object SparkBuild extends Build { case None => DEFAULT_YARN case Some(v) => v.toBoolean } - lazy val hadoopClient = if (hadoopVersion.startsWith("0.20.") || hadoopVersion == "1.0.0") "hadoop-core" else "hadoop-client" + lazy val hadoopClient = if (hadoopVersion.startsWith("0.20.") || hadoopVersion == "1.0.0") "hadoop-core" else "hadoop-client" // Conditionally include the yarn sub-project lazy val yarnAlpha = Project("yarn-alpha", file("yarn/alpha"), settings = yarnAlphaSettings) dependsOn(core) @@ -322,7 +322,8 @@ object SparkBuild extends Build { def graphxSettings = sharedSettings ++ Seq( name := "spark-graphx", libraryDependencies ++= Seq( - "org.jblas" % "jblas" % "1.2.3" + "org.jblas" % "jblas" % "1.2.3", + "com.twitter" %% "algebird-core" % "0.3.0" ) ) From f8f6d918bb2da6636a9998ce56d42110b7011257 Mon Sep 17 00:00:00 2001 From: Andres Perez Date: Mon, 17 Feb 2014 14:29:42 -0500 Subject: [PATCH 2/3] Revert "Add Shortest-path computations to graphx.lib with unit tests." This reverts commit 7496d6b7181ef9eecfdee5c84f154696302821df. --- .../spark/graphx/lib/ShortestPaths.scala | 73 ------------------- .../spark/graphx/lib/ShortestPathsSuite.scala | 44 ----------- project/SparkBuild.scala | 5 +- 3 files changed, 2 insertions(+), 120 deletions(-) delete mode 100644 graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala delete mode 100644 graphx/src/test/scala/org/apache/spark/graphx/lib/ShortestPathsSuite.scala diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala deleted file mode 100644 index ae605ae970b42..0000000000000 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.graphx.lib - -import org.apache.spark.graphx._ -import com.twitter.algebird.{ Min, Monoid } - -object ShortestPaths { - type SPMap = Map[VertexId, Min[Int]] // map of landmarks -> minimum distance to landmark - def SPMap(x: (VertexId, Min[Int])*) = Map(x: _*) - def increment(spmap: SPMap): SPMap = spmap.map { case (v, Min(d)) => v -> Min(d + 1) } - - /** - * Compute the shortest paths to each landmark for each vertex and - * return an RDD with the map of landmarks to their shortest-path - * lengths. - * - * @tparam VD the shortest paths map for the vertex - * @tparam ED the incremented shortest-paths map of the originating - * vertex (discarded in the computation) - * - * @param graph the graph for which to compute the shortest paths - * @param landmarks the list of landmark vertex ids - * - * @return a graph with vertex attributes containing a map of the - * shortest paths to each landmark - */ - def run[VD, ED](graph: Graph[VD, ED], landmarks: Seq[VertexId]) - (implicit m1: Manifest[VD], m2: Manifest[ED], spMapMonoid: Monoid[SPMap]): Graph[SPMap, SPMap] = { - - val spGraph = graph - .mapVertices{ (vid, attr) => - if (landmarks.contains(vid)) SPMap(vid -> Min(0)) - else SPMap() - } - .mapTriplets{ edge => edge.srcAttr } - - val initialMessage = SPMap() - - def vertexProgram(id: VertexId, attr: SPMap, msg: SPMap): SPMap = { - spMapMonoid.plus(attr, msg) - } - - def sendMessage(edge: EdgeTriplet[SPMap, SPMap]): Iterator[(VertexId, SPMap)] = { - val newAttr = increment(edge.srcAttr) - if (edge.dstAttr != spMapMonoid.plus(newAttr, edge.dstAttr)) Iterator((edge.dstId, newAttr)) - else Iterator.empty - } - - def messageCombiner(s1: SPMap, s2: SPMap): SPMap = { - spMapMonoid.plus(s1, s2) - } - - Pregel(spGraph, initialMessage)( - vertexProgram, sendMessage, messageCombiner) - } - -} diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/ShortestPathsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/ShortestPathsSuite.scala deleted file mode 100644 index d095d3e791b5b..0000000000000 --- a/graphx/src/test/scala/org/apache/spark/graphx/lib/ShortestPathsSuite.scala +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.graphx.lib - -import org.scalatest.FunSuite - -import org.apache.spark.SparkContext -import org.apache.spark.SparkContext._ -import org.apache.spark.graphx._ -import org.apache.spark.graphx.lib._ -import org.apache.spark.graphx.util.GraphGenerators -import org.apache.spark.rdd._ - -class ShortestPathsSuite extends FunSuite with LocalSparkContext { - - test("Shortest Path Computations") { - withSpark { sc => - val shortestPaths = Set((1,Map(1 -> 0, 4 -> 2)), (2,Map(1 -> 1, 4 -> 2)), (3,Map(1 -> 2, 4 -> 1)), - (4,Map(1 -> 2, 4 -> 0)), (5,Map(1 -> 1, 4 -> 1)), (6,Map(1 -> 3, 4 -> 1))) - val edgeSeq = Seq((1, 2), (1, 5), (2, 3), (2, 5), (3, 4), (4, 5), (4, 6)).flatMap{ case e => Seq(e, e.swap) } - val edges = sc.parallelize(edgeSeq).map { case (v1, v2) => (v1.toLong, v2.toLong) } - val graph = Graph.fromEdgeTuples(edges, 1) - val landmarks = Seq(1, 4).map(_.toLong) - val results = ShortestPaths.run(graph, landmarks).vertices.collect.map { case (v, spMap) => (v, spMap.mapValues(_.get)) } - assert(results.toSet === shortestPaths) - } - } - -} diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index f8a690af65f7d..74bad66cfd018 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -86,7 +86,7 @@ object SparkBuild extends Build { case None => DEFAULT_YARN case Some(v) => v.toBoolean } - lazy val hadoopClient = if (hadoopVersion.startsWith("0.20.") || hadoopVersion == "1.0.0") "hadoop-core" else "hadoop-client" + lazy val hadoopClient = if (hadoopVersion.startsWith("0.20.") || hadoopVersion == "1.0.0") "hadoop-core" else "hadoop-client" // Conditionally include the yarn sub-project lazy val yarnAlpha = Project("yarn-alpha", file("yarn/alpha"), settings = yarnAlphaSettings) dependsOn(core) @@ -322,8 +322,7 @@ object SparkBuild extends Build { def graphxSettings = sharedSettings ++ Seq( name := "spark-graphx", libraryDependencies ++= Seq( - "org.jblas" % "jblas" % "1.2.3", - "com.twitter" %% "algebird-core" % "0.3.0" + "org.jblas" % "jblas" % "1.2.3" ) ) From 47e22db50a017aae369d5b2801cc2a2226fd0d75 Mon Sep 17 00:00:00 2001 From: Andres Perez Date: Wed, 23 Apr 2014 13:09:39 -0400 Subject: [PATCH 3/3] Remove algebird dependency from ShortestPaths. --- .../spark/graphx/lib/ShortestPaths.scala | 21 +++++++++++-------- project/SparkBuild.scala | 3 +-- 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala index ae605ae970b42..e5a4d82b9874c 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala @@ -18,12 +18,15 @@ package org.apache.spark.graphx.lib import org.apache.spark.graphx._ -import com.twitter.algebird.{ Min, Monoid } object ShortestPaths { - type SPMap = Map[VertexId, Min[Int]] // map of landmarks -> minimum distance to landmark - def SPMap(x: (VertexId, Min[Int])*) = Map(x: _*) - def increment(spmap: SPMap): SPMap = spmap.map { case (v, Min(d)) => v -> Min(d + 1) } + type SPMap = Map[VertexId, Int] // map of landmarks -> minimum distance to landmark + def SPMap(x: (VertexId, Int)*) = Map(x: _*) + def increment(spmap: SPMap): SPMap = spmap.map { case (v, d) => v -> (d + 1) } + def plus(spmap1: SPMap, spmap2: SPMap): SPMap = + (spmap1.keySet ++ spmap2.keySet).map{ + k => k -> scala.math.min(spmap1.getOrElse(k, Int.MaxValue), spmap2.getOrElse(k, Int.MaxValue)) + }.toMap /** * Compute the shortest paths to each landmark for each vertex and @@ -41,11 +44,11 @@ object ShortestPaths { * shortest paths to each landmark */ def run[VD, ED](graph: Graph[VD, ED], landmarks: Seq[VertexId]) - (implicit m1: Manifest[VD], m2: Manifest[ED], spMapMonoid: Monoid[SPMap]): Graph[SPMap, SPMap] = { + (implicit m1: Manifest[VD], m2: Manifest[ED]): Graph[SPMap, SPMap] = { val spGraph = graph .mapVertices{ (vid, attr) => - if (landmarks.contains(vid)) SPMap(vid -> Min(0)) + if (landmarks.contains(vid)) SPMap(vid -> 0) else SPMap() } .mapTriplets{ edge => edge.srcAttr } @@ -53,17 +56,17 @@ object ShortestPaths { val initialMessage = SPMap() def vertexProgram(id: VertexId, attr: SPMap, msg: SPMap): SPMap = { - spMapMonoid.plus(attr, msg) + plus(attr, msg) } def sendMessage(edge: EdgeTriplet[SPMap, SPMap]): Iterator[(VertexId, SPMap)] = { val newAttr = increment(edge.srcAttr) - if (edge.dstAttr != spMapMonoid.plus(newAttr, edge.dstAttr)) Iterator((edge.dstId, newAttr)) + if (edge.dstAttr != plus(newAttr, edge.dstAttr)) Iterator((edge.dstId, newAttr)) else Iterator.empty } def messageCombiner(s1: SPMap, s2: SPMap): SPMap = { - spMapMonoid.plus(s1, s2) + plus(s1, s2) } Pregel(spGraph, initialMessage)( diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index f8a690af65f7d..507fd732db6fb 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -322,8 +322,7 @@ object SparkBuild extends Build { def graphxSettings = sharedSettings ++ Seq( name := "spark-graphx", libraryDependencies ++= Seq( - "org.jblas" % "jblas" % "1.2.3", - "com.twitter" %% "algebird-core" % "0.3.0" + "org.jblas" % "jblas" % "1.2.3" ) )