Skip to content

Commit

Permalink
PIC Examples updates from Xiangrui's comments round 5
Browse files Browse the repository at this point in the history
  • Loading branch information
sboeschhuawei committed Feb 11, 2015
1 parent 2878675 commit 3c84b14
Showing 1 changed file with 54 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
* An example Power Iteration Clustering app. Takes an input of K concentric circles
* with a total of "n" sampled points (total here means "across ALL of the circles").
* An example Power Iteration Clustering http://www.icml2010.org/papers/387.pdf app.
* Takes an input of K concentric circles and the number of points in the innermost circle.
* The output should be K clusters - each cluster containing precisely the points associated
* with each of the input circles.
*
Expand All @@ -35,17 +35,17 @@ import org.apache.spark.{SparkConf, SparkContext}
* ./bin/run-example mllib.PowerIterationClusteringExample [options]
*
* Where options include:
* k: Number of circles/ clusters
* k: Number of circles/clusters
 * n: Number of sampled points on the innermost circle. There are proportionally more points
 * within the outer/larger circles
* numIterations: Number of Power Iterations
* maxIterations: Number of Power Iterations
* outerRadius: radius of the outermost of the concentric circles
* }}}
*
* Here is a sample run and output:
*
* ./bin/run-example mllib.PowerIterationClusteringExample
* -k 3 --n 30 --numIterations 15
* -k 3 --n 30 --maxIterations 15
*
* Cluster assignments: 1 -> [0,1,2,3,4],2 -> [5,6,7,8,9,10,11,12,13,14],
* 0 -> [15,16,17,18,19,20,21,22,23,24,25,26,27,28,29]
Expand All @@ -55,20 +55,31 @@ import org.apache.spark.{SparkConf, SparkContext}
*/
object PowerIterationClusteringExample {

/**
 * Command-line parameters for the example.
 *
 * @param input unused in this example — NOTE(review): no code path below reads it; confirm
 *              it is kept only for parity with the other example apps
 * @param k number of concentric circles (and therefore clusters)
 * @param numPoints number of sampled points on the innermost circle
 * @param maxIterations maximum number of power iterations to run
 * @param outerRadius radius of the outermost concentric circle
 */
case class Params(
input: String = null,
k: Int = 3,
numPoints: Int = 5,
maxIterations: Int = 10,
outerRadius: Double = 3.0
) extends AbstractParams[Params]

def main(args: Array[String]) {
val defaultParams = Params()

val parser = new OptionParser[Params]("PIC Circles") {
head("PowerIterationClusteringExample: an example PIC app using concentric circles.")
opt[Int]('k', "k")
.text(s"number of circles (/clusters), default: ${defaultParams.k}")
.action((x, c) => c.copy(k = x))
.text(s"number of circles (/clusters), default: ${defaultParams.k}")
.action((x, c) => c.copy(k = x))
opt[Int]('n', "n")
.text(s"number of points, default: ${defaultParams.numPoints}")
.action((x, c) => c.copy(numPoints = x))
opt[Int]("numIterations")
.text(s"number of iterations, default: ${defaultParams.numIterations}")
.action((x, c) => c.copy(numIterations = x))
.text(s"number of points in smallest circle, default: ${defaultParams.numPoints}")
.action((x, c) => c.copy(numPoints = x))
opt[Int]("maxIterations")
.text(s"number of iterations, default: ${defaultParams.maxIterations}")
.action((x, c) => c.copy(maxIterations = x))
opt[Int]('r', "r")
.text(s"radius of outermost circle, default: ${defaultParams.outerRadius}")
.action((x, c) => c.copy(numPoints = x))
}

parser.parse(args, defaultParams).map { params =>
Expand All @@ -80,70 +91,70 @@ object PowerIterationClusteringExample {

/**
 * Runs the PIC example end-to-end: builds the concentric-circles affinity
 * graph, clusters it with PowerIterationClustering, and prints the cluster
 * assignments and cluster sizes.
 *
 * NOTE(review): the diff scrape interleaved pre- and post-commit lines here
 * (duplicate setMaster/setAppName, setK/setMaxIterations, and assignment
 * formatting); this body keeps only the post-commit version.
 *
 * @param params parsed command-line parameters
 */
def run(params: Params) {
  val conf = new SparkConf()
    .setMaster("local")
    .setAppName(s"PowerIterationClustering with $params")
  val sc = new SparkContext(conf)

  // Quiet the default logging so the printed assignments are easy to spot.
  Logger.getRootLogger.setLevel(Level.WARN)

  val circlesRdd = generateCirclesRdd(sc, params.k, params.numPoints, params.outerRadius)
  val model = new PowerIterationClustering()
    .setK(params.k)
    .setMaxIterations(params.maxIterations)
    .run(circlesRdd)

  // Group point ids by assigned cluster, then order clusters by size so the
  // output is deterministic and easy to compare against the expected run.
  val clusters = model.assignments.collect.groupBy(_._2).mapValues(_.map(_._1))
  val assignments = clusters.toList.sortBy { case (k, v) => v.length }
  val assignmentsStr = assignments
    .map { case (k, v) =>
      s"$k -> ${v.sorted.mkString("[", ",", "]")}"
    }.mkString(",")
  val sizesStr = assignments.map {
    _._2.size
  }.sorted.mkString("(", ",", ")")
  println(s"Cluster assignments: $assignmentsStr\ncluster sizes: $sizesStr")

  sc.stop()
}

/**
 * Places `n` points evenly around a circle of the given radius centered on
 * the origin, returning their (x, y) coordinates in angular order.
 */
def generateCircle(radius: Double, n: Int) = {
  Seq.tabulate(n) { pointIndex =>
    val angle = 2.0 * math.Pi * pointIndex / n
    (radius * math.cos(angle), radius * math.sin(angle))
  }
}

/**
 * Builds the affinity graph for `nCircles` concentric circles as an RDD of
 * (srcId, dstId, similarity) edges, the input format expected by
 * PowerIterationClustering.
 *
 * NOTE(review): the diff scrape interleaved the pre-commit imperative loop
 * and the old `similarity` call with their replacements; this body keeps
 * only the post-commit version.
 *
 * @param sc          active SparkContext
 * @param nCircles    number of concentric circles
 * @param nPoints     points on the innermost circle; circle cx gets (cx + 1) * nPoints
 * @param outerRadius radius of the outermost circle
 * @return edges (i0, i1, gaussianSimilarity) for every unordered pair i0 < i1
 */
def generateCirclesRdd(sc: SparkContext,
    nCircles: Int = 3,
    nPoints: Int = 30,
    outerRadius: Double): RDD[(Long, Long, Double)] = {

  // Radii are evenly spaced fractions of outerRadius; outer circles carry
  // proportionally more points so point density stays comparable.
  val radii = Array.tabulate(nCircles) { cx => outerRadius / (nCircles - cx) }
  val groupSizes = Array.tabulate(nCircles) { cx => (cx + 1) * nPoints }
  val points = (0 until nCircles).flatMap { cx =>
    generateCircle(radii(cx), groupSizes(cx))
  }.zipWithIndex
  val rdd = sc.parallelize(points)
  // Emit each unordered pair exactly once (i0 < i1) with its similarity.
  val distancesRdd = rdd.cartesian(rdd).flatMap { case (((x0, y0), i0), ((x1, y1), i1)) =>
    if (i0 < i1) {
      Some((i0.toLong, i1.toLong, gaussianSimilarity((x0, y0), (x1, y1), 1.0)))
    } else {
      None
    }
  }
  distancesRdd
}

/**
 * Convenience wrapper: Gaussian (RBF) similarity between two 2-D points
 * with a fixed bandwidth of sigma = 1.0.
 */
private[mllib] def similarity(p1: (Double, Double), p2: (Double, Double)) = {
gaussianSimilarity(p1, p2, 1.0)
}

/**
 * Gaussian (RBF) similarity between two 2-D points, scaled by the Gaussian
 * normalization constant: http://en.wikipedia.org/wiki/Radial_basis_function_kernel
 *
 * Fixes from the original block: the stray pre-edit body line and the
 * commented-out dead code (diff residue) are removed, and the exponent
 * coefficient is corrected — the RBF exponent is -1 / (2 * sigma^2), whereas
 * `-1.0 / 2.0 * math.pow(sigma, 2.0)` computed -(sigma^2) / 2; the two only
 * coincide at sigma == 1.0, the single value used in this file.
 *
 * @param sigma kernel bandwidth; larger sigma gives slower decay with distance
 * @return similarity in (0, coeff], maximal when p1 == p2
 */
def gaussianSimilarity(p1: (Double, Double), p2: (Double, Double), sigma: Double) = {
  val coeff = 1.0 / (math.sqrt(2.0 * math.Pi) * sigma)
  val expCoeff = -1.0 / (2.0 * math.pow(sigma, 2.0))
  val ssquares = (p1._1 - p2._1) * (p1._1 - p2._1) + (p1._2 - p2._2) * (p1._2 - p2._2)
  coeff * math.exp(expCoeff * ssquares)
}

// NOTE(review): diff residue — this is the PRE-commit version of Params
// (field `numIterations`, later renamed `maxIterations`). It duplicates the
// updated definition shown earlier in this commit view and would not compile
// alongside it; it appears here only because the diff scrape kept both sides.
case class Params(
input: String = null,
k: Int = 3,
numPoints: Int = 5,
numIterations: Int = 10,
outerRadius: Double = 3.0
) extends AbstractParams[Params]

}

0 comments on commit 3c84b14

Please sign in to comment.