Skip to content

Commit

Permalink
Adding PregelVertex type which encapsulates the vertex and its active…
Browse files Browse the repository at this point in the history
…ness.
  • Loading branch information
Joseph E. Gonzalez authored and jegonzal committed Oct 30, 2014
1 parent 8f41371 commit e84d33e
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 17 deletions.
35 changes: 24 additions & 11 deletions graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,21 @@ import scala.reflect.ClassTag
import org.apache.spark.Logging


/**
* The Pregel Vertex class contains the vertex attribute and the boolean flag indicating whether
* the vertex is active.
*
* @param attr
* @param isActive
* @tparam T
*/
sealed case class PregelVertex[T] (attr: T, isActive: Boolean = true)
extends Product2[T, Boolean] {
override def _1: T = attr
override def _2: Boolean = isActive
}


/**
* Implements a Pregel-like bulk-synchronous message-passing API.
*
Expand Down Expand Up @@ -55,7 +70,6 @@ import org.apache.spark.Logging
*/
object Pregel extends Logging {


/**
* Execute a Pregel-like iterative vertex-parallel abstraction. The
* user-defined vertex-program `vprog` is executed in parallel on
Expand Down Expand Up @@ -219,14 +233,14 @@ object Pregel extends Logging {
(graph: Graph[VD, ED],
maxIterations: Int = Int.MaxValue,
activeDirection: EdgeDirection = EdgeDirection.Either)
(vertexProgram: (Int, VertexId, VD, Boolean, Option[A]) => (VD, Boolean),
sendMsg: (Int, EdgeTriplet[(VD, Boolean), ED]) => Iterator[(VertexId, A)],
(vertexProgram: (Int, VertexId, PregelVertex[VD], Option[A]) => PregelVertex[VD],
sendMsg: (Int, EdgeTriplet[PregelVertex[VD], ED]) => Iterator[(VertexId, A)],
mergeMsg: (A, A) => A)
: Graph[VD, ED] =
{
// Initialize the graph with all vertices active
var currengGraph: Graph[(VD, Boolean), ED] =
graph.mapVertices { (vid, vdata) => (vdata, true) }.cache()
var currengGraph: Graph[PregelVertex[VD], ED] =
graph.mapVertices { (vid, vdata) => PregelVertex(vdata) }.cache()
// Determine the set of vertices that did not vote to halt
var activeVertices = currengGraph.vertices
var numActive = activeVertices.count()
Expand All @@ -240,15 +254,14 @@ object Pregel extends Logging {
Some((activeVertices, activeDirection)))

// Receive the messages to the subset of active vertices
currengGraph = currengGraph.outerJoinVertices(messages){ (vid, dataAndActive, msgOpt) =>
val (vdata, active) = dataAndActive
currengGraph = currengGraph.outerJoinVertices(messages){ (vid, pVertex, msgOpt) =>
// If the vertex voted to halt and received no message then we can skip the vertex program
if (!active && msgOpt.isEmpty) {
dataAndActive
if (!pVertex.isActive && msgOpt.isEmpty) {
pVertex
} else {
// The vertex program is either active or received a message (or both).
// A vertex program should vote to halt again even if it has previously voted to halt
vertexProgram(iteration, vid, vdata, active, msgOpt)
vertexProgram(iteration, vid, pVertex, msgOpt)
}
}.cache()

Expand All @@ -269,7 +282,7 @@ object Pregel extends Logging {
// count the iteration
iteration += 1
}
currengGraph.mapVertices((id, vdata) => vdata._1)
currengGraph.mapVertices((id, vdata) => vdata.attr)
} // end of apply


Expand Down
12 changes: 6 additions & 6 deletions graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
Original file line number Diff line number Diff line change
Expand Up @@ -151,21 +151,21 @@ object PageRank extends Logging {

// Define the three functions needed to implement PageRank in the GraphX
// version of Pregel
def vertexProgram(iter: Int, id: VertexId, attr: (Double, Double), wasActive: Boolean,
def vertexProgram(iter: Int, id: VertexId, vertex: PregelVertex[(Double, Double)],
msgSum: Option[Double]) = {
var (oldPR, pendingDelta) = attr
var (oldPR, pendingDelta) = vertex.attr
val newPR = oldPR + msgSum.getOrElse(0.0)
// if we were active then we sent the pending delta on the last iteration
if (wasActive) {
if (vertex.isActive) {
pendingDelta = 0.0
}
pendingDelta += (1.0 - resetProb) * msgSum.getOrElse(0.0)
val isActive = math.abs(pendingDelta) >= tol
((newPR, pendingDelta), isActive)
PregelVertex((newPR, pendingDelta), isActive)
}

def sendMessage(iter: Int, edge: EdgeTriplet[((Double, Double), Boolean), Double]) = {
val ((srcPr, srcDelta), srcIsActive) = edge.srcAttr
def sendMessage(iter: Int, edge: EdgeTriplet[PregelVertex[(Double, Double)], Double]) = {
val PregelVertex((srcPr, srcDelta), srcIsActive) = edge.srcAttr
assert(srcIsActive)
Iterator((edge.dstId, srcDelta * edge.attr))
}
Expand Down

0 comments on commit e84d33e

Please sign in to comment.