Skip to content

Commit

Permalink
adding breaking test to tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jegonzal committed Oct 30, 2014
1 parent 57b7de9 commit 5bb1396
Showing 1 changed file with 71 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,20 @@ package org.apache.spark.graphx.lib


import org.scalatest.{Matchers, FunSuite}

import collection.mutable.MutableList
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.graphx._
import org.apache.spark.graphx.lib._
import org.apache.spark.graphx.util.GraphGenerators
import org.apache.spark.rdd._

object GridPageRank {
def apply(nRows: Int, nCols: Int, nIter: Int, resetProb: Double) = {
val inNbrs = Array.fill(nRows * nCols)(collection.mutable.MutableList.empty[Int])
import scala.collection.mutable

object TruePageRank {

def grid(nRows: Int, nCols: Int, nIter: Int, resetProb: Double) = {
val inNbrs = Array.fill(nRows * nCols)(mutable.MutableList.empty[Int])
val outDegree = Array.fill(nRows * nCols)(0)
// Convert row column address into vertex ids (row major order)
def sub2ind(r: Int, c: Int): Int = r * nCols + c
Expand All @@ -45,18 +48,36 @@ object GridPageRank {
inNbrs(sub2ind(r,c+1)) += ind
}
}
val vids = 0 until (nRows * nCols)
pr(vids.zip(inNbrs).toMap, vids.zip(outDegree).toMap, nIter, resetProb)
}

def edges(edges: Array[(Int, Int)], nIter: Int, resetProb: Double) = {
val vids = edges.flatMap { case (s,d) => Iterator(s,d) }.toSet
val inNbrs = vids.map(id => (id, mutable.MutableList.empty[Int])).toMap
val outDegree = mutable.Map.empty[Int, Int]
for ((s,d) <- edges) {
outDegree(s) = outDegree.getOrElse(s,0) + 1
inNbrs(d) += s
}
pr(inNbrs, outDegree, nIter, resetProb)
}

def pr(inNbrs: collection.Map[Int, mutable.MutableList[Int]],
outDegree: collection.Map[Int, Int], nIter: Int, resetProb: Double) = {
val nVerts = inNbrs.size
// compute the pagerank
var pr = Array.fill(nRows * nCols)(resetProb)
var pr = Array.fill(nVerts)(resetProb)
for (iter <- 0 until nIter) {
val oldPr = pr
pr = new Array[Double](nRows * nCols)
for (ind <- 0 until (nRows * nCols)) {
pr = new Array[Double](nVerts)
for (ind <- 0 until nVerts) {
pr(ind) = resetProb + (1.0 - resetProb) *
inNbrs(ind).map( nbr => oldPr(nbr) / outDegree(nbr)).sum
}
}
val normalizer = pr.sum
(0L until (nRows * nCols)).zip(pr.map(p => p / normalizer))
(0L until nVerts).zip(pr.map(p => p / normalizer))
}

}
Expand All @@ -68,7 +89,43 @@ class PageRankSuite extends FunSuite with LocalSparkContext with Matchers {
a.leftJoin(b) { case (id, a, bOpt) => (a - bOpt.getOrElse(0.0)) * (a - bOpt.getOrElse(0.0)) }
.map { case (id, error) => error }.sum
}
/*


test("Simple Chain Static PageRank") {
withSpark { sc =>
val chainEdges = Array((0, 1), (1, 2), (2, 3), (4, 3))
val rawEdges = sc.parallelize(chainEdges, 1).map { case (s, d) => (s.toLong, d.toLong)}
val chain = Graph.fromEdgeTuples(rawEdges, 1.0).cache()
val resetProb = 0.15
val tol = 0.0001
val numIter = 10
val truePr = TruePageRank.edges(chainEdges, numIter, resetProb).toMap
val vertices = chain.staticPageRank(numIter, resetProb).vertices.collect
val pageranks = vertices.toMap
for (i <- 0 until 4) {
pageranks(i) should equal(truePr(i) +- 1.0e-7)
}
}
}


test("Simple Chain Dynamic PageRank") {
withSpark { sc =>
val chainEdges = Array((0, 1), (1, 2), (2, 3), (4, 3))
val rawEdges = sc.parallelize(chainEdges, 1).map { case (s, d) => (s.toLong, d.toLong)}
val chain = Graph.fromEdgeTuples(rawEdges, 1.0).cache()
val resetProb = 0.15
val tol = 0.0001
val numIter = 10
val truePr = TruePageRank.edges(chainEdges, numIter, resetProb).toMap
val vertices = chain.pageRank(1.0e-10, resetProb).vertices.collect
val pageranks = vertices.toMap
for (i <- 0 until 4) {
pageranks(i) should equal(truePr(i) +- 1.0e-7)
}
}
}


test("Static Star PageRank") {
withSpark { sc =>
Expand Down Expand Up @@ -137,7 +194,7 @@ class PageRankSuite extends FunSuite with LocalSparkContext with Matchers {
}
}
}
*/


test("Grid Static PageRank") {
withSpark { sc =>
Expand All @@ -147,7 +204,7 @@ class PageRankSuite extends FunSuite with LocalSparkContext with Matchers {
val tol = 0.0001
val numIter = 20
val errorTol = 1.0e-5
val truePr = GridPageRank(rows, cols, numIter, resetProb).toMap
val truePr = TruePageRank.grid(rows, cols, numIter, resetProb).toMap
val gridGraph = GraphGenerators.gridGraph(sc, rows, cols).cache()
val vertices = gridGraph.staticPageRank(numIter, resetProb).vertices.cache()
val pageranks: Map[VertexId, Double] = vertices.collect().toMap
Expand All @@ -157,7 +214,7 @@ class PageRankSuite extends FunSuite with LocalSparkContext with Matchers {
}
} // end of Grid PageRank

/*

test("Grid Dynamic PageRank") {
withSpark { sc =>
val rows = 5
Expand All @@ -166,7 +223,7 @@ class PageRankSuite extends FunSuite with LocalSparkContext with Matchers {
val tol = 0.0001
val numIter = 20
val errorTol = 1.0e-5
val truePr = GridPageRank(rows, cols, numIter, resetProb).toMap
val truePr = TruePageRank.grid(rows, cols, numIter, resetProb).toMap
val gridGraph = GraphGenerators.gridGraph(sc, rows, cols).cache()
val vertices = gridGraph.pageRank(1.0e-10, resetProb).vertices.cache()
val pageranks: Map[VertexId, Double] = vertices.collect().toMap
Expand All @@ -176,6 +233,6 @@ class PageRankSuite extends FunSuite with LocalSparkContext with Matchers {
}
}
} // end of Grid PageRank
*/


}

0 comments on commit 5bb1396

Please sign in to comment.