Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-35357][GRAPHX] Allow to turn off the normalization applied by static PageRank utilities #32485

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 68 additions & 6 deletions graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,32 @@ object PageRank extends Logging {
*/
def runWithOptions[VD: ClassTag, ED: ClassTag](
    graph: Graph[VD, ED], numIter: Int, resetProb: Double = 0.15,
    srcId: Option[VertexId] = None): Graph[Double, Double] =
  // Backward-compatible entry point: delegates to the five-argument overload
  // and keeps the historical behavior of normalizing the final rank sum.
  runWithOptions(graph, numIter, resetProb, srcId, normalized = true)

/**
* Run PageRank for a fixed number of iterations returning a graph
* with vertex attributes containing the PageRank and edge
* attributes the normalized edge weight.
*
* @tparam VD the original vertex attribute (not used)
* @tparam ED the original edge attribute (not used)
*
* @param graph the graph on which to compute PageRank
* @param numIter the number of iterations of PageRank to run
* @param resetProb the random reset probability (alpha)
* @param srcId the source vertex for a Personalized Page Rank (optional)
* @param normalized whether or not to normalize rank sum
*
* @return the graph with each vertex attribute containing the PageRank and each edge
*         attribute containing the normalized weight.
*
* @since 3.2.0
*/
def runWithOptions[VD: ClassTag, ED: ClassTag](
graph: Graph[VD, ED], numIter: Int, resetProb: Double,
srcId: Option[VertexId], normalized: Boolean): Graph[Double, Double] = {
require(numIter > 0, s"Number of iterations must be greater than 0," +
s" but got ${numIter}")
require(resetProb >= 0 && resetProb <= 1, s"Random reset probability must belong" +
Expand Down Expand Up @@ -179,8 +203,13 @@ object PageRank extends Logging {
iteration += 1
}

// SPARK-18847 If the graph has sinks (vertices with no outgoing edges) correct the sum of ranks
normalizeRankSum(rankGraph, personalized)
if (normalized) {
// SPARK-18847 If the graph has sinks (vertices with no outgoing edges),
// correct the sum of ranks
normalizeRankSum(rankGraph, personalized)
} else {
rankGraph
}
}

/**
Expand All @@ -204,6 +233,34 @@ object PageRank extends Logging {
def runWithOptionsWithPreviousPageRank[VD: ClassTag, ED: ClassTag](
    graph: Graph[VD, ED], numIter: Int, resetProb: Double, srcId: Option[VertexId],
    preRankGraph: Graph[Double, Double]): Graph[Double, Double] =
  // Backward-compatible entry point: delegates to the six-argument overload
  // and keeps the historical behavior of normalizing the final rank sum.
  runWithOptionsWithPreviousPageRank(
    graph, numIter, resetProb, srcId, normalized = true, preRankGraph)

/**
* Run PageRank for a fixed number of iterations returning a graph
* with vertex attributes containing the PageRank and edge
* attributes the normalized edge weight.
*
* @tparam VD the original vertex attribute (not used)
* @tparam ED the original edge attribute (not used)
*
* @param graph the graph on which to compute PageRank
* @param numIter the number of iterations of PageRank to run
* @param resetProb the random reset probability (alpha)
* @param srcId the source vertex for a Personalized Page Rank (optional)
* @param normalized whether or not to normalize rank sum
* @param preRankGraph PageRank graph from which to keep iterating
*
* @return the graph with each vertex attribute containing the PageRank and each edge
*         attribute containing the normalized weight.
*
* @since 3.2.0
*/
def runWithOptionsWithPreviousPageRank[VD: ClassTag, ED: ClassTag](
graph: Graph[VD, ED], numIter: Int, resetProb: Double, srcId: Option[VertexId],
normalized: Boolean, preRankGraph: Graph[Double, Double]): Graph[Double, Double] = {
require(numIter > 0, s"Number of iterations must be greater than 0," +
s" but got ${numIter}")
require(resetProb >= 0 && resetProb <= 1, s"Random reset probability must belong" +
Expand Down Expand Up @@ -238,8 +295,13 @@ object PageRank extends Logging {
iteration += 1
}

// SPARK-18847 If the graph has sinks (vertices with no outgoing edges) correct the sum of ranks
normalizeRankSum(rankGraph, personalized)
if (normalized) {
// SPARK-18847 If the graph has sinks (vertices with no outgoing edges),
// correct the sum of ranks
normalizeRankSum(rankGraph, personalized)
} else {
rankGraph
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,37 @@ class PageRankSuite extends SparkFunSuite with LocalSparkContext {
assert(totalIters == 19)
assert(iterAfterHalfCheckPoint == 18)
}
} // end of Grid PageRank
} // end of Grid PageRank with checkpoint

test("Grid PageRank with checkpoint without intermediate normalization") {
  withSpark { sc =>
    // Running 6 normalized iterations in one shot must produce exactly the
    // same ranks as 3 chained runs of 2 iterations each, where only the
    // final run normalizes the rank sum.
    val numRows = 10
    val numCols = 10
    val alpha = 0.15
    val grid = GraphGenerators.gridGraph(sc, numRows, numCols).cache()

    // Reference: a single normalized 6-iteration run.
    val expected: Array[(VertexId, Double)] = PageRank.runWithOptions(
      grid, numIter = 6, alpha, srcId = None, normalized = true
    ).vertices.collect()

    // First 2 iterations, unnormalized.
    val afterTwo = PageRank.runWithOptions(
      grid, numIter = 2, alpha, srcId = None, normalized = false
    )

    // Iterations 3-4, continuing from the previous graph, still unnormalized.
    val afterFour = PageRank.runWithOptionsWithPreviousPageRank(
      grid, numIter = 2, alpha, srcId = None, normalized = false, afterTwo
    )

    // Iterations 5-6, normalizing only at the very end.
    val actual: Array[(VertexId, Double)] = PageRank.runWithOptionsWithPreviousPageRank(
      grid, numIter = 2, alpha, srcId = None, normalized = true, afterFour
    ).vertices.collect()

    // Every vertex must end up with an identical (vertexId, rank) pair.
    assert(expected.zip(actual).forall { case (a, b) => a == b })
  }
} // end of Grid PageRank with checkpoint without intermediate normalization

test("Chain PageRank") {
withSpark { sc =>
Expand Down