From e407e881ee4f0e6dffe90244b9b3a5308ba94e79 Mon Sep 17 00:00:00 2001 From: Enzo Bonnal Date: Sun, 9 May 2021 13:52:42 +0200 Subject: [PATCH 1/3] overload PageRank.runWithOptions and runWithOptionsWithPreviousPageRank with a 'normalized' parameter that controls whether the rank sum is normalized --- .../apache/spark/graphx/lib/PageRank.scala | 71 +++++++++++++++++-- 1 file changed, 65 insertions(+), 6 deletions(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala index 102dc2d2dd4b0..a1a60fc21ee8b 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala @@ -140,8 +140,31 @@ object PageRank extends Logging { */ def runWithOptions[VD: ClassTag, ED: ClassTag]( graph: Graph[VD, ED], numIter: Int, resetProb: Double = 0.15, - srcId: Option[VertexId] = None): Graph[Double, Double] = - { + srcId: Option[VertexId] = None): Graph[Double, Double] = { + runWithOptions(graph, numIter, resetProb, srcId, normalized = true) + } + + /** + * Run PageRank for a fixed number of iterations returning a graph + * with vertex attributes containing the PageRank and edge + * attributes the normalized edge weight. + * + * @tparam VD the original vertex attribute (not used) + * @tparam ED the original edge attribute (not used) + * + * @param graph the graph on which to compute PageRank + * @param numIter the number of iterations of PageRank to run + * @param resetProb the random reset probability (alpha) + * @param srcId the source vertex for a Personalized Page Rank (optional) + * @param normalized whether or not to normalize rank sum + * + * @return the graph containing with each vertex containing the PageRank and each edge + * containing the normalized weight. 
+ * + */ + def runWithOptions[VD: ClassTag, ED: ClassTag]( + graph: Graph[VD, ED], numIter: Int, resetProb: Double, + srcId: Option[VertexId], normalized: Boolean): Graph[Double, Double] = { require(numIter > 0, s"Number of iterations must be greater than 0," + s" but got ${numIter}") require(resetProb >= 0 && resetProb <= 1, s"Random reset probability must belong" + @@ -179,8 +202,13 @@ object PageRank extends Logging { iteration += 1 } - // SPARK-18847 If the graph has sinks (vertices with no outgoing edges) correct the sum of ranks - normalizeRankSum(rankGraph, personalized) + if (normalized) { + // SPARK-18847 If the graph has sinks (vertices with no outgoing edges), + // correct the sum of ranks + normalizeRankSum(rankGraph, personalized) + } else { + rankGraph + } } /** @@ -204,6 +232,32 @@ object PageRank extends Logging { def runWithOptionsWithPreviousPageRank[VD: ClassTag, ED: ClassTag]( graph: Graph[VD, ED], numIter: Int, resetProb: Double, srcId: Option[VertexId], preRankGraph: Graph[Double, Double]): Graph[Double, Double] = { + runWithOptionsWithPreviousPageRank( + graph, numIter, resetProb, srcId, normalized = true, preRankGraph + ) + } + + /** + * Run PageRank for a fixed number of iterations returning a graph + * with vertex attributes containing the PageRank and edge + * attributes the normalized edge weight. + * + * @tparam VD the original vertex attribute (not used) + * @tparam ED the original edge attribute (not used) + * + * @param graph the graph on which to compute PageRank + * @param numIter the number of iterations of PageRank to run + * @param resetProb the random reset probability (alpha) + * @param srcId the source vertex for a Personalized Page Rank (optional) + * @param normalized whether or not to normalize rank sum + * @param preRankGraph PageRank graph from which to keep iterating + * + * @return the graph containing with each vertex containing the PageRank and each edge + * containing the normalized weight. 
+ */ + def runWithOptionsWithPreviousPageRank[VD: ClassTag, ED: ClassTag]( + graph: Graph[VD, ED], numIter: Int, resetProb: Double, srcId: Option[VertexId], + normalized: Boolean, preRankGraph: Graph[Double, Double]): Graph[Double, Double] = { require(numIter > 0, s"Number of iterations must be greater than 0," + s" but got ${numIter}") require(resetProb >= 0 && resetProb <= 1, s"Random reset probability must belong" + @@ -238,8 +292,13 @@ object PageRank extends Logging { iteration += 1 } - // SPARK-18847 If the graph has sinks (vertices with no outgoing edges) correct the sum of ranks - normalizeRankSum(rankGraph, personalized) + if (normalized) { + // SPARK-18847 If the graph has sinks (vertices with no outgoing edges), + // correct the sum of ranks + normalizeRankSum(rankGraph, personalized) + } else { + rankGraph + } } /** From 60482b37c33a812a7e2d22db61819a63c055298b Mon Sep 17 00:00:00 2001 From: Enzo Bonnal Date: Sun, 9 May 2021 17:39:51 +0200 Subject: [PATCH 2/3] add unit test for PageRank without intermediate normalization --- .../spark/graphx/lib/PageRankSuite.scala | 32 ++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala index 8008a89c6cd5f..6241ab7b29653 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala @@ -233,7 +233,37 @@ class PageRankSuite extends SparkFunSuite with LocalSparkContext { assert(totalIters == 19) assert(iterAfterHalfCheckPoint == 18) } - } // end of Grid PageRank + } // end of Grid PageRank with checkpoint + + test("Grid PageRank with checkpoint without intermediate normalization") { + withSpark { sc => + // Check that 6 iterations in a row are equivalent + // to 3 times 2 iterations without intermediate normalization + val rows = 10 + val cols = 10 + val resetProb = 0.15 + val gridGraph = 
GraphGenerators.gridGraph(sc, rows, cols).cache() + + val ranksA: Array[(VertexId, Double)] = PageRank.runWithOptions( + gridGraph, numIter = 6, resetProb, srcId = None, normalized = true + ).vertices.collect() + + val preRankGraph1 = PageRank.runWithOptions( + gridGraph, numIter = 2, resetProb, srcId = None, normalized = false + ) + + val preRankGraph2 = PageRank.runWithOptionsWithPreviousPageRank( + gridGraph, numIter = 2, resetProb, srcId = None, normalized = false, preRankGraph1 + ) + + val ranksB: Array[(VertexId, Double)] = PageRank.runWithOptionsWithPreviousPageRank( + gridGraph, numIter = 2, resetProb, srcId = None, normalized = true, preRankGraph2 + ).vertices.collect() + + // assert that all scores are equal + assert(ranksA.zip(ranksB).forall{ case(rankA, rankB) => rankA == rankB }) + } + } // end of Grid PageRank with checkpoint without intermediate normalization test("Chain PageRank") { withSpark { sc => From 5a5240846b39f143a4d4437566966e207504c7bb Mon Sep 17 00:00:00 2001 From: Enzo Bonnal Date: Tue, 11 May 2021 11:45:33 +0200 Subject: [PATCH 3/3] add @since doc line --- .../src/main/scala/org/apache/spark/graphx/lib/PageRank.scala | 3 +++ .../test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala index a1a60fc21ee8b..5dd6f13235ccb 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala @@ -161,6 +161,7 @@ object PageRank extends Logging { * @return the graph containing with each vertex containing the PageRank and each edge * containing the normalized weight. 
* + * @since 3.2.0 */ def runWithOptions[VD: ClassTag, ED: ClassTag]( graph: Graph[VD, ED], numIter: Int, resetProb: Double, @@ -254,6 +255,8 @@ object PageRank extends Logging { * * @return the graph containing with each vertex containing the PageRank and each edge * containing the normalized weight. + * + * @since 3.2.0 */ def runWithOptionsWithPreviousPageRank[VD: ClassTag, ED: ClassTag]( graph: Graph[VD, ED], numIter: Int, resetProb: Double, srcId: Option[VertexId], diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala index 6241ab7b29653..caa2fdcdf5d2b 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala @@ -261,7 +261,7 @@ class PageRankSuite extends SparkFunSuite with LocalSparkContext { ).vertices.collect() // assert that all scores are equal - assert(ranksA.zip(ranksB).forall{ case(rankA, rankB) => rankA == rankB }) + assert(ranksA.zip(ranksB).forall { case(rankA, rankB) => rankA == rankB }) } } // end of Grid PageRank with checkpoint without intermediate normalization