From 7991f4b146804555190285961f732af958d9ff25 Mon Sep 17 00:00:00 2001 From: "Joseph E. Gonzalez" Date: Mon, 7 Jul 2014 11:36:16 -0700 Subject: [PATCH] Improving documentation --- .../org/apache/spark/graphx/Pregel.scala | 253 +++++++++--------- 1 file changed, 123 insertions(+), 130 deletions(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala index f6451b42510c0..f6b658bd30bc9 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala @@ -22,27 +22,58 @@ import org.apache.spark.Logging /** - * The Pregel Vertex class contains the vertex attribute and the boolean flag indicating whether - * the vertex is active. + * The Pregel Vertex class contains the vertex attribute and the boolean flag + * indicating whether the vertex is active. * - * @param attr - * @param isActive - * @tparam T + * @param attr the vertex prorperty during the pregel computation (e.g., + * PageRank) + * @param isActive a flag indicating whether the vertex is active + * @tparam T the type of the vertex property */ -sealed case class PregelVertex[T] (attr: T, isActive: Boolean = true) - extends Product2[T, Boolean] { +sealed case class PregelVertex[@specialized T] + (attr: T, isActive: Boolean = true) extends Product2[T, Boolean] { override def _1: T = attr override def _2: Boolean = isActive } /** - * Implements a Pregel-like bulk-synchronous message-passing API. + * The Pregel API enables users to express iterative graph algorithms in GraphX + * and is loosely based on the Google and GraphLab APIs. * - * Unlike the original Pregel API, the GraphX Pregel API factors the sendMessage computation over - * edges, enables the message sending computation to read both vertex attributes, and constrains - * messages to the graph structure. These changes allow for substantially more efficient - * distributed execution while also exposing greater flexibility for graph-based computation. + * At a high-level iterative graph algorithms like PageRank recursively define + * vertex properties in terms of the properties of neighboring vertices. These + * recursive properties are then computed through iterative fixed-point + * computations. For example, the PageRank of a web-page can be defined as a + * weighted some of the PageRank of web-pages that link to that page and is + * computed by iteratively updating the PageRank of each page until a + * fixed-point is reached (the PageRank values stop changing). + * + * The GraphX Pregel API expresses iterative graph algorithms as vertex-programs + * which can send and receive messages from neighboring vertices. Vertex + * programs have the following logic: + * + * {{{ + * while ( there are active vertices ) { + * for ( v in allVertices ) { + * // Send messages to neighbors + * if (isActive) { + * for (nbr in neighbors(v)) { + * val msgs: List[(id, msg)] = computeMsgs(Triplet(v, nbr)) + * for ((id, msg) <- msgs) { + * messageSum(id) = reduce(messageSum(id), msg) + * } + * } + * } + * // Receive the "sum" of the messages to v and update the property + * (vertexProperty(v), isActive) = + * vertexProgram(vertexProperty(v), messagesSum(v)) + * } + * } + * }}} + * + * The user defined `vertexProgram`, `computeMessage`, and `reduce` functions + * capture the core logic of graph algorithms. * * @example We can use the Pregel abstraction to implement PageRank: * {{{ @@ -56,19 +87,90 @@ sealed case class PregelVertex[T] (attr: T, isActive: Boolean = true) * // Set the vertex attributes to the initial pagerank values * .mapVertices((id, attr) => 1.0) * - * def vertexProgram(id: VertexId, attr: Double, msgSum: Double): Double = - * resetProb + (1.0 - resetProb) * msgSum - * def sendMessage(id: VertexId, edge: EdgeTriplet[Double, Double]): Iterator[(VertexId, Double)] = - * Iterator((edge.dstId, edge.srcAttr * edge.attr)) + * // Define the vertex program and message calculation functions. + * def vertexProgram(iter: Int, id: VertexId, oldV: PregelVertex[Double], + * msgSum: Option[Double]) = { + * PregelVertex(resetProb + (1.0 - resetProb) * msgSum.getOrElse(0.0)) + * } + * + * def computeMsgs(iter: Int, edge: EdgeTriplet[PregelVertex[Double], Double]) = { + * Iterator((edge.dstId, edge.srcAttr.attr * edge.attr)) + * } + * * def messageCombiner(a: Double, b: Double): Double = a + b - * val initialMessage = 0.0 - * // Execute Pregel for a fixed number of iterations. - * Pregel(pagerankGraph, initialMessage, numIter)( - * vertexProgram, sendMessage, messageCombiner) + * + * // Run PageRank + * val prGraph = Pregel.run(pagerankGraph, numIter, activeDirection = EdgeDirection.Out)( + * vertexProgram, sendMessage, messageCombiner).cache() + * + * // Normalize the pagerank vector: + * val normalizer: Double = prGraph.vertices.map(x => x._2).reduce(_ + _) + * + * prGraph.mapVertices((id, pr) => pr / normalizer) + * * }}} * */ object Pregel extends Logging { + /** + * The new Pregel API. + */ + def run[VD: ClassTag, ED: ClassTag, A: ClassTag] + (graph: Graph[VD, ED], + maxIterations: Int = Int.MaxValue, + activeDirection: EdgeDirection = EdgeDirection.Either) + (vertexProgram: (Int, VertexId, PregelVertex[VD], Option[A]) => PregelVertex[VD], + computeMsgs: (Int, EdgeTriplet[PregelVertex[VD], ED]) => Iterator[(VertexId, A)], + mergeMsg: (A, A) => A) + : Graph[VD, ED] = + { + // Initialize the graph with all vertices active + var currengGraph: Graph[PregelVertex[VD], ED] = + graph.mapVertices { (vid, vdata) => PregelVertex(vdata) }.cache() + // Determine the set of vertices that did not vote to halt + var activeVertices = currengGraph.vertices + var numActive = activeVertices.count() + var iteration = 0 + while (numActive > 0 && iteration < maxIterations) { + // get a reference to the current graph to enable unprecistance. + val prevG = currengGraph + + // Compute the messages for all the active vertices + val messages = currengGraph.mapReduceTriplets( t => computeMsgs(iteration, t), mergeMsg, + Some((activeVertices, activeDirection))) + + // Receive the messages to the subset of active vertices + currengGraph = currengGraph.outerJoinVertices(messages){ (vid, pVertex, msgOpt) => + // If the vertex voted to halt and received no message then we can skip the vertex program + if (!pVertex.isActive && msgOpt.isEmpty) { + pVertex + } else { + // The vertex program is either active or received a message (or both). + // A vertex program should vote to halt again even if it has previously voted to halt + vertexProgram(iteration, vid, pVertex, msgOpt) + } + }.cache() + + // Recompute the active vertices (those that have not voted to halt) + activeVertices = currengGraph.vertices.filter(v => v._2._2) + + // Force all computation! + numActive = activeVertices.count() + + // Unpersist the RDDs hidden by newly-materialized RDDs + //prevG.unpersistVertices(blocking=false) + //prevG.edges.unpersist(blocking=false) + + //println("Finished Iteration " + i) + // g.vertices.foreach(println(_)) + + logInfo("Pregel finished iteration " + iteration) + // count the iteration + iteration += 1 + } + currengGraph.mapVertices((id, vdata) => vdata.attr) + } // end of apply + /** * Execute a Pregel-like iterative vertex-parallel abstraction. The @@ -124,7 +226,7 @@ object Pregel extends Logging { * @return the resulting graph at the end of the computation * */ - @deprecated ("Switching to Pregel.run.", "1.1") + // @deprecated ("Switching to Pregel.run.", "1.1") def apply[VD: ClassTag, ED: ClassTag, A: ClassTag] (graph: Graph[VD, ED], initialMsg: A, @@ -175,115 +277,6 @@ object Pregel extends Logging { } // end of apply - /** - * Execute a Pregel-like iterative vertex-parallel abstraction. The - * user-defined vertex-program `vprog` is executed in parallel on - * each vertex receiving any inbound messages and computing a new - * value for the vertex. The `sendMsg` function is then invoked on - * all out-edges and is used to compute an optional message to the - * destination vertex. The `mergeMsg` function is a commutative - * associative function used to combine messages destined to the - * same vertex. - * - * On the first iteration all vertices receive the `initialMsg` and - * on subsequent iterations if a vertex does not receive a message - * then the vertex-program is not invoked. - * - * This function iterates until there are no remaining messages, or - * for `maxIterations` iterations. - * - * @tparam VD the vertex data type - * @tparam ED the edge data type - * @tparam A the Pregel message type - * - * @param graph the input graph. - * - * @param initialMsg the message each vertex will receive at the on - * the first iteration - * - * @param maxIterations the maximum number of iterations to run for - * - * @param activeDirection the direction of edges incident to a vertex that received a message in - * the previous round on which to run `sendMsg`. For example, if this is `EdgeDirection.Out`, only - * out-edges of vertices that received a message in the previous round will run. The default is - * `EdgeDirection.Either`, which will run `sendMsg` on edges where either side received a message - * in the previous round. If this is `EdgeDirection.Both`, `sendMsg` will only run on edges where - * *both* vertices received a message. - * - * @param vprog the user-defined vertex program which runs on each - * vertex and receives the inbound message and computes a new vertex - * value. On the first iteration the vertex program is invoked on - * all vertices and is passed the default message. On subsequent - * iterations the vertex program is only invoked on those vertices - * that receive messages. - * - * @param sendMsg a user supplied function that is applied to out - * edges of vertices that received messages in the current - * iteration - * - * @param mergeMsg a user supplied function that takes two incoming - * messages of type A and merges them into a single message of type - * A. ''This function must be commutative and associative and - * ideally the size of A should not increase.'' - * - * @return the resulting graph at the end of the computation - * - */ - def run[VD: ClassTag, ED: ClassTag, A: ClassTag] - (graph: Graph[VD, ED], - maxIterations: Int = Int.MaxValue, - activeDirection: EdgeDirection = EdgeDirection.Either) - (vertexProgram: (Int, VertexId, PregelVertex[VD], Option[A]) => PregelVertex[VD], - sendMsg: (Int, EdgeTriplet[PregelVertex[VD], ED]) => Iterator[(VertexId, A)], - mergeMsg: (A, A) => A) - : Graph[VD, ED] = - { - // Initialize the graph with all vertices active - var currengGraph: Graph[PregelVertex[VD], ED] = - graph.mapVertices { (vid, vdata) => PregelVertex(vdata) }.cache() - // Determine the set of vertices that did not vote to halt - var activeVertices = currengGraph.vertices - var numActive = activeVertices.count() - var iteration = 0 - while (numActive > 0 && iteration < maxIterations) { - // get a reference to the current graph to enable unprecistance. - val prevG = currengGraph - - // Compute the messages for all the active vertices - val messages = currengGraph.mapReduceTriplets( t => sendMsg(iteration, t), mergeMsg, - Some((activeVertices, activeDirection))) - - // Receive the messages to the subset of active vertices - currengGraph = currengGraph.outerJoinVertices(messages){ (vid, pVertex, msgOpt) => - // If the vertex voted to halt and received no message then we can skip the vertex program - if (!pVertex.isActive && msgOpt.isEmpty) { - pVertex - } else { - // The vertex program is either active or received a message (or both). - // A vertex program should vote to halt again even if it has previously voted to halt - vertexProgram(iteration, vid, pVertex, msgOpt) - } - }.cache() - - // Recompute the active vertices (those that have not voted to halt) - activeVertices = currengGraph.vertices.filter(v => v._2._2) - - // Force all computation! - numActive = activeVertices.count() - - // Unpersist the RDDs hidden by newly-materialized RDDs - //prevG.unpersistVertices(blocking=false) - //prevG.edges.unpersist(blocking=false) - - //println("Finished Iteration " + i) - // g.vertices.foreach(println(_)) - - logInfo("Pregel finished iteration " + iteration) - // count the iteration - iteration += 1 - } - currengGraph.mapVertices((id, vdata) => vdata.attr) - } // end of apply } // end of class Pregel