From 27720038c4aa4accb8815ca84bc0e87e5916c916 Mon Sep 17 00:00:00 2001 From: "Joseph E. Gonzalez" Date: Wed, 25 Jun 2014 15:24:54 -0700 Subject: [PATCH 01/12] Starting a revised version of Pregel --- .../org/apache/spark/graphx/Pregel.scala | 111 ++++++++++++++++++ .../apache/spark/graphx/PregelContext.scala | 24 ++++ 2 files changed, 135 insertions(+) create mode 100644 graphx/src/main/scala/org/apache/spark/graphx/PregelContext.scala diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala index 5e55620147df8..96404110cad9c 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala @@ -158,4 +158,115 @@ object Pregel extends Logging { g } // end of apply + /** + * Execute a Pregel-like iterative vertex-parallel abstraction. The + * user-defined vertex-program `vprog` is executed in parallel on + * each vertex receiving any inbound messages and computing a new + * value for the vertex. The `sendMsg` function is then invoked on + * all out-edges and is used to compute an optional message to the + * destination vertex. The `mergeMsg` function is a commutative + * associative function used to combine messages destined to the + * same vertex. + * + * On the first iteration all vertices receive the `initialMsg` and + * on subsequent iterations if a vertex does not receive a message + * then the vertex-program is not invoked. + * + * This function iterates until there are no remaining messages, or + * for `maxIterations` iterations. + * + * @tparam VD the vertex data type + * @tparam ED the edge data type + * @tparam A the Pregel message type + * + * @param graph the input graph. + * + * @param initialMsg the message each vertex will receive at the on + * the first iteration + * + * @param maxIterations the maximum number of iterations to run for + * + * @param activeDirection the direction of edges incident to a vertex that received a message in + * the previous round on which to run `sendMsg`. For example, if this is `EdgeDirection.Out`, only + * out-edges of vertices that received a message in the previous round will run. The default is + * `EdgeDirection.Either`, which will run `sendMsg` on edges where either side received a message + * in the previous round. If this is `EdgeDirection.Both`, `sendMsg` will only run on edges where + * *both* vertices received a message. + * + * @param vprog the user-defined vertex program which runs on each + * vertex and receives the inbound message and computes a new vertex + * value. On the first iteration the vertex program is invoked on + * all vertices and is passed the default message. On subsequent + * iterations the vertex program is only invoked on those vertices + * that receive messages. + * + * @param sendMsg a user supplied function that is applied to out + * edges of vertices that received messages in the current + * iteration + * + * @param mergeMsg a user supplied function that takes two incoming + * messages of type A and merges them into a single message of type + * A. ''This function must be commutative and associative and + * ideally the size of A should not increase.'' + * + * @return the resulting graph at the end of the computation + * + */ + def pregel[VD: ClassTag, ED: ClassTag, A: ClassTag] + (graph: Graph[VD, ED], + maxIterations: Int = Int.MaxValue, + activeDirection: EdgeDirection = EdgeDirection.Either) + (vprog: (PregelContext, VD, Option[A]) => VD, + sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)], + mergeMsg: (A, A) => A) + : Graph[VD, ED] = + { + // Initialize the graph with all vertices active + var g: Graph[(Boolean, VD), ED] = graph.mapVertices { (vid, vdata) => (true, vdata) }.cache() + // Determine the set of vertices that did not vote to halt + var activeVertices = g.vertices + var numActive = activeVertices.count() + var i = 0 + while (numActive > 0 && i < maxIterations) { + // Compute the messages for all the active vertices + val messages = g.mapVertices((vid, vdata) => vdata._2) // remove the active boolean + .mapReduceTriplets(sendMsg, mergeMsg, Some((activeVertices, activeDirection))) // Construct messages + + // get a reference to the current graph so that we can unpersist it once the new graph is created. + val prevG = g + + // Receive the messages to the subset of active vertices + g = g.outerJoinVertices(messages){ (vid, activeAndData, msg) => + val (active, vdata) = activeAndData + // If the vertex voted to halt and received no message then we can skip the vertex program + if (!active && msg.isEmpty) { + activeAndData + } else { + // The vertex program is either active or received a message (or both). + val ctx = new PregelContext(vid, 0) + // A vertex program should vote to halt again even if it has previously voted to halt + val newVertexVal = vprog(ctx, vdata, msg) + val isActive = !ctx.halt + (isActive, newVertexVal) + } + }.cache() + + // Recompute the active vertices (those that have not voted to halt) + activeVertices = g.vertices.filter(v => v._2._1) + + // Force all computation! + numActive = activeVertices.count() + + // Unpersist the RDDs hidden by newly-materialized RDDs + prevG.unpersistVertices(blocking=false) + prevG.edges.unpersist(blocking=false) + + logInfo("Pregel finished iteration " + i) + // count the iteration + i += 1 + } + g.mapVertices((id, vdata) => vdata._2) + } // end of apply + + } // end of class Pregel diff --git a/graphx/src/main/scala/org/apache/spark/graphx/PregelContext.scala b/graphx/src/main/scala/org/apache/spark/graphx/PregelContext.scala new file mode 100644 index 0000000000000..9a6879f2ae089 --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/PregelContext.scala @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.graphx + +class PregelContext(val id: VertexId, val iteration: Int, var halt: Boolean = false) { + def voteToHalt() { + halt = true + } +} From 86a0cf4eb4285eb80c0dca2f4b4a81ed25524f45 Mon Sep 17 00:00:00 2001 From: "Joseph E. Gonzalez" Date: Wed, 25 Jun 2014 15:27:58 -0700 Subject: [PATCH 02/12] adding iteration number to context --- graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala index 96404110cad9c..288a5cd921b83 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala @@ -243,7 +243,7 @@ object Pregel extends Logging { activeAndData } else { // The vertex program is either active or received a message (or both). - val ctx = new PregelContext(vid, 0) + val ctx = new PregelContext(vid, i) // A vertex program should vote to halt again even if it has previously voted to halt val newVertexVal = vprog(ctx, vdata, msg) val isActive = !ctx.halt From 9676cb336050f83c289220638bd5f5d18170cbb6 Mon Sep 17 00:00:00 2001 From: "Joseph E. Gonzalez" Date: Wed, 25 Jun 2014 17:33:26 -0700 Subject: [PATCH 03/12] Simplifying pregel api and applying to pagerank. --- .../org/apache/spark/graphx/Pregel.scala | 30 +++++++++---------- .../apache/spark/graphx/PregelContext.scala | 6 +--- .../apache/spark/graphx/lib/PageRank.scala | 30 +++++++++++-------- 3 files changed, 32 insertions(+), 34 deletions(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala index 288a5cd921b83..b9753d1d64e32 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala @@ -109,6 +109,7 @@ object Pregel extends Logging { * @return the resulting graph at the end of the computation * */ + @deprecated ("Switching to Pregel.run.", "1.1") def apply[VD: ClassTag, ED: ClassTag, A: ClassTag] (graph: Graph[VD, ED], initialMsg: A, @@ -212,47 +213,44 @@ object Pregel extends Logging { * @return the resulting graph at the end of the computation * */ - def pregel[VD: ClassTag, ED: ClassTag, A: ClassTag] + def run[VD: ClassTag, ED: ClassTag, A: ClassTag] (graph: Graph[VD, ED], maxIterations: Int = Int.MaxValue, activeDirection: EdgeDirection = EdgeDirection.Either) - (vprog: (PregelContext, VD, Option[A]) => VD, - sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)], + (vertexProgram: (VertexId, (VD, Boolean), Option[A], PregelContext) => (VD, Boolean), + sendMsg: (EdgeTriplet[(VD, Boolean), ED], PregelContext) => Iterator[(VertexId, A)], mergeMsg: (A, A) => A) : Graph[VD, ED] = { // Initialize the graph with all vertices active - var g: Graph[(Boolean, VD), ED] = graph.mapVertices { (vid, vdata) => (true, vdata) }.cache() + var g: Graph[(VD, Boolean), ED] = graph.mapVertices { (vid, vdata) => (vdata, true) }.cache() // Determine the set of vertices that did not vote to halt var activeVertices = g.vertices var numActive = activeVertices.count() var i = 0 while (numActive > 0 && i < maxIterations) { + val ctx = new PregelContext(i) // Compute the messages for all the active vertices - val messages = g.mapVertices((vid, vdata) => vdata._2) // remove the active boolean - .mapReduceTriplets(sendMsg, mergeMsg, Some((activeVertices, activeDirection))) // Construct messages + val messages = g.mapReduceTriplets(t => sendMsg(t, ctx), mergeMsg, Some((activeVertices, activeDirection))) // get a reference to the current graph so that we can unpersist it once the new graph is created. val prevG = g // Receive the messages to the subset of active vertices - g = g.outerJoinVertices(messages){ (vid, activeAndData, msg) => - val (active, vdata) = activeAndData + g = g.outerJoinVertices(messages){ (vid, dataAndActive, msgOpt) => + val (vdata, active) = dataAndActive // If the vertex voted to halt and received no message then we can skip the vertex program - if (!active && msg.isEmpty) { - activeAndData + if (!active && msgOpt.isEmpty) { + dataAndActive } else { // The vertex program is either active or received a message (or both). - val ctx = new PregelContext(vid, i) // A vertex program should vote to halt again even if it has previously voted to halt - val newVertexVal = vprog(ctx, vdata, msg) - val isActive = !ctx.halt - (isActive, newVertexVal) + vertexProgram(vid, dataAndActive, msgOpt, ctx) } }.cache() // Recompute the active vertices (those that have not voted to halt) - activeVertices = g.vertices.filter(v => v._2._1) + activeVertices = g.vertices.filter(v => v._2._2) // Force all computation! numActive = activeVertices.count() @@ -265,7 +263,7 @@ object Pregel extends Logging { // count the iteration i += 1 } - g.mapVertices((id, vdata) => vdata._2) + g.mapVertices((id, vdata) => vdata._1) } // end of apply diff --git a/graphx/src/main/scala/org/apache/spark/graphx/PregelContext.scala b/graphx/src/main/scala/org/apache/spark/graphx/PregelContext.scala index 9a6879f2ae089..c20288d80c30c 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/PregelContext.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/PregelContext.scala @@ -17,8 +17,4 @@ package org.apache.spark.graphx -class PregelContext(val id: VertexId, val iteration: Int, var halt: Boolean = false) { - def voteToHalt() { - halt = true - } -} +class PregelContext(val iteration: Int) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala index 257e2f3a36115..f85371baeb610 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala @@ -144,24 +144,28 @@ object PageRank extends Logging { } // Set the weight on the edges based on the degree .mapTriplets( e => 1.0 / e.srcAttr ) - // Set the vertex attributes to (initalPR, delta = 0) - .mapVertices( (id, attr) => (0.0, 0.0) ) + // Set the vertex attributes to (currentPr, deltaToSend) + .mapVertices( (id, attr) => (resetProb, 1.0 - resetProb) ) .cache() // Define the three functions needed to implement PageRank in the GraphX // version of Pregel - def vertexProgram(id: VertexId, attr: (Double, Double), msgSum: Double): (Double, Double) = { - val (oldPR, lastDelta) = attr - val newPR = oldPR + (1.0 - resetProb) * msgSum - (newPR, newPR - oldPR) + def vertexProgram(id: VertexId, attr: ((Double, Double), Boolean), msgSum: Option[Double], ctx: PregelContext) = { + var ((oldPR, pendingDelta), wasActive) = attr + val newPR = oldPR + (1.0 - resetProb) * msgSum.getOrElse(0.0) + // if we were active then we sent the pending delta on the last iteration + if (wasActive) { + pendingDelta = 0.0 + } + pendingDelta += msgSum.getOrElse(0.0) + val isActive = math.abs(pendingDelta) >= tol + ((newPR, pendingDelta), isActive) } - def sendMessage(edge: EdgeTriplet[(Double, Double), Double]) = { - if (edge.srcAttr._2 > tol) { - Iterator((edge.dstId, edge.srcAttr._2 * edge.attr)) - } else { - Iterator.empty - } + def sendMessage(edge: EdgeTriplet[((Double, Double), Boolean), Double], ctx: PregelContext) = { + val ((srcPr, srcDelta), srcIsActive) = edge.srcAttr + assert(srcIsActive) + Iterator((edge.dstId, srcDelta * edge.attr)) } def messageCombiner(a: Double, b: Double): Double = a + b @@ -170,7 +174,7 @@ object PageRank extends Logging { val initialMessage = resetProb / (1.0 - resetProb) // Execute a dynamic version of Pregel. - Pregel(pagerankGraph, initialMessage, activeDirection = EdgeDirection.Out)( + Pregel.run(pagerankGraph, activeDirection = EdgeDirection.Out)( vertexProgram, sendMessage, messageCombiner) .mapVertices((vid, attr) => attr._1) } // end of deltaPageRank From ac4680eda7b6172783654d4bfeca4a3835a93a02 Mon Sep 17 00:00:00 2001 From: "Joseph E. Gonzalez" Date: Wed, 25 Jun 2014 18:02:43 -0700 Subject: [PATCH 04/12] further simplifying API --- .../org/apache/spark/graphx/Pregel.scala | 30 +++++++++++++++---- .../apache/spark/graphx/PregelContext.scala | 1 - .../apache/spark/graphx/lib/PageRank.scala | 19 +++++++----- 3 files changed, 36 insertions(+), 14 deletions(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala index b9753d1d64e32..0c91d97089c74 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala @@ -55,6 +55,16 @@ import org.apache.spark.Logging */ object Pregel extends Logging { + class Context(val iteration: Int) + + class VertexContext(iteration: Int, val wasActive: Boolean, var isActive: Boolean = true) extends Context(iteration) { + def deactivate() { isActive = false } + } + + class EdgeContext(iteration: Int, val srcIsActive: Boolean, val dstIsActive: Boolean) extends Context(iteration) + + + /** * Execute a Pregel-like iterative vertex-parallel abstraction. The * user-defined vertex-program `vprog` is executed in parallel on @@ -217,8 +227,8 @@ object Pregel extends Logging { (graph: Graph[VD, ED], maxIterations: Int = Int.MaxValue, activeDirection: EdgeDirection = EdgeDirection.Either) - (vertexProgram: (VertexId, (VD, Boolean), Option[A], PregelContext) => (VD, Boolean), - sendMsg: (EdgeTriplet[(VD, Boolean), ED], PregelContext) => Iterator[(VertexId, A)], + (vertexProgram: (VertexId, VD, Option[A], VertexContext) => VD, + sendMsg: (EdgeTriplet[VD, ED], EdgeContext) => Iterator[(VertexId, A)], mergeMsg: (A, A) => A) : Graph[VD, ED] = { @@ -229,9 +239,18 @@ object Pregel extends Logging { var numActive = activeVertices.count() var i = 0 while (numActive > 0 && i < maxIterations) { - val ctx = new PregelContext(i) + // The send message wrapper removes the active fields from the triplet and places them in the edge context. + def sendMessageWrapper(triplet: EdgeTriplet[(VD, Boolean),ED]): Iterator[(VertexId, A)] = { + val simpleTriplet = new EdgeTriplet[VD, ED]() + simpleTriplet.set(triplet) + simpleTriplet.srcAttr = triplet.srcAttr._1 + simpleTriplet.dstAttr = triplet.dstAttr._1 + val ctx = new EdgeContext(i, triplet.srcAttr._2, triplet.dstAttr._2) + sendMsg(simpleTriplet, ctx) + } + // Compute the messages for all the active vertices - val messages = g.mapReduceTriplets(t => sendMsg(t, ctx), mergeMsg, Some((activeVertices, activeDirection))) + val messages = g.mapReduceTriplets(sendMessageWrapper, mergeMsg, Some((activeVertices, activeDirection))) // get a reference to the current graph so that we can unpersist it once the new graph is created. val prevG = g @@ -243,9 +262,10 @@ object Pregel extends Logging { if (!active && msgOpt.isEmpty) { dataAndActive } else { + val ctx = new VertexContext(i, active) // The vertex program is either active or received a message (or both). // A vertex program should vote to halt again even if it has previously voted to halt - vertexProgram(vid, dataAndActive, msgOpt, ctx) + (vertexProgram(vid, vdata, msgOpt, ctx), ctx.isActive) } }.cache() diff --git a/graphx/src/main/scala/org/apache/spark/graphx/PregelContext.scala b/graphx/src/main/scala/org/apache/spark/graphx/PregelContext.scala index c20288d80c30c..7502e85fc6103 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/PregelContext.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/PregelContext.scala @@ -17,4 +17,3 @@ package org.apache.spark.graphx -class PregelContext(val iteration: Int) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala index f85371baeb610..ec3b3d14ea17a 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala @@ -21,6 +21,7 @@ import scala.reflect.ClassTag import org.apache.spark.Logging import org.apache.spark.graphx._ +import org.apache.spark.graphx.Pregel._ /** * PageRank algorithm implementation. There are two implementations of PageRank implemented. @@ -150,21 +151,23 @@ object PageRank extends Logging { // Define the three functions needed to implement PageRank in the GraphX // version of Pregel - def vertexProgram(id: VertexId, attr: ((Double, Double), Boolean), msgSum: Option[Double], ctx: PregelContext) = { - var ((oldPR, pendingDelta), wasActive) = attr + def vertexProgram(id: VertexId, attr: (Double, Double), msgSum: Option[Double], ctx: VertexContext) = { + var (oldPR, pendingDelta) = attr val newPR = oldPR + (1.0 - resetProb) * msgSum.getOrElse(0.0) // if we were active then we sent the pending delta on the last iteration - if (wasActive) { + if (ctx.wasActive) { pendingDelta = 0.0 } pendingDelta += msgSum.getOrElse(0.0) - val isActive = math.abs(pendingDelta) >= tol - ((newPR, pendingDelta), isActive) + if (math.abs(pendingDelta) <= tol) { + ctx.deactivate() + } + (newPR, pendingDelta) } - def sendMessage(edge: EdgeTriplet[((Double, Double), Boolean), Double], ctx: PregelContext) = { - val ((srcPr, srcDelta), srcIsActive) = edge.srcAttr - assert(srcIsActive) + def sendMessage(edge: EdgeTriplet[(Double, Double), Double], ctx: EdgeContext) = { + val (srcPr, srcDelta) = edge.srcAttr + assert(ctx.srcIsActive) Iterator((edge.dstId, srcDelta * edge.attr)) } From 707b699ea7966f49f31234bba95e26178d07fc11 Mon Sep 17 00:00:00 2001 From: "Joseph E. Gonzalez" Date: Wed, 25 Jun 2014 23:57:47 -0700 Subject: [PATCH 05/12] Fixing unit tests. --- .../org/apache/spark/graphx/Pregel.scala | 7 +- .../apache/spark/graphx/lib/PageRank.scala | 15 +- .../spark/graphx/util/GraphGenerators.scala | 15 ++ .../spark/graphx/lib/PageRankSuite.scala | 131 ++++++++++++------ 4 files changed, 116 insertions(+), 52 deletions(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala index 0c91d97089c74..63aac34f093cf 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala @@ -276,8 +276,11 @@ object Pregel extends Logging { numActive = activeVertices.count() // Unpersist the RDDs hidden by newly-materialized RDDs - prevG.unpersistVertices(blocking=false) - prevG.edges.unpersist(blocking=false) +// prevG.unpersistVertices(blocking=false) +// prevG.edges.unpersist(blocking=false) + + println("Finished Iteration " + i) + g.vertices.foreach(println(_)) logInfo("Pregel finished iteration " + i) // count the iteration diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala index ec3b3d14ea17a..b8fdfb3a1bc20 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala @@ -90,6 +90,7 @@ object PageRank extends Logging { // Set the vertex attributes to the initial pagerank values .mapVertices( (id, attr) => resetProb ) + var iteration = 0 var prevRankGraph: Graph[Double, Double] = null while (iteration < numIter) { @@ -146,19 +147,19 @@ object PageRank extends Logging { // Set the weight on the edges based on the degree .mapTriplets( e => 1.0 / e.srcAttr ) // Set the vertex attributes to (currentPr, deltaToSend) - .mapVertices( (id, attr) => (resetProb, 1.0 - resetProb) ) + .mapVertices( (id, attr) => (resetProb, (1.0 - resetProb) * resetProb) ) .cache() // Define the three functions needed to implement PageRank in the GraphX // version of Pregel def vertexProgram(id: VertexId, attr: (Double, Double), msgSum: Option[Double], ctx: VertexContext) = { var (oldPR, pendingDelta) = attr - val newPR = oldPR + (1.0 - resetProb) * msgSum.getOrElse(0.0) + val newPR = oldPR + msgSum.getOrElse(0.0) // if we were active then we sent the pending delta on the last iteration if (ctx.wasActive) { pendingDelta = 0.0 } - pendingDelta += msgSum.getOrElse(0.0) + pendingDelta += (1.0 - resetProb) * msgSum.getOrElse(0.0) if (math.abs(pendingDelta) <= tol) { ctx.deactivate() } @@ -173,12 +174,12 @@ object PageRank extends Logging { def messageCombiner(a: Double, b: Double): Double = a + b - // The initial message received by all vertices in PageRank - val initialMessage = resetProb / (1.0 - resetProb) - // Execute a dynamic version of Pregel. - Pregel.run(pagerankGraph, activeDirection = EdgeDirection.Out)( + val prGraph = Pregel.run(pagerankGraph, activeDirection = EdgeDirection.Out)( vertexProgram, sendMessage, messageCombiner) .mapVertices((vid, attr) => attr._1) + .cache() + val normalizer: Double = prGraph.vertices.map(x => x._2).reduce(_ + _) + prGraph.mapVertices((id, pr) => pr / normalizer) } // end of deltaPageRank } diff --git a/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala b/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala index 8a13c74221546..1e4f15e171dc4 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala @@ -265,4 +265,19 @@ object GraphGenerators { Graph.fromEdgeTuples(edges, 1) } // end of starGraph + + /** + * Create a cycle graph. + * + * @param sc the spark context in which to construct the graph + * @param nverts the number of vertices in the cycle + * + * @return A cycle graph containing `nverts` vertices with vertex 0 + * being the center vertex. + */ + def cycleGraph(sc: SparkContext, nverts: Int): Graph[Int, Int] = { + val edges: RDD[(VertexId, VertexId)] = sc.parallelize(0 until nverts).map(vid => (vid, (vid + 1) % nverts)) + Graph.fromEdgeTuples(edges, 1) + } // end of starGraph + } // end of Graph Generators diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala index fc491ae327c2a..3dc3af0ff8897 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala @@ -17,7 +17,8 @@ package org.apache.spark.graphx.lib -import org.scalatest.FunSuite + +import org.scalatest.{Matchers, FunSuite} import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ @@ -54,83 +55,127 @@ object GridPageRank { inNbrs(ind).map( nbr => oldPr(nbr) / outDegree(nbr)).sum } } - (0L until (nRows * nCols)).zip(pr) + val normalizer = pr.sum + (0L until (nRows * nCols)).zip(pr.map(p => p / normalizer)) } } -class PageRankSuite extends FunSuite with LocalSparkContext { +class PageRankSuite extends FunSuite with LocalSparkContext with Matchers { def compareRanks(a: VertexRDD[Double], b: VertexRDD[Double]): Double = { a.leftJoin(b) { case (id, a, bOpt) => (a - bOpt.getOrElse(0.0)) * (a - bOpt.getOrElse(0.0)) } .map { case (id, error) => error }.sum } - test("Star PageRank") { + + test("Static Star PageRank") { withSpark { sc => - val nVertices = 100 + val nVertices = 10 val starGraph = GraphGenerators.starGraph(sc, nVertices).cache() val resetProb = 0.15 - val errorTol = 1.0e-5 - - val staticRanks1 = starGraph.staticPageRank(numIter = 1, resetProb).vertices - val staticRanks2 = starGraph.staticPageRank(numIter = 2, resetProb).vertices.cache() + val staticRanks: VertexRDD[Double] = starGraph.staticPageRank(numIter = 2, resetProb).vertices + // Check the static pagerank + val pageranks: Map[VertexId, Double] = staticRanks.collect().toMap + val perimeter = (nVertices - 1.0) * resetProb + val center = resetProb + (1.0 - resetProb) * perimeter + val normalizer = perimeter + center + pageranks(0) should equal((center / normalizer) +- 1.0e-7) + for (i <- 1 until nVertices) { + pageranks(i) should equal((resetProb / normalizer) +- 1.0e-7) + } + } + } - // Static PageRank should only take 2 iterations to converge - val notMatching = staticRanks1.innerZipJoin(staticRanks2) { (vid, pr1, pr2) => - if (pr1 != pr2) 1 else 0 - }.map { case (vid, test) => test }.sum - assert(notMatching === 0) - val staticErrors = staticRanks2.map { case (vid, pr) => - val correct = (vid > 0 && pr == resetProb) || - (vid == 0 && math.abs(pr - (resetProb + (1.0 - resetProb) * (resetProb * (nVertices - 1)) )) < 1.0E-5) - if (!correct) 1 else 0 + test("Dynamic Star PageRank") { + withSpark { sc => + val nVertices = 10 + val starGraph = GraphGenerators.starGraph(sc, nVertices).cache() + val resetProb = 0.15 + val dynamicRanks: VertexRDD[Double] = starGraph.pageRank(1.0e-10, resetProb).vertices + // Check the pagerank values + val pageranks: Map[VertexId, Double] = dynamicRanks.collect().toMap + val perimeter = (nVertices - 1.0) * resetProb + val center = resetProb + (1.0 - resetProb) * perimeter + val normalizer = perimeter + center + pageranks(0) should equal ((center / normalizer) +- 1.0e-7) + for(i <- 1 until nVertices) { + pageranks(i) should equal ((resetProb / normalizer) +- 1.0e-7) } - assert(staticErrors.sum === 0) - - val dynamicRanks = starGraph.pageRank(0, resetProb).vertices.cache() - assert(compareRanks(staticRanks2, dynamicRanks) < errorTol) } } // end of test Star PageRank + test("Static Cycle PageRank") { + withSpark { sc => + val nVertices = 10 + val starGraph = GraphGenerators.cycleGraph(sc, nVertices) + val resetProb = 0.15 + val staticRanks: VertexRDD[Double] = starGraph.staticPageRank(numIter = 10, resetProb).vertices + // Check the static pagerank + val pageranks: Map[VertexId, Double] = staticRanks.collect().toMap + for (i <- 0 until nVertices) { + pageranks(i) should equal ( (1.0/nVertices) +- 1.0e-7) + } + } + } + - test("Grid PageRank") { + test("Dynamic Cycle PageRank") { withSpark { sc => - val rows = 10 - val cols = 10 + val nVertices = 3 + val starGraph = GraphGenerators.cycleGraph(sc, nVertices) + val resetProb = 0.15 + val staticRanks: VertexRDD[Double] = starGraph.pageRank(1.0e-3, resetProb).vertices + // Check the static pagerank + val pageranks: Map[VertexId, Double] = staticRanks.collect().toMap + println(pageranks) + for (i <- 0 until nVertices) { + pageranks(i) should equal ( (1.0/nVertices) +- 1.0e-7) + } + } + } + + + test("Grid Static PageRank") { + withSpark { sc => + val rows = 5 + val cols = 5 val resetProb = 0.15 val tol = 0.0001 - val numIter = 50 + val numIter = 20 val errorTol = 1.0e-5 + val truePr = GridPageRank(rows, cols, numIter, resetProb).toMap val gridGraph = GraphGenerators.gridGraph(sc, rows, cols).cache() - - val staticRanks = gridGraph.staticPageRank(numIter, resetProb).vertices.cache() - val dynamicRanks = gridGraph.pageRank(tol, resetProb).vertices.cache() - val referenceRanks = VertexRDD(sc.parallelize(GridPageRank(rows, cols, numIter, resetProb))).cache() - - assert(compareRanks(staticRanks, referenceRanks) < errorTol) - assert(compareRanks(dynamicRanks, referenceRanks) < errorTol) + val vertices = gridGraph.staticPageRank(numIter, resetProb).vertices.cache() + val pageranks: Map[VertexId, Double] = vertices.collect().toMap + for ((k,pr) <- truePr) { + pageranks(k) should equal ( pr +- 1.0e-5) + } } } // end of Grid PageRank - test("Chain PageRank") { + test("Grid Dynamic PageRank") { withSpark { sc => - val chain1 = (0 until 9).map(x => (x, x+1) ) - val rawEdges = sc.parallelize(chain1, 1).map { case (s,d) => (s.toLong, d.toLong) } - val chain = Graph.fromEdgeTuples(rawEdges, 1.0).cache() + val rows = 5 + val cols = 5 val resetProb = 0.15 val tol = 0.0001 - val numIter = 10 + val numIter = 20 val errorTol = 1.0e-5 + val truePr = GridPageRank(rows, cols, numIter, resetProb).toMap + val gridGraph = GraphGenerators.gridGraph(sc, rows, cols).cache() + val vertices = gridGraph.pageRank(1.0e-10, resetProb).vertices.cache() + val pageranks: Map[VertexId, Double] = vertices.collect().toMap + // vertices.collect.foreach(println(_)) + for ((k,pr) <- truePr) { + pageranks(k) should equal ( pr +- 1.0e-5) + } + } + } // end of Grid PageRank - val staticRanks = chain.staticPageRank(numIter, resetProb).vertices - val dynamicRanks = chain.pageRank(tol, resetProb).vertices - assert(compareRanks(staticRanks, dynamicRanks) < errorTol) - } - } } From 6b5100a2bcb5625fe81d13ac24103e7ce0aa82b3 Mon Sep 17 00:00:00 2001 From: "Joseph E. Gonzalez" Date: Thu, 26 Jun 2014 00:03:53 -0700 Subject: [PATCH 06/12] removing extraneous files. --- .../apache/spark/graphx/PregelContext.scala | 19 ------------------- 1 file changed, 19 deletions(-) delete mode 100644 graphx/src/main/scala/org/apache/spark/graphx/PregelContext.scala diff --git a/graphx/src/main/scala/org/apache/spark/graphx/PregelContext.scala b/graphx/src/main/scala/org/apache/spark/graphx/PregelContext.scala deleted file mode 100644 index 7502e85fc6103..0000000000000 --- a/graphx/src/main/scala/org/apache/spark/graphx/PregelContext.scala +++ /dev/null @@ -1,19 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.graphx - From 57b7de920a6e0f93213a846f2b2c9cc2b8a68911 Mon Sep 17 00:00:00 2001 From: "Joseph E. Gonzalez" Date: Thu, 26 Jun 2014 00:31:50 -0700 Subject: [PATCH 07/12] minor cleanups --- .../scala/org/apache/spark/graphx/Pregel.scala | 14 +++++++------- .../apache/spark/graphx/lib/PageRankSuite.scala | 8 ++++---- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala index 63aac34f093cf..9c730704ded05 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala @@ -249,12 +249,12 @@ object Pregel extends Logging { sendMsg(simpleTriplet, ctx) } - // Compute the messages for all the active vertices - val messages = g.mapReduceTriplets(sendMessageWrapper, mergeMsg, Some((activeVertices, activeDirection))) - // get a reference to the current graph so that we can unpersist it once the new graph is created. val prevG = g + // Compute the messages for all the active vertices + val messages = g.mapReduceTriplets(sendMessageWrapper, mergeMsg, Some((activeVertices, activeDirection))) + // Receive the messages to the subset of active vertices g = g.outerJoinVertices(messages){ (vid, dataAndActive, msgOpt) => val (vdata, active) = dataAndActive @@ -276,11 +276,11 @@ object Pregel extends Logging { numActive = activeVertices.count() // Unpersist the RDDs hidden by newly-materialized RDDs -// prevG.unpersistVertices(blocking=false) -// prevG.edges.unpersist(blocking=false) + //prevG.unpersistVertices(blocking=false) + //prevG.edges.unpersist(blocking=false) - println("Finished Iteration " + i) - g.vertices.foreach(println(_)) + //println("Finished Iteration " + i) + // g.vertices.foreach(println(_)) logInfo("Pregel finished iteration " + i) // count the iteration diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala index 3dc3af0ff8897..1a828468ffa18 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala @@ -68,7 +68,7 @@ class PageRankSuite extends FunSuite with LocalSparkContext with Matchers { a.leftJoin(b) { case (id, a, bOpt) => (a - bOpt.getOrElse(0.0)) * (a - bOpt.getOrElse(0.0)) } .map { case (id, error) => error }.sum } - +/* test("Static Star PageRank") { withSpark { sc => @@ -137,7 +137,7 @@ class PageRankSuite extends FunSuite with LocalSparkContext with Matchers { } } } - +*/ test("Grid Static PageRank") { withSpark { sc => @@ -157,7 +157,7 @@ class PageRankSuite extends FunSuite with LocalSparkContext with Matchers { } } // end of Grid PageRank - +/* test("Grid Dynamic PageRank") { withSpark { sc => val rows = 5 @@ -176,6 +176,6 @@ class PageRankSuite extends FunSuite with LocalSparkContext with Matchers { } } } // end of Grid PageRank - +*/ } From 5bb139615f3645774353ced1c5ce47849dd12193 Mon Sep 17 00:00:00 2001 From: "Joseph E. Gonzalez" Date: Thu, 26 Jun 2014 01:09:48 -0700 Subject: [PATCH 08/12] adding breaking test to tests --- .../spark/graphx/lib/PageRankSuite.scala | 85 ++++++++++++++++--- 1 file changed, 71 insertions(+), 14 deletions(-) diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala index 1a828468ffa18..14af24c7e04b9 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.graphx.lib import org.scalatest.{Matchers, FunSuite} - +import collection.mutable.MutableList import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.graphx._ @@ -27,9 +27,12 @@ import org.apache.spark.graphx.lib._ import org.apache.spark.graphx.util.GraphGenerators import org.apache.spark.rdd._ -object GridPageRank { - def apply(nRows: Int, nCols: Int, nIter: Int, resetProb: Double) = { - val inNbrs = Array.fill(nRows * nCols)(collection.mutable.MutableList.empty[Int]) +import scala.collection.mutable + +object TruePageRank { + + def grid(nRows: Int, nCols: Int, nIter: Int, resetProb: Double) = { + val inNbrs = Array.fill(nRows * nCols)(mutable.MutableList.empty[Int]) val outDegree = Array.fill(nRows * nCols)(0) // Convert row column address into vertex ids (row major order) def sub2ind(r: Int, c: Int): Int = r * nCols + c @@ -45,18 +48,36 @@ object GridPageRank { inNbrs(sub2ind(r,c+1)) += ind } } + val vids = 0 until (nRows * nCols) + pr(vids.zip(inNbrs).toMap, vids.zip(outDegree).toMap, nIter, resetProb) + } + + def edges(edges: Array[(Int, Int)], nIter: Int, resetProb: Double) = { + val vids = edges.flatMap { case (s,d) => Iterator(s,d) }.toSet + val inNbrs = vids.map(id => (id, mutable.MutableList.empty[Int])).toMap + val outDegree = mutable.Map.empty[Int, Int] + for ((s,d) <- edges) { + outDegree(s) = outDegree.getOrElse(s,0) + 1 + inNbrs(d) += s + } + pr(inNbrs, outDegree, nIter, resetProb) + } + + def pr(inNbrs: collection.Map[Int, mutable.MutableList[Int]], + outDegree: collection.Map[Int, Int], nIter: Int, resetProb: Double) = { + val nVerts = inNbrs.size // compute the pagerank - var pr = Array.fill(nRows * nCols)(resetProb) + var pr = Array.fill(nVerts)(resetProb) for (iter <- 0 until nIter) { val oldPr = pr - pr = new Array[Double](nRows * nCols) - for (ind <- 0 until (nRows * nCols)) { + pr = new Array[Double](nVerts) + for (ind <- 0 until nVerts) { pr(ind) = resetProb + (1.0 - resetProb) * inNbrs(ind).map( nbr => oldPr(nbr) / outDegree(nbr)).sum } } val normalizer = pr.sum - (0L until (nRows * nCols)).zip(pr.map(p => p / normalizer)) + (0L until nVerts).zip(pr.map(p => p / normalizer)) } } @@ -68,7 +89,43 @@ class PageRankSuite extends FunSuite with LocalSparkContext with Matchers { a.leftJoin(b) { case (id, a, bOpt) => (a - bOpt.getOrElse(0.0)) * (a - bOpt.getOrElse(0.0)) } .map { case (id, error) => error }.sum } -/* + + + test("Simple Chain Static PageRank") { + withSpark { sc => + val chainEdges = Array((0, 1), (1, 2), (2, 3), (4, 3)) + val rawEdges = sc.parallelize(chainEdges, 1).map { case (s, d) => (s.toLong, d.toLong)} + val chain = Graph.fromEdgeTuples(rawEdges, 1.0).cache() + val resetProb = 0.15 + val tol = 0.0001 + val numIter = 10 + val truePr = TruePageRank.edges(chainEdges, numIter, resetProb).toMap + val vertices = chain.staticPageRank(numIter, resetProb).vertices.collect + val pageranks = vertices.toMap + for (i <- 0 until 4) { + pageranks(i) should equal(truePr(i) +- 1.0e-7) + } + } + } + + + test("Simple Chain Dynamic PageRank") { + withSpark { sc => + val chainEdges = Array((0, 1), (1, 2), (2, 3), (4, 3)) + val rawEdges = sc.parallelize(chainEdges, 1).map { case (s, d) => (s.toLong, d.toLong)} + val chain = Graph.fromEdgeTuples(rawEdges, 1.0).cache() + val resetProb = 0.15 + val tol = 0.0001 + val numIter = 10 + val truePr = TruePageRank.edges(chainEdges, numIter, resetProb).toMap + val vertices = chain.pageRank(1.0e-10, resetProb).vertices.collect + val pageranks = vertices.toMap + for (i <- 0 until 4) { + pageranks(i) should equal(truePr(i) +- 1.0e-7) + } + } + } + test("Static Star PageRank") { withSpark { sc => @@ -137,7 +194,7 @@ class PageRankSuite extends FunSuite with LocalSparkContext with Matchers { } } } -*/ + test("Grid Static PageRank") { withSpark { sc => @@ -147,7 +204,7 @@ class PageRankSuite extends FunSuite with LocalSparkContext with Matchers { val tol = 0.0001 val numIter = 20 val errorTol = 1.0e-5 - val truePr = GridPageRank(rows, cols, numIter, resetProb).toMap + val truePr = TruePageRank.grid(rows, cols, numIter, resetProb).toMap val gridGraph = GraphGenerators.gridGraph(sc, rows, cols).cache() val vertices = gridGraph.staticPageRank(numIter, resetProb).vertices.cache() val pageranks: Map[VertexId, Double] = vertices.collect().toMap @@ -157,7 +214,7 @@ class PageRankSuite extends FunSuite with LocalSparkContext with Matchers { } } // end of Grid PageRank -/* + test("Grid Dynamic PageRank") { withSpark { sc => val rows = 5 @@ -166,7 +223,7 @@ class PageRankSuite extends FunSuite with LocalSparkContext with Matchers { val tol = 0.0001 val numIter = 20 val errorTol = 1.0e-5 - val truePr = GridPageRank(rows, cols, numIter, resetProb).toMap + val truePr = TruePageRank.grid(rows, cols, numIter, resetProb).toMap val gridGraph = GraphGenerators.gridGraph(sc, rows, cols).cache() val vertices = gridGraph.pageRank(1.0e-10, resetProb).vertices.cache() val pageranks: Map[VertexId, Double] = vertices.collect().toMap @@ -176,6 +233,6 @@ class PageRankSuite extends FunSuite with LocalSparkContext with Matchers { } } } // end of Grid PageRank -*/ + } From 8f413711d58fce9f84b0e83da6bcebab0da2c2bc Mon Sep 17 00:00:00 2001 From: "Joseph E. Gonzalez" Date: Thu, 26 Jun 2014 21:05:45 -0700 Subject: [PATCH 09/12] removing context and passing activeness directly through the vertex property. This addresses the issue where byte-code inspection would always force a 3-way join. --- .../org/apache/spark/graphx/Pregel.scala | 53 +++++++------------ .../apache/spark/graphx/lib/PageRank.scala | 18 +++---- .../spark/graphx/lib/PageRankSuite.scala | 3 +- 3 files changed, 28 insertions(+), 46 deletions(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala index 9c730704ded05..e57a6ae8e846c 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala @@ -55,15 +55,6 @@ import org.apache.spark.Logging */ object Pregel extends Logging { - class Context(val iteration: Int) - - class VertexContext(iteration: Int, val wasActive: Boolean, var isActive: Boolean = true) extends Context(iteration) { - def deactivate() { isActive = false } - } - - class EdgeContext(iteration: Int, val srcIsActive: Boolean, val dstIsActive: Boolean) extends Context(iteration) - - /** * Execute a Pregel-like iterative vertex-parallel abstraction. The @@ -169,6 +160,7 @@ object Pregel extends Logging { g } // end of apply + /** * Execute a Pregel-like iterative vertex-parallel abstraction. The * user-defined vertex-program `vprog` is executed in parallel on @@ -227,50 +219,41 @@ object Pregel extends Logging { (graph: Graph[VD, ED], maxIterations: Int = Int.MaxValue, activeDirection: EdgeDirection = EdgeDirection.Either) - (vertexProgram: (VertexId, VD, Option[A], VertexContext) => VD, - sendMsg: (EdgeTriplet[VD, ED], EdgeContext) => Iterator[(VertexId, A)], + (vertexProgram: (Int, VertexId, VD, Boolean, Option[A]) => (VD, Boolean), + sendMsg: (Int, EdgeTriplet[(VD, Boolean), ED]) => Iterator[(VertexId, A)], mergeMsg: (A, A) => A) : Graph[VD, ED] = { // Initialize the graph with all vertices active - var g: Graph[(VD, Boolean), ED] = graph.mapVertices { (vid, vdata) => (vdata, true) }.cache() + var currengGraph: Graph[(VD, Boolean), ED] = + graph.mapVertices { (vid, vdata) => (vdata, true) }.cache() // Determine the set of vertices that did not vote to halt - var activeVertices = g.vertices + var activeVertices = currengGraph.vertices var numActive = activeVertices.count() - var i = 0 - while (numActive > 0 && i < maxIterations) { - // The send message wrapper removes the active fields from the triplet and places them in the edge context. - def sendMessageWrapper(triplet: EdgeTriplet[(VD, Boolean),ED]): Iterator[(VertexId, A)] = { - val simpleTriplet = new EdgeTriplet[VD, ED]() - simpleTriplet.set(triplet) - simpleTriplet.srcAttr = triplet.srcAttr._1 - simpleTriplet.dstAttr = triplet.dstAttr._1 - val ctx = new EdgeContext(i, triplet.srcAttr._2, triplet.dstAttr._2) - sendMsg(simpleTriplet, ctx) - } - - // get a reference to the current graph so that we can unpersist it once the new graph is created. - val prevG = g + var iteration = 0 + while (numActive > 0 && iteration < maxIterations) { + // get a reference to the current graph to enable unprecistance. + val prevG = currengGraph // Compute the messages for all the active vertices - val messages = g.mapReduceTriplets(sendMessageWrapper, mergeMsg, Some((activeVertices, activeDirection))) + val messages = currengGraph.mapReduceTriplets( t => sendMsg(iteration, t), mergeMsg, + Some((activeVertices, activeDirection))) // Receive the messages to the subset of active vertices - g = g.outerJoinVertices(messages){ (vid, dataAndActive, msgOpt) => + currengGraph = currengGraph.outerJoinVertices(messages){ (vid, dataAndActive, msgOpt) => val (vdata, active) = dataAndActive // If the vertex voted to halt and received no message then we can skip the vertex program if (!active && msgOpt.isEmpty) { dataAndActive } else { - val ctx = new VertexContext(i, active) // The vertex program is either active or received a message (or both). // A vertex program should vote to halt again even if it has previously voted to halt - (vertexProgram(vid, vdata, msgOpt, ctx), ctx.isActive) + vertexProgram(iteration, vid, vdata, active, msgOpt) } }.cache() // Recompute the active vertices (those that have not voted to halt) - activeVertices = g.vertices.filter(v => v._2._2) + activeVertices = currengGraph.vertices.filter(v => v._2._2) // Force all computation! numActive = activeVertices.count() @@ -282,11 +265,11 @@ object Pregel extends Logging { //println("Finished Iteration " + i) // g.vertices.foreach(println(_)) - logInfo("Pregel finished iteration " + i) + logInfo("Pregel finished iteration " + iteration) // count the iteration - i += 1 + iteration += 1 } - g.mapVertices((id, vdata) => vdata._1) + currengGraph.mapVertices((id, vdata) => vdata._1) } // end of apply diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala index b8fdfb3a1bc20..e0fdaeb9221ce 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala @@ -90,7 +90,6 @@ object PageRank extends Logging { // Set the vertex attributes to the initial pagerank values .mapVertices( (id, attr) => resetProb ) - var iteration = 0 var prevRankGraph: Graph[Double, Double] = null while (iteration < numIter) { @@ -152,23 +151,22 @@ object PageRank extends Logging { // Define the three functions needed to implement PageRank in the GraphX // version of Pregel - def vertexProgram(id: VertexId, attr: (Double, Double), msgSum: Option[Double], ctx: VertexContext) = { + def vertexProgram(iter: Int, id: VertexId, attr: (Double, Double), wasActive: Boolean, + msgSum: Option[Double]) = { var (oldPR, pendingDelta) = attr val newPR = oldPR + msgSum.getOrElse(0.0) // if we were active then we sent the pending delta on the last iteration - if (ctx.wasActive) { + if (wasActive) { pendingDelta = 0.0 } pendingDelta += (1.0 - resetProb) * msgSum.getOrElse(0.0) - if (math.abs(pendingDelta) <= tol) { - ctx.deactivate() - } - (newPR, pendingDelta) + val isActive = math.abs(pendingDelta) >= tol + ((newPR, pendingDelta), isActive) } - def sendMessage(edge: EdgeTriplet[(Double, Double), Double], ctx: EdgeContext) = { - val (srcPr, srcDelta) = edge.srcAttr - assert(ctx.srcIsActive) + def sendMessage(iter: Int, edge: EdgeTriplet[((Double, Double), Boolean), Double]) = { + val ((srcPr, srcDelta), srcIsActive) = edge.srcAttr + assert(srcIsActive) Iterator((edge.dstId, srcDelta * edge.attr)) } diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala index 14af24c7e04b9..aadc53fe54635 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala @@ -170,7 +170,8 @@ class PageRankSuite extends FunSuite with LocalSparkContext with Matchers { val nVertices = 10 val starGraph = GraphGenerators.cycleGraph(sc, nVertices) val resetProb = 0.15 - val staticRanks: VertexRDD[Double] = starGraph.staticPageRank(numIter = 10, resetProb).vertices + val staticRanks: VertexRDD[Double] = + starGraph.staticPageRank(numIter = 10, resetProb).vertices // Check the static pagerank val pageranks: Map[VertexId, Double] = staticRanks.collect().toMap for (i <- 0 until nVertices) { From e84d33e931c958da677c3e662f365a85cd93d2ca Mon Sep 17 00:00:00 2001 From: "Joseph E. Gonzalez" Date: Thu, 26 Jun 2014 21:18:35 -0700 Subject: [PATCH 10/12] Adding PregelVertex type which encapsulates the vertex and its activeness. --- .../org/apache/spark/graphx/Pregel.scala | 35 +++++++++++++------ .../apache/spark/graphx/lib/PageRank.scala | 12 +++---- 2 files changed, 30 insertions(+), 17 deletions(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala index e57a6ae8e846c..f6451b42510c0 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala @@ -21,6 +21,21 @@ import scala.reflect.ClassTag import org.apache.spark.Logging +/** + * The Pregel Vertex class contains the vertex attribute and the boolean flag indicating whether + * the vertex is active. + * + * @param attr + * @param isActive + * @tparam T + */ +sealed case class PregelVertex[T] (attr: T, isActive: Boolean = true) + extends Product2[T, Boolean] { + override def _1: T = attr + override def _2: Boolean = isActive +} + + /** * Implements a Pregel-like bulk-synchronous message-passing API. * @@ -55,7 +70,6 @@ import org.apache.spark.Logging */ object Pregel extends Logging { - /** * Execute a Pregel-like iterative vertex-parallel abstraction. The * user-defined vertex-program `vprog` is executed in parallel on @@ -219,14 +233,14 @@ object Pregel extends Logging { (graph: Graph[VD, ED], maxIterations: Int = Int.MaxValue, activeDirection: EdgeDirection = EdgeDirection.Either) - (vertexProgram: (Int, VertexId, VD, Boolean, Option[A]) => (VD, Boolean), - sendMsg: (Int, EdgeTriplet[(VD, Boolean), ED]) => Iterator[(VertexId, A)], + (vertexProgram: (Int, VertexId, PregelVertex[VD], Option[A]) => PregelVertex[VD], + sendMsg: (Int, EdgeTriplet[PregelVertex[VD], ED]) => Iterator[(VertexId, A)], mergeMsg: (A, A) => A) : Graph[VD, ED] = { // Initialize the graph with all vertices active - var currengGraph: Graph[(VD, Boolean), ED] = - graph.mapVertices { (vid, vdata) => (vdata, true) }.cache() + var currengGraph: Graph[PregelVertex[VD], ED] = + graph.mapVertices { (vid, vdata) => PregelVertex(vdata) }.cache() // Determine the set of vertices that did not vote to halt var activeVertices = currengGraph.vertices var numActive = activeVertices.count() @@ -240,15 +254,14 @@ object Pregel extends Logging { Some((activeVertices, activeDirection))) // Receive the messages to the subset of active vertices - currengGraph = currengGraph.outerJoinVertices(messages){ (vid, dataAndActive, msgOpt) => - val (vdata, active) = dataAndActive + currengGraph = currengGraph.outerJoinVertices(messages){ (vid, pVertex, msgOpt) => // If the vertex voted to halt and received no message then we can skip the vertex program - if (!active && msgOpt.isEmpty) { - dataAndActive + if (!pVertex.isActive && msgOpt.isEmpty) { + pVertex } else { // The vertex program is either active or received a message (or both). // A vertex program should vote to halt again even if it has previously voted to halt - vertexProgram(iteration, vid, vdata, active, msgOpt) + vertexProgram(iteration, vid, pVertex, msgOpt) } }.cache() @@ -269,7 +282,7 @@ object Pregel extends Logging { // count the iteration iteration += 1 } - currengGraph.mapVertices((id, vdata) => vdata._1) + currengGraph.mapVertices((id, vdata) => vdata.attr) } // end of apply diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala index e0fdaeb9221ce..0fa5ddfe6aadd 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala @@ -151,21 +151,21 @@ object PageRank extends Logging { // Define the three functions needed to implement PageRank in the GraphX // version of Pregel - def vertexProgram(iter: Int, id: VertexId, attr: (Double, Double), wasActive: Boolean, + def vertexProgram(iter: Int, id: VertexId, vertex: PregelVertex[(Double, Double)], msgSum: Option[Double]) = { - var (oldPR, pendingDelta) = attr + var (oldPR, pendingDelta) = vertex.attr val newPR = oldPR + msgSum.getOrElse(0.0) // if we were active then we sent the pending delta on the last iteration - if (wasActive) { + if (vertex.isActive) { pendingDelta = 0.0 } pendingDelta += (1.0 - resetProb) * msgSum.getOrElse(0.0) val isActive = math.abs(pendingDelta) >= tol - ((newPR, pendingDelta), isActive) + PregelVertex((newPR, pendingDelta), isActive) } - def sendMessage(iter: Int, edge: EdgeTriplet[((Double, Double), Boolean), Double]) = { - val ((srcPr, srcDelta), srcIsActive) = edge.srcAttr + def sendMessage(iter: Int, edge: EdgeTriplet[PregelVertex[(Double, Double)], Double]) = { + val PregelVertex((srcPr, srcDelta), srcIsActive) = edge.srcAttr assert(srcIsActive) Iterator((edge.dstId, srcDelta * edge.attr)) } From 7991f4b146804555190285961f732af958d9ff25 Mon Sep 17 00:00:00 2001 From: "Joseph E. Gonzalez" Date: Mon, 7 Jul 2014 11:36:16 -0700 Subject: [PATCH 11/12] Improving documentation --- .../org/apache/spark/graphx/Pregel.scala | 253 +++++++++--------- 1 file changed, 123 insertions(+), 130 deletions(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala index f6451b42510c0..f6b658bd30bc9 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala @@ -22,27 +22,58 @@ import org.apache.spark.Logging /** - * The Pregel Vertex class contains the vertex attribute and the boolean flag indicating whether - * the vertex is active. + * The Pregel Vertex class contains the vertex attribute and the boolean flag + * indicating whether the vertex is active. * - * @param attr - * @param isActive - * @tparam T + * @param attr the vertex prorperty during the pregel computation (e.g., + * PageRank) + * @param isActive a flag indicating whether the vertex is active + * @tparam T the type of the vertex property */ -sealed case class PregelVertex[T] (attr: T, isActive: Boolean = true) - extends Product2[T, Boolean] { +sealed case class PregelVertex[@specialized T] + (attr: T, isActive: Boolean = true) extends Product2[T, Boolean] { override def _1: T = attr override def _2: Boolean = isActive } /** - * Implements a Pregel-like bulk-synchronous message-passing API. + * The Pregel API enables users to express iterative graph algorithms in GraphX + * and is loosely based on the Google and GraphLab APIs. * - * Unlike the original Pregel API, the GraphX Pregel API factors the sendMessage computation over - * edges, enables the message sending computation to read both vertex attributes, and constrains - * messages to the graph structure. These changes allow for substantially more efficient - * distributed execution while also exposing greater flexibility for graph-based computation. + * At a high-level iterative graph algorithms like PageRank recursively define + * vertex properties in terms of the properties of neighboring vertices. These + * recursive properties are then computed through iterative fixed-point + * computations. For example, the PageRank of a web-page can be defined as a + * weighted some of the PageRank of web-pages that link to that page and is + * computed by iteratively updating the PageRank of each page until a + * fixed-point is reached (the PageRank values stop changing). + * + * The GraphX Pregel API expresses iterative graph algorithms as vertex-programs + * which can send and receive messages from neighboring vertices. Vertex + * programs have the following logic: + * + * {{{ + * while ( there are active vertices ) { + * for ( v in allVertices ) { + * // Send messages to neighbors + * if (isActive) { + * for (nbr in neighbors(v)) { + * val msgs: List[(id, msg)] = computeMsgs(Triplet(v, nbr)) + * for ((id, msg) <- msgs) { + * messageSum(id) = reduce(messageSum(id), msg) + * } + * } + * } + * // Receive the "sum" of the messages to v and update the property + * (vertexProperty(v), isActive) = + * vertexProgram(vertexProperty(v), messagesSum(v)) + * } + * } + * }}} + * + * The user defined `vertexProgram`, `computeMessage`, and `reduce` functions + * capture the core logic of graph algorithms. * * @example We can use the Pregel abstraction to implement PageRank: * {{{ @@ -56,19 +87,90 @@ sealed case class PregelVertex[T] (attr: T, isActive: Boolean = true) * // Set the vertex attributes to the initial pagerank values * .mapVertices((id, attr) => 1.0) * - * def vertexProgram(id: VertexId, attr: Double, msgSum: Double): Double = - * resetProb + (1.0 - resetProb) * msgSum - * def sendMessage(id: VertexId, edge: EdgeTriplet[Double, Double]): Iterator[(VertexId, Double)] = - * Iterator((edge.dstId, edge.srcAttr * edge.attr)) + * // Define the vertex program and message calculation functions. + * def vertexProgram(iter: Int, id: VertexId, oldV: PregelVertex[Double], + * msgSum: Option[Double]) = { + * PregelVertex(resetProb + (1.0 - resetProb) * msgSum.getOrElse(0.0)) + * } + * + * def computeMsgs(iter: Int, edge: EdgeTriplet[PregelVertex[Double], Double]) = { + * Iterator((edge.dstId, edge.srcAttr.attr * edge.attr)) + * } + * * def messageCombiner(a: Double, b: Double): Double = a + b - * val initialMessage = 0.0 - * // Execute Pregel for a fixed number of iterations. - * Pregel(pagerankGraph, initialMessage, numIter)( - * vertexProgram, sendMessage, messageCombiner) + * + * // Run PageRank + * val prGraph = Pregel.run(pagerankGraph, numIter, activeDirection = EdgeDirection.Out)( + * vertexProgram, sendMessage, messageCombiner).cache() + * + * // Normalize the pagerank vector: + * val normalizer: Double = prGraph.vertices.map(x => x._2).reduce(_ + _) + * + * prGraph.mapVertices((id, pr) => pr / normalizer) + * * }}} * */ object Pregel extends Logging { + /** + * The new Pregel API. + */ + def run[VD: ClassTag, ED: ClassTag, A: ClassTag] + (graph: Graph[VD, ED], + maxIterations: Int = Int.MaxValue, + activeDirection: EdgeDirection = EdgeDirection.Either) + (vertexProgram: (Int, VertexId, PregelVertex[VD], Option[A]) => PregelVertex[VD], + computeMsgs: (Int, EdgeTriplet[PregelVertex[VD], ED]) => Iterator[(VertexId, A)], + mergeMsg: (A, A) => A) + : Graph[VD, ED] = + { + // Initialize the graph with all vertices active + var currengGraph: Graph[PregelVertex[VD], ED] = + graph.mapVertices { (vid, vdata) => PregelVertex(vdata) }.cache() + // Determine the set of vertices that did not vote to halt + var activeVertices = currengGraph.vertices + var numActive = activeVertices.count() + var iteration = 0 + while (numActive > 0 && iteration < maxIterations) { + // get a reference to the current graph to enable unprecistance. + val prevG = currengGraph + + // Compute the messages for all the active vertices + val messages = currengGraph.mapReduceTriplets( t => computeMsgs(iteration, t), mergeMsg, + Some((activeVertices, activeDirection))) + + // Receive the messages to the subset of active vertices + currengGraph = currengGraph.outerJoinVertices(messages){ (vid, pVertex, msgOpt) => + // If the vertex voted to halt and received no message then we can skip the vertex program + if (!pVertex.isActive && msgOpt.isEmpty) { + pVertex + } else { + // The vertex program is either active or received a message (or both). + // A vertex program should vote to halt again even if it has previously voted to halt + vertexProgram(iteration, vid, pVertex, msgOpt) + } + }.cache() + + // Recompute the active vertices (those that have not voted to halt) + activeVertices = currengGraph.vertices.filter(v => v._2._2) + + // Force all computation! + numActive = activeVertices.count() + + // Unpersist the RDDs hidden by newly-materialized RDDs + //prevG.unpersistVertices(blocking=false) + //prevG.edges.unpersist(blocking=false) + + //println("Finished Iteration " + i) + // g.vertices.foreach(println(_)) + + logInfo("Pregel finished iteration " + iteration) + // count the iteration + iteration += 1 + } + currengGraph.mapVertices((id, vdata) => vdata.attr) + } // end of apply + /** * Execute a Pregel-like iterative vertex-parallel abstraction. The @@ -124,7 +226,7 @@ object Pregel extends Logging { * @return the resulting graph at the end of the computation * */ - @deprecated ("Switching to Pregel.run.", "1.1") + // @deprecated ("Switching to Pregel.run.", "1.1") def apply[VD: ClassTag, ED: ClassTag, A: ClassTag] (graph: Graph[VD, ED], initialMsg: A, @@ -175,115 +277,6 @@ object Pregel extends Logging { } // end of apply - /** - * Execute a Pregel-like iterative vertex-parallel abstraction. The - * user-defined vertex-program `vprog` is executed in parallel on - * each vertex receiving any inbound messages and computing a new - * value for the vertex. The `sendMsg` function is then invoked on - * all out-edges and is used to compute an optional message to the - * destination vertex. The `mergeMsg` function is a commutative - * associative function used to combine messages destined to the - * same vertex. - * - * On the first iteration all vertices receive the `initialMsg` and - * on subsequent iterations if a vertex does not receive a message - * then the vertex-program is not invoked. - * - * This function iterates until there are no remaining messages, or - * for `maxIterations` iterations. - * - * @tparam VD the vertex data type - * @tparam ED the edge data type - * @tparam A the Pregel message type - * - * @param graph the input graph. - * - * @param initialMsg the message each vertex will receive at the on - * the first iteration - * - * @param maxIterations the maximum number of iterations to run for - * - * @param activeDirection the direction of edges incident to a vertex that received a message in - * the previous round on which to run `sendMsg`. For example, if this is `EdgeDirection.Out`, only - * out-edges of vertices that received a message in the previous round will run. The default is - * `EdgeDirection.Either`, which will run `sendMsg` on edges where either side received a message - * in the previous round. If this is `EdgeDirection.Both`, `sendMsg` will only run on edges where - * *both* vertices received a message. - * - * @param vprog the user-defined vertex program which runs on each - * vertex and receives the inbound message and computes a new vertex - * value. On the first iteration the vertex program is invoked on - * all vertices and is passed the default message. On subsequent - * iterations the vertex program is only invoked on those vertices - * that receive messages. - * - * @param sendMsg a user supplied function that is applied to out - * edges of vertices that received messages in the current - * iteration - * - * @param mergeMsg a user supplied function that takes two incoming - * messages of type A and merges them into a single message of type - * A. ''This function must be commutative and associative and - * ideally the size of A should not increase.'' - * - * @return the resulting graph at the end of the computation - * - */ - def run[VD: ClassTag, ED: ClassTag, A: ClassTag] - (graph: Graph[VD, ED], - maxIterations: Int = Int.MaxValue, - activeDirection: EdgeDirection = EdgeDirection.Either) - (vertexProgram: (Int, VertexId, PregelVertex[VD], Option[A]) => PregelVertex[VD], - sendMsg: (Int, EdgeTriplet[PregelVertex[VD], ED]) => Iterator[(VertexId, A)], - mergeMsg: (A, A) => A) - : Graph[VD, ED] = - { - // Initialize the graph with all vertices active - var currengGraph: Graph[PregelVertex[VD], ED] = - graph.mapVertices { (vid, vdata) => PregelVertex(vdata) }.cache() - // Determine the set of vertices that did not vote to halt - var activeVertices = currengGraph.vertices - var numActive = activeVertices.count() - var iteration = 0 - while (numActive > 0 && iteration < maxIterations) { - // get a reference to the current graph to enable unprecistance. - val prevG = currengGraph - - // Compute the messages for all the active vertices - val messages = currengGraph.mapReduceTriplets( t => sendMsg(iteration, t), mergeMsg, - Some((activeVertices, activeDirection))) - - // Receive the messages to the subset of active vertices - currengGraph = currengGraph.outerJoinVertices(messages){ (vid, pVertex, msgOpt) => - // If the vertex voted to halt and received no message then we can skip the vertex program - if (!pVertex.isActive && msgOpt.isEmpty) { - pVertex - } else { - // The vertex program is either active or received a message (or both). - // A vertex program should vote to halt again even if it has previously voted to halt - vertexProgram(iteration, vid, pVertex, msgOpt) - } - }.cache() - - // Recompute the active vertices (those that have not voted to halt) - activeVertices = currengGraph.vertices.filter(v => v._2._2) - - // Force all computation! - numActive = activeVertices.count() - - // Unpersist the RDDs hidden by newly-materialized RDDs - //prevG.unpersistVertices(blocking=false) - //prevG.edges.unpersist(blocking=false) - - //println("Finished Iteration " + i) - // g.vertices.foreach(println(_)) - - logInfo("Pregel finished iteration " + iteration) - // count the iteration - iteration += 1 - } - currengGraph.mapVertices((id, vdata) => vdata.attr) - } // end of apply } // end of class Pregel From e1d8033a488811ae0e01ed9f9c4bb2e6d07d1049 Mon Sep 17 00:00:00 2001 From: "Joseph E. Gonzalez" Date: Thu, 28 Aug 2014 11:48:54 -0700 Subject: [PATCH 12/12] fixing typo --- graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala index f6b658bd30bc9..5d71f31359bcb 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala @@ -45,7 +45,7 @@ sealed case class PregelVertex[@specialized T] * vertex properties in terms of the properties of neighboring vertices. These * recursive properties are then computed through iterative fixed-point * computations. For example, the PageRank of a web-page can be defined as a - * weighted some of the PageRank of web-pages that link to that page and is + * weighted sum of the PageRank of web-pages that link to that page and is * computed by iteratively updating the PageRank of each page until a * fixed-point is reached (the PageRank values stop changing). *