diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
index 5e55620147df8..5d71f31359bcb 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
@@ -22,12 +22,58 @@ import org.apache.spark.Logging

 /**
- * Implements a Pregel-like bulk-synchronous message-passing API.
+ * The Pregel Vertex class contains the vertex attribute and the boolean flag
+ * indicating whether the vertex is active.
  *
- * Unlike the original Pregel API, the GraphX Pregel API factors the sendMessage computation over
- * edges, enables the message sending computation to read both vertex attributes, and constrains
- * messages to the graph structure. These changes allow for substantially more efficient
- * distributed execution while also exposing greater flexibility for graph-based computation.
+ * @param attr the vertex property during the Pregel computation (e.g.,
+ *   PageRank)
+ * @param isActive a flag indicating whether the vertex is active
+ * @tparam T the type of the vertex property
+ */
+sealed case class PregelVertex[@specialized T]
+  (attr: T, isActive: Boolean = true) extends Product2[T, Boolean] {
+  override def _1: T = attr
+  override def _2: Boolean = isActive
+}
+
+
+/**
+ * The Pregel API enables users to express iterative graph algorithms in GraphX
+ * and is loosely based on the Google and GraphLab APIs.
+ *
+ * At a high level, iterative graph algorithms like PageRank recursively define
+ * vertex properties in terms of the properties of neighboring vertices. These
+ * recursive properties are then computed through iterative fixed-point
+ * computations. For example, the PageRank of a web-page can be defined as a
+ * weighted sum of the PageRank of web-pages that link to that page and is
+ * computed by iteratively updating the PageRank of each page until a
+ * fixed-point is reached (the PageRank values stop changing).
+ *
+ * The GraphX Pregel API expresses iterative graph algorithms as vertex-programs
+ * which can send messages to and receive messages from neighboring vertices.
+ * Vertex programs have the following logic:
+ *
+ * {{{
+ * while ( there are active vertices ) {
+ *   for ( v in allVertices ) {
+ *     // Send messages to neighbors
+ *     if (isActive) {
+ *       for (nbr in neighbors(v)) {
+ *         val msgs: List[(id, msg)] = computeMsgs(Triplet(v, nbr))
+ *         for ((id, msg) <- msgs) {
+ *           messageSum(id) = reduce(messageSum(id), msg)
+ *         }
+ *       }
+ *     }
+ *     // Receive the "sum" of the messages to v and update the property
+ *     (vertexProperty(v), isActive) =
+ *       vertexProgram(vertexProperty(v), messageSum(v))
+ *   }
+ * }
+ * }}}
+ *
+ * The user-defined `vertexProgram`, `computeMsgs`, and `reduce` functions
+ * capture the core logic of graph algorithms.
  *
  * @example We can use the Pregel abstraction to implement PageRank:
  * {{{
@@ -41,19 +87,90 @@ import org.apache.spark.Logging
  *   // Set the vertex attributes to the initial pagerank values
  *   .mapVertices((id, attr) => 1.0)
  *
- * def vertexProgram(id: VertexId, attr: Double, msgSum: Double): Double =
- *   resetProb + (1.0 - resetProb) * msgSum
- * def sendMessage(id: VertexId, edge: EdgeTriplet[Double, Double]): Iterator[(VertexId, Double)] =
- *   Iterator((edge.dstId, edge.srcAttr * edge.attr))
+ * // Define the vertex program and message calculation functions.
+ * def vertexProgram(iter: Int, id: VertexId, oldV: PregelVertex[Double],
+ *     msgSum: Option[Double]) = {
+ *   PregelVertex(resetProb + (1.0 - resetProb) * msgSum.getOrElse(0.0))
+ * }
+ *
+ * def computeMsgs(iter: Int, edge: EdgeTriplet[PregelVertex[Double], Double]) = {
+ *   Iterator((edge.dstId, edge.srcAttr.attr * edge.attr))
+ * }
+ *
  * def messageCombiner(a: Double, b: Double): Double = a + b
- * val initialMessage = 0.0
- * // Execute Pregel for a fixed number of iterations.
- * Pregel(pagerankGraph, initialMessage, numIter)(
- *   vertexProgram, sendMessage, messageCombiner)
+ *
+ * // Run PageRank
+ * val prGraph = Pregel.run(pagerankGraph, numIter, activeDirection = EdgeDirection.Out)(
+ *   vertexProgram, computeMsgs, messageCombiner).cache()
+ *
+ * // Normalize the pagerank vector:
+ * val normalizer: Double = prGraph.vertices.map(x => x._2).reduce(_ + _)
+ *
+ * prGraph.mapVertices((id, pr) => pr / normalizer)
+ *
  * }}}
  *
  */
 object Pregel extends Logging {

+  /**
+   * The new Pregel API: repeatedly run the user-defined vertex program on the
+   * active vertices, exchanging messages produced by `computeMsgs` and combined
+   * with `mergeMsg`, until no vertices remain active or `maxIterations`
+   * iterations have been performed.
+   */
+  def run[VD: ClassTag, ED: ClassTag, A: ClassTag]
+     (graph: Graph[VD, ED],
+      maxIterations: Int = Int.MaxValue,
+      activeDirection: EdgeDirection = EdgeDirection.Either)
+     (vertexProgram: (Int, VertexId, PregelVertex[VD], Option[A]) => PregelVertex[VD],
+      computeMsgs: (Int, EdgeTriplet[PregelVertex[VD], ED]) => Iterator[(VertexId, A)],
+      mergeMsg: (A, A) => A)
+    : Graph[VD, ED] =
+  {
+    // Initialize the graph with all vertices active
+    var currentGraph: Graph[PregelVertex[VD], ED] =
+      graph.mapVertices { (vid, vdata) => PregelVertex(vdata) }.cache()
+    // Determine the set of vertices that did not vote to halt
+    var activeVertices = currentGraph.vertices
+    var numActive = activeVertices.count()
+    var iteration = 0
+    while (numActive > 0 && iteration < maxIterations) {
+      // Keep a reference to the current graph so it can be unpersisted later.
+      val prevG = currentGraph
+
+      // Compute the messages for all the active vertices
+      val messages = currentGraph.mapReduceTriplets(t => computeMsgs(iteration, t), mergeMsg,
+        Some((activeVertices, activeDirection)))
+
+      // Receive the messages and update the vertices that are active or received a message
+      currentGraph = currentGraph.outerJoinVertices(messages) { (vid, pVertex, msgOpt) =>
+        // If the vertex voted to halt and received no message then we can skip the vertex program
+        if (!pVertex.isActive && msgOpt.isEmpty) {
+          pVertex
+        } else {
+          // The vertex is either active or received a message (or both).
+          // A vertex must vote to halt again even if it has previously voted to halt.
+          vertexProgram(iteration, vid, pVertex, msgOpt)
+        }
+      }.cache()
+
+      // Recompute the active vertices (those that have not voted to halt)
+      activeVertices = currentGraph.vertices.filter(v => v._2._2)
+
+      // Force all computation!
+      numActive = activeVertices.count()
+
+      // Unpersist the RDDs hidden by newly-materialized RDDs
+      prevG.unpersistVertices(blocking = false)
+      prevG.edges.unpersist(blocking = false)
+
+      logInfo("Pregel finished iteration " + iteration)
+      // Advance the iteration counter
+      iteration += 1
+    }
+    currentGraph.mapVertices((id, vdata) => vdata.attr)
+  } // end of run
+
   /**
    * Execute a Pregel-like iterative vertex-parallel abstraction.  The
@@ -109,6 +226,7 @@ object Pregel extends Logging {
    * @return the resulting graph at the end of the computation
    *
    */
+  // @deprecated ("Switching to Pregel.run.", "1.1")
   def apply[VD: ClassTag, ED: ClassTag, A: ClassTag]
      (graph: Graph[VD, ED],
       initialMsg: A,
@@ -158,4 +276,7 @@ object Pregel extends Logging {

     g
   } // end of apply
+
+
+
 } // end of class Pregel
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
index 257e2f3a36115..0fa5ddfe6aadd 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
@@ -21,6 +21,7 @@ import scala.reflect.ClassTag

 import org.apache.spark.Logging
 import org.apache.spark.graphx._
+import org.apache.spark.graphx.Pregel._

 /**
  * PageRank algorithm implementation. There are two implementations of PageRank implemented.
@@ -144,34 +145,39 @@ object PageRank extends Logging {
       }
       // Set the weight on the edges based on the degree
       .mapTriplets( e => 1.0 / e.srcAttr )
-      // Set the vertex attributes to (initalPR, delta = 0)
-      .mapVertices( (id, attr) => (0.0, 0.0) )
+      // Set the vertex attributes to (currentPr, deltaToSend)
+      .mapVertices( (id, attr) => (resetProb, (1.0 - resetProb) * resetProb) )
       .cache()

     // Define the three functions needed to implement PageRank in the GraphX
     // version of Pregel
-    def vertexProgram(id: VertexId, attr: (Double, Double), msgSum: Double): (Double, Double) = {
-      val (oldPR, lastDelta) = attr
-      val newPR = oldPR + (1.0 - resetProb) * msgSum
-      (newPR, newPR - oldPR)
+    def vertexProgram(iter: Int, id: VertexId, vertex: PregelVertex[(Double, Double)],
+        msgSum: Option[Double]) = {
+      var (oldPR, pendingDelta) = vertex.attr
+      val newPR = oldPR + msgSum.getOrElse(0.0)
+      // if we were active then we sent the pending delta on the last iteration
+      if (vertex.isActive) {
+        pendingDelta = 0.0
+      }
+      pendingDelta += (1.0 - resetProb) * msgSum.getOrElse(0.0)
+      val isActive = math.abs(pendingDelta) >= tol
+      PregelVertex((newPR, pendingDelta), isActive)
     }

-    def sendMessage(edge: EdgeTriplet[(Double, Double), Double]) = {
-      if (edge.srcAttr._2 > tol) {
-        Iterator((edge.dstId, edge.srcAttr._2 * edge.attr))
-      } else {
-        Iterator.empty
-      }
+    def sendMessage(iter: Int, edge: EdgeTriplet[PregelVertex[(Double, Double)], Double]) = {
+      val PregelVertex((srcPr, srcDelta), srcIsActive) = edge.srcAttr
+      assert(srcIsActive)
+      Iterator((edge.dstId, srcDelta * edge.attr))
     }

     def messageCombiner(a: Double, b: Double): Double = a + b

-    // The initial message received by all vertices in PageRank
-    val initialMessage = resetProb / (1.0 - resetProb)
-
     // Execute a dynamic version of Pregel.
-    Pregel(pagerankGraph, initialMessage, activeDirection = EdgeDirection.Out)(
+    val prGraph = Pregel.run(pagerankGraph, activeDirection = EdgeDirection.Out)(
       vertexProgram, sendMessage, messageCombiner)
       .mapVertices((vid, attr) => attr._1)
+      .cache()
+    val normalizer: Double = prGraph.vertices.map(x => x._2).reduce(_ + _)
+    prGraph.mapVertices((id, pr) => pr / normalizer)
   } // end of deltaPageRank

 }
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala b/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala
index 8a13c74221546..1e4f15e171dc4 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala
@@ -265,4 +265,19 @@ object GraphGenerators {
     Graph.fromEdgeTuples(edges, 1)
   } // end of starGraph

+
+  /**
+   * Create a cycle graph.
+   *
+   * @param sc the spark context in which to construct the graph
+   * @param nverts the number of vertices in the cycle
+   *
+   * @return A cycle graph containing `nverts` vertices, with each vertex i
+   * connected to vertex (i + 1) % nverts.
+   */
+  def cycleGraph(sc: SparkContext, nverts: Int): Graph[Int, Int] = {
+    val edges: RDD[(VertexId, VertexId)] = sc.parallelize(0 until nverts).map(vid => (vid, (vid + 1) % nverts))
+    Graph.fromEdgeTuples(edges, 1)
+  } // end of cycleGraph
+
 } // end of Graph Generators
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
index fc491ae327c2a..aadc53fe54635 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
@@ -17,8 +17,9 @@

 package org.apache.spark.graphx.lib

-import org.scalatest.FunSuite
+import org.scalatest.{FunSuite, Matchers}

+import collection.mutable.MutableList
 import org.apache.spark.SparkContext
 import org.apache.spark.SparkContext._
 import org.apache.spark.graphx._
@@ -26,9 +27,12 @@ import org.apache.spark.graphx.lib._
 import org.apache.spark.graphx.util.GraphGenerators
 import org.apache.spark.rdd._

-object GridPageRank {
-  def apply(nRows: Int, nCols: Int, nIter: Int, resetProb: Double) = {
-    val inNbrs = Array.fill(nRows * nCols)(collection.mutable.MutableList.empty[Int])
+import scala.collection.mutable
+
+object TruePageRank {
+
+  def grid(nRows: Int, nCols: Int, nIter: Int, resetProb: Double) = {
+    val inNbrs = Array.fill(nRows * nCols)(mutable.MutableList.empty[Int])
     val outDegree = Array.fill(nRows * nCols)(0)
     // Convert row column address into vertex ids (row major order)
     def sub2ind(r: Int, c: Int): Int = r * nCols + c
@@ -44,93 +48,190 @@ object GridPageRank {
         inNbrs(sub2ind(r,c+1)) += ind
       }
     }
+    val vids = 0 until (nRows * nCols)
+    pr(vids.zip(inNbrs).toMap, vids.zip(outDegree).toMap, nIter, resetProb)
+  }
+
+  def edges(edges: Array[(Int, Int)], nIter: Int, resetProb: Double) = {
+    val vids = edges.flatMap { case (s, d) => Iterator(s, d) }.toSet
+    val inNbrs = vids.map(id => (id, mutable.MutableList.empty[Int])).toMap
+    val outDegree = mutable.Map.empty[Int, Int]
+    for ((s, d) <- edges) {
+      outDegree(s) = outDegree.getOrElse(s, 0) + 1
+      inNbrs(d) += s
+    }
+    pr(inNbrs, outDegree, nIter, resetProb)
+  }
+
+  def pr(inNbrs: collection.Map[Int, mutable.MutableList[Int]],
+      outDegree: collection.Map[Int, Int], nIter: Int, resetProb: Double) = {
+    val nVerts = inNbrs.size
     // compute the pagerank
-    var pr = Array.fill(nRows * nCols)(resetProb)
+    var pr = Array.fill(nVerts)(resetProb)
     for (iter <- 0 until nIter) {
       val oldPr = pr
-      pr = new Array[Double](nRows * nCols)
-      for (ind <- 0 until (nRows * nCols)) {
+      pr = new Array[Double](nVerts)
+      for (ind <- 0 until nVerts) {
         pr(ind) = resetProb + (1.0 - resetProb) * inNbrs(ind).map( nbr =>
           oldPr(nbr) / outDegree(nbr)).sum
       }
     }
-    (0L until (nRows * nCols)).zip(pr)
+    val normalizer = pr.sum
+    (0L until nVerts).zip(pr.map(p => p / normalizer))
   }
 }

-class PageRankSuite extends FunSuite with LocalSparkContext {
+class PageRankSuite extends FunSuite with LocalSparkContext with Matchers {

   def compareRanks(a: VertexRDD[Double], b: VertexRDD[Double]): Double = {
     a.leftJoin(b) { case (id, a, bOpt) => (a - bOpt.getOrElse(0.0)) * (a - bOpt.getOrElse(0.0)) }
       .map { case (id, error) => error }.sum
   }

-  test("Star PageRank") {
+
+  test("Simple Chain Static PageRank") {
     withSpark { sc =>
-      val nVertices = 100
-      val starGraph = GraphGenerators.starGraph(sc, nVertices).cache()
+      val chainEdges = Array((0, 1), (1, 2), (2, 3), (4, 3))
+      val rawEdges = sc.parallelize(chainEdges, 1).map { case (s, d) => (s.toLong, d.toLong) }
+      val chain = Graph.fromEdgeTuples(rawEdges, 1.0).cache()
       val resetProb = 0.15
-      val errorTol = 1.0e-5
+      val tol = 0.0001
+      val numIter = 10
+      val truePr = TruePageRank.edges(chainEdges, numIter, resetProb).toMap
+      val vertices = chain.staticPageRank(numIter, resetProb).vertices.collect
+      val pageranks = vertices.toMap
+      for (i <- 0 until 4) {
+        pageranks(i) should equal(truePr(i) +- 1.0e-7)
+      }
+    }
+  }

-      val staticRanks1 = starGraph.staticPageRank(numIter = 1, resetProb).vertices
-      val staticRanks2 = starGraph.staticPageRank(numIter = 2, resetProb).vertices.cache()
-      // Static PageRank should only take 2 iterations to converge
-      val notMatching = staticRanks1.innerZipJoin(staticRanks2) { (vid, pr1, pr2) =>
-        if (pr1 != pr2) 1 else 0
-      }.map { case (vid, test) => test }.sum
-      assert(notMatching === 0)
+  test("Simple Chain Dynamic PageRank") {
+    withSpark { sc =>
+      val chainEdges = Array((0, 1), (1, 2), (2, 3), (4, 3))
+      val rawEdges = sc.parallelize(chainEdges, 1).map { case (s, d) => (s.toLong, d.toLong) }
+      val chain = Graph.fromEdgeTuples(rawEdges, 1.0).cache()
+      val resetProb = 0.15
+      val tol = 0.0001
+      val numIter = 10
+      val truePr = TruePageRank.edges(chainEdges, numIter, resetProb).toMap
+      val vertices = chain.pageRank(1.0e-10, resetProb).vertices.collect
+      val pageranks = vertices.toMap
+      for (i <- 0 until 4) {
+        pageranks(i) should equal(truePr(i) +- 1.0e-7)
+      }
+    }
+  }
+

-      val staticErrors = staticRanks2.map { case (vid, pr) =>
-        val correct = (vid > 0 && pr == resetProb) ||
-          (vid == 0 && math.abs(pr - (resetProb + (1.0 - resetProb) * (resetProb * (nVertices - 1)) )) < 1.0E-5)
-        if (!correct) 1 else 0
+  test("Static Star PageRank") {
+    withSpark { sc =>
+      val nVertices = 10
+      val starGraph = GraphGenerators.starGraph(sc, nVertices).cache()
+      val resetProb = 0.15
+      val staticRanks: VertexRDD[Double] = starGraph.staticPageRank(numIter = 2, resetProb).vertices
+      // Check the static pagerank
+      val pageranks: Map[VertexId, Double] = staticRanks.collect().toMap
+      val perimeter = (nVertices - 1.0) * resetProb
+      val center = resetProb + (1.0 - resetProb) * perimeter
+      val normalizer = perimeter + center
+      pageranks(0) should equal((center / normalizer) +- 1.0e-7)
+      for (i <- 1 until nVertices) {
+        pageranks(i) should equal((resetProb / normalizer) +- 1.0e-7)
       }
-      assert(staticErrors.sum === 0)
+    }
+  }

-      val dynamicRanks = starGraph.pageRank(0, resetProb).vertices.cache()
-      assert(compareRanks(staticRanks2, dynamicRanks) < errorTol)
+
+  test("Dynamic Star PageRank") {
+    withSpark { sc =>
+      val nVertices = 10
+      val starGraph = GraphGenerators.starGraph(sc, nVertices).cache()
+      val resetProb = 0.15
+      val dynamicRanks: VertexRDD[Double] = starGraph.pageRank(1.0e-10, resetProb).vertices
+      // Check the pagerank values
+      val pageranks: Map[VertexId, Double] = dynamicRanks.collect().toMap
+      val perimeter = (nVertices - 1.0) * resetProb
+      val center = resetProb + (1.0 - resetProb) * perimeter
+      val normalizer = perimeter + center
+      pageranks(0) should equal((center / normalizer) +- 1.0e-7)
+      for (i <- 1 until nVertices) {
+        pageranks(i) should equal((resetProb / normalizer) +- 1.0e-7)
+      }
     }
   } // end of test Star PageRank

+  test("Static Cycle PageRank") {
+    withSpark { sc =>
+      val nVertices = 10
+      val cycle = GraphGenerators.cycleGraph(sc, nVertices)
+      val resetProb = 0.15
+      val staticRanks: VertexRDD[Double] =
+        cycle.staticPageRank(numIter = 10, resetProb).vertices
+      // Check the static pagerank
+      val pageranks: Map[VertexId, Double] = staticRanks.collect().toMap
+      for (i <- 0 until nVertices) {
+        pageranks(i) should equal((1.0 / nVertices) +- 1.0e-7)
+      }
+    }
+  }

-  test("Grid PageRank") {
+
+  test("Dynamic Cycle PageRank") {
     withSpark { sc =>
-      val rows = 10
-      val cols = 10
+      val nVertices = 3
+      val cycle = GraphGenerators.cycleGraph(sc, nVertices)
+      val resetProb = 0.15
+      val dynamicRanks: VertexRDD[Double] = cycle.pageRank(1.0e-3, resetProb).vertices
+      // Check the dynamic pagerank
+      val pageranks: Map[VertexId, Double] = dynamicRanks.collect().toMap
+      for (i <- 0 until nVertices) {
+        pageranks(i) should equal((1.0 / nVertices) +- 1.0e-7)
+      }
+    }
+  }
+
+
+  test("Grid Static PageRank") {
+    withSpark { sc =>
+      val rows = 5
+      val cols = 5
       val resetProb = 0.15
       val tol = 0.0001
-      val numIter = 50
+      val numIter = 20
       val errorTol = 1.0e-5
+      val truePr = TruePageRank.grid(rows, cols, numIter, resetProb).toMap
       val gridGraph = GraphGenerators.gridGraph(sc, rows, cols).cache()
-
-      val staticRanks = gridGraph.staticPageRank(numIter, resetProb).vertices.cache()
-      val dynamicRanks = gridGraph.pageRank(tol, resetProb).vertices.cache()
-      val referenceRanks = VertexRDD(sc.parallelize(GridPageRank(rows, cols, numIter, resetProb))).cache()
-
-      assert(compareRanks(staticRanks, referenceRanks) < errorTol)
-      assert(compareRanks(dynamicRanks, referenceRanks) < errorTol)
+      val vertices = gridGraph.staticPageRank(numIter, resetProb).vertices.cache()
+      val pageranks: Map[VertexId, Double] = vertices.collect().toMap
+      for ((k, pr) <- truePr) {
+        pageranks(k) should equal(pr +- 1.0e-5)
+      }
     }
   } // end of Grid PageRank

-  test("Chain PageRank") {
+  test("Grid Dynamic PageRank") {
     withSpark { sc =>
-      val chain1 = (0 until 9).map(x => (x, x+1) )
-      val rawEdges = sc.parallelize(chain1, 1).map { case (s,d) => (s.toLong, d.toLong) }
-      val chain = Graph.fromEdgeTuples(rawEdges, 1.0).cache()
+      val rows = 5
+      val cols = 5
       val resetProb = 0.15
       val tol = 0.0001
-      val numIter = 10
+      val numIter = 20
       val errorTol = 1.0e-5
+      val truePr = TruePageRank.grid(rows, cols, numIter, resetProb).toMap
+      val gridGraph = GraphGenerators.gridGraph(sc, rows, cols).cache()
+      val vertices = gridGraph.pageRank(1.0e-10, resetProb).vertices.cache()
+      val pageranks: Map[VertexId, Double] = vertices.collect().toMap
+      for ((k, pr) <- truePr) {
+        pageranks(k) should equal(pr +- 1.0e-5)
+      }
+    }
+  } // end of Grid Dynamic PageRank
-      val staticRanks = chain.staticPageRank(numIter, resetProb).vertices
-      val dynamicRanks = chain.pageRank(tol, resetProb).vertices
-      assert(compareRanks(staticRanks, dynamicRanks) < errorTol)
-    }
-  }
 }
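
Usage sketch (not part of the patch): the example below shows how the new Pregel.run API and the cycleGraph generator introduced above could express single-source shortest paths. It assumes a GraphX build containing this patch is on the classpath; the ShortestPathsExample object, the local SparkContext setup, and the 10-vertex cycle input are illustrative choices rather than code from the patch.

{{{
import org.apache.spark.SparkContext
import org.apache.spark.graphx._
import org.apache.spark.graphx.util.GraphGenerators

object ShortestPathsExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "sssp-sketch")
    val sourceId: VertexId = 0L

    // Vertex state: the best known distance from the source. Unreached
    // vertices start at infinity; every edge gets weight 1.0.
    val graph = GraphGenerators.cycleGraph(sc, 10)
      .mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)
      .mapEdges(_ => 1.0)

    // Vertex program: keep the smaller of the current distance and the best
    // incoming offer, and remain active only if the distance improved.
    def vertexProgram(iter: Int, id: VertexId, v: PregelVertex[Double],
        msgSum: Option[Double]): PregelVertex[Double] = {
      val newDist = math.min(v.attr, msgSum.getOrElse(Double.PositiveInfinity))
      PregelVertex(newDist, isActive = newDist < v.attr)
    }

    // Message computation: offer each out-neighbor the source distance plus
    // the edge weight, but only when that would improve the neighbor.
    def computeMsgs(iter: Int, e: EdgeTriplet[PregelVertex[Double], Double])
      : Iterator[(VertexId, Double)] = {
      if (e.srcAttr.attr + e.attr < e.dstAttr.attr) {
        Iterator((e.dstId, e.srcAttr.attr + e.attr))
      } else {
        Iterator.empty
      }
    }

    // Combine competing offers by keeping the shortest one.
    val sssp = Pregel.run[Double, Double, Double](graph, activeDirection = EdgeDirection.Out)(
      vertexProgram, computeMsgs, (a, b) => math.min(a, b))

    sssp.vertices.collect().foreach(println)
    sc.stop()
  }
}
}}}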