Skip to content

Commit

Permalink
Improving documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
jegonzal committed Oct 30, 2014
1 parent e84d33e commit 7991f4b
Showing 1 changed file with 123 additions and 130 deletions.
253 changes: 123 additions & 130 deletions graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,58 @@ import org.apache.spark.Logging


/**
* The Pregel Vertex class contains the vertex attribute and the boolean flag indicating whether
* the vertex is active.
* The Pregel Vertex class contains the vertex attribute and the boolean flag
* indicating whether the vertex is active.
*
* @param attr
* @param isActive
* @tparam T
* @param attr the vertex prorperty during the pregel computation (e.g.,
* PageRank)
* @param isActive a flag indicating whether the vertex is active
* @tparam T the type of the vertex property
*/
sealed case class PregelVertex[T] (attr: T, isActive: Boolean = true)
extends Product2[T, Boolean] {
sealed case class PregelVertex[@specialized T]
(attr: T, isActive: Boolean = true) extends Product2[T, Boolean] {
override def _1: T = attr
override def _2: Boolean = isActive
}


/**
* Implements a Pregel-like bulk-synchronous message-passing API.
* The Pregel API enables users to express iterative graph algorithms in GraphX
* and is loosely based on the Google and GraphLab APIs.
*
* Unlike the original Pregel API, the GraphX Pregel API factors the sendMessage computation over
* edges, enables the message sending computation to read both vertex attributes, and constrains
* messages to the graph structure. These changes allow for substantially more efficient
* distributed execution while also exposing greater flexibility for graph-based computation.
* At a high-level iterative graph algorithms like PageRank recursively define
* vertex properties in terms of the properties of neighboring vertices. These
* recursive properties are then computed through iterative fixed-point
* computations. For example, the PageRank of a web-page can be defined as a
* weighted some of the PageRank of web-pages that link to that page and is
* computed by iteratively updating the PageRank of each page until a
* fixed-point is reached (the PageRank values stop changing).
*
* The GraphX Pregel API expresses iterative graph algorithms as vertex-programs
* which can send and receive messages from neighboring vertices. Vertex
* programs have the following logic:
*
* {{{
* while ( there are active vertices ) {
* for ( v in allVertices ) {
* // Send messages to neighbors
* if (isActive) {
* for (nbr in neighbors(v)) {
* val msgs: List[(id, msg)] = computeMsgs(Triplet(v, nbr))
* for ((id, msg) <- msgs) {
* messageSum(id) = reduce(messageSum(id), msg)
* }
* }
* }
* // Receive the "sum" of the messages to v and update the property
* (vertexProperty(v), isActive) =
* vertexProgram(vertexProperty(v), messagesSum(v))
* }
* }
* }}}
*
* The user defined `vertexProgram`, `computeMessage`, and `reduce` functions
* capture the core logic of graph algorithms.
*
* @example We can use the Pregel abstraction to implement PageRank:
* {{{
Expand All @@ -56,19 +87,90 @@ sealed case class PregelVertex[T] (attr: T, isActive: Boolean = true)
* // Set the vertex attributes to the initial pagerank values
* .mapVertices((id, attr) => 1.0)
*
* def vertexProgram(id: VertexId, attr: Double, msgSum: Double): Double =
* resetProb + (1.0 - resetProb) * msgSum
* def sendMessage(id: VertexId, edge: EdgeTriplet[Double, Double]): Iterator[(VertexId, Double)] =
* Iterator((edge.dstId, edge.srcAttr * edge.attr))
* // Define the vertex program and message calculation functions.
* def vertexProgram(iter: Int, id: VertexId, oldV: PregelVertex[Double],
* msgSum: Option[Double]) = {
* PregelVertex(resetProb + (1.0 - resetProb) * msgSum.getOrElse(0.0))
* }
*
* def computeMsgs(iter: Int, edge: EdgeTriplet[PregelVertex[Double], Double]) = {
* Iterator((edge.dstId, edge.srcAttr.attr * edge.attr))
* }
*
* def messageCombiner(a: Double, b: Double): Double = a + b
* val initialMessage = 0.0
* // Execute Pregel for a fixed number of iterations.
* Pregel(pagerankGraph, initialMessage, numIter)(
* vertexProgram, sendMessage, messageCombiner)
*
* // Run PageRank
* val prGraph = Pregel.run(pagerankGraph, numIter, activeDirection = EdgeDirection.Out)(
* vertexProgram, sendMessage, messageCombiner).cache()
*
* // Normalize the pagerank vector:
* val normalizer: Double = prGraph.vertices.map(x => x._2).reduce(_ + _)
*
* prGraph.mapVertices((id, pr) => pr / normalizer)
*
* }}}
*
*/
object Pregel extends Logging {
/**
* The new Pregel API.
*/
def run[VD: ClassTag, ED: ClassTag, A: ClassTag]
(graph: Graph[VD, ED],
maxIterations: Int = Int.MaxValue,
activeDirection: EdgeDirection = EdgeDirection.Either)
(vertexProgram: (Int, VertexId, PregelVertex[VD], Option[A]) => PregelVertex[VD],
computeMsgs: (Int, EdgeTriplet[PregelVertex[VD], ED]) => Iterator[(VertexId, A)],
mergeMsg: (A, A) => A)
: Graph[VD, ED] =
{
// Initialize the graph with all vertices active
var currengGraph: Graph[PregelVertex[VD], ED] =
graph.mapVertices { (vid, vdata) => PregelVertex(vdata) }.cache()
// Determine the set of vertices that did not vote to halt
var activeVertices = currengGraph.vertices
var numActive = activeVertices.count()
var iteration = 0
while (numActive > 0 && iteration < maxIterations) {
// get a reference to the current graph to enable unprecistance.
val prevG = currengGraph

// Compute the messages for all the active vertices
val messages = currengGraph.mapReduceTriplets( t => computeMsgs(iteration, t), mergeMsg,
Some((activeVertices, activeDirection)))

// Receive the messages to the subset of active vertices
currengGraph = currengGraph.outerJoinVertices(messages){ (vid, pVertex, msgOpt) =>
// If the vertex voted to halt and received no message then we can skip the vertex program
if (!pVertex.isActive && msgOpt.isEmpty) {
pVertex
} else {
// The vertex program is either active or received a message (or both).
// A vertex program should vote to halt again even if it has previously voted to halt
vertexProgram(iteration, vid, pVertex, msgOpt)
}
}.cache()

// Recompute the active vertices (those that have not voted to halt)
activeVertices = currengGraph.vertices.filter(v => v._2._2)

// Force all computation!
numActive = activeVertices.count()

// Unpersist the RDDs hidden by newly-materialized RDDs
//prevG.unpersistVertices(blocking=false)
//prevG.edges.unpersist(blocking=false)

//println("Finished Iteration " + i)
// g.vertices.foreach(println(_))

logInfo("Pregel finished iteration " + iteration)
// count the iteration
iteration += 1
}
currengGraph.mapVertices((id, vdata) => vdata.attr)
} // end of apply


/**
* Execute a Pregel-like iterative vertex-parallel abstraction. The
Expand Down Expand Up @@ -124,7 +226,7 @@ object Pregel extends Logging {
* @return the resulting graph at the end of the computation
*
*/
@deprecated ("Switching to Pregel.run.", "1.1")
// @deprecated ("Switching to Pregel.run.", "1.1")
def apply[VD: ClassTag, ED: ClassTag, A: ClassTag]
(graph: Graph[VD, ED],
initialMsg: A,
Expand Down Expand Up @@ -175,115 +277,6 @@ object Pregel extends Logging {
} // end of apply


/**
* Execute a Pregel-like iterative vertex-parallel abstraction. The
* user-defined vertex-program `vprog` is executed in parallel on
* each vertex receiving any inbound messages and computing a new
* value for the vertex. The `sendMsg` function is then invoked on
* all out-edges and is used to compute an optional message to the
* destination vertex. The `mergeMsg` function is a commutative
* associative function used to combine messages destined to the
* same vertex.
*
* On the first iteration all vertices receive the `initialMsg` and
* on subsequent iterations if a vertex does not receive a message
* then the vertex-program is not invoked.
*
* This function iterates until there are no remaining messages, or
* for `maxIterations` iterations.
*
* @tparam VD the vertex data type
* @tparam ED the edge data type
* @tparam A the Pregel message type
*
* @param graph the input graph.
*
* @param initialMsg the message each vertex will receive at the on
* the first iteration
*
* @param maxIterations the maximum number of iterations to run for
*
* @param activeDirection the direction of edges incident to a vertex that received a message in
* the previous round on which to run `sendMsg`. For example, if this is `EdgeDirection.Out`, only
* out-edges of vertices that received a message in the previous round will run. The default is
* `EdgeDirection.Either`, which will run `sendMsg` on edges where either side received a message
* in the previous round. If this is `EdgeDirection.Both`, `sendMsg` will only run on edges where
* *both* vertices received a message.
*
* @param vprog the user-defined vertex program which runs on each
* vertex and receives the inbound message and computes a new vertex
* value. On the first iteration the vertex program is invoked on
* all vertices and is passed the default message. On subsequent
* iterations the vertex program is only invoked on those vertices
* that receive messages.
*
* @param sendMsg a user supplied function that is applied to out
* edges of vertices that received messages in the current
* iteration
*
* @param mergeMsg a user supplied function that takes two incoming
* messages of type A and merges them into a single message of type
* A. ''This function must be commutative and associative and
* ideally the size of A should not increase.''
*
* @return the resulting graph at the end of the computation
*
*/
def run[VD: ClassTag, ED: ClassTag, A: ClassTag]
(graph: Graph[VD, ED],
maxIterations: Int = Int.MaxValue,
activeDirection: EdgeDirection = EdgeDirection.Either)
(vertexProgram: (Int, VertexId, PregelVertex[VD], Option[A]) => PregelVertex[VD],
sendMsg: (Int, EdgeTriplet[PregelVertex[VD], ED]) => Iterator[(VertexId, A)],
mergeMsg: (A, A) => A)
: Graph[VD, ED] =
{
// Initialize the graph with all vertices active
var currengGraph: Graph[PregelVertex[VD], ED] =
graph.mapVertices { (vid, vdata) => PregelVertex(vdata) }.cache()
// Determine the set of vertices that did not vote to halt
var activeVertices = currengGraph.vertices
var numActive = activeVertices.count()
var iteration = 0
while (numActive > 0 && iteration < maxIterations) {
// get a reference to the current graph to enable unprecistance.
val prevG = currengGraph

// Compute the messages for all the active vertices
val messages = currengGraph.mapReduceTriplets( t => sendMsg(iteration, t), mergeMsg,
Some((activeVertices, activeDirection)))

// Receive the messages to the subset of active vertices
currengGraph = currengGraph.outerJoinVertices(messages){ (vid, pVertex, msgOpt) =>
// If the vertex voted to halt and received no message then we can skip the vertex program
if (!pVertex.isActive && msgOpt.isEmpty) {
pVertex
} else {
// The vertex program is either active or received a message (or both).
// A vertex program should vote to halt again even if it has previously voted to halt
vertexProgram(iteration, vid, pVertex, msgOpt)
}
}.cache()

// Recompute the active vertices (those that have not voted to halt)
activeVertices = currengGraph.vertices.filter(v => v._2._2)

// Force all computation!
numActive = activeVertices.count()

// Unpersist the RDDs hidden by newly-materialized RDDs
//prevG.unpersistVertices(blocking=false)
//prevG.edges.unpersist(blocking=false)

//println("Finished Iteration " + i)
// g.vertices.foreach(println(_))

logInfo("Pregel finished iteration " + iteration)
// count the iteration
iteration += 1
}
currengGraph.mapVertices((id, vdata) => vdata.attr)
} // end of apply


} // end of class Pregel

0 comments on commit 7991f4b

Please sign in to comment.