From 3c84b14096a141944387b1af0c32eee6c51e1a62 Mon Sep 17 00:00:00 2001
From: sboeschhuawei
Date: Wed, 11 Feb 2015 14:37:51 -0800
Subject: [PATCH] PIC Examples updates from Xiangrui's comments round 5

---
 .../PowerIterationClusteringExample.scala          | 96 +++++++++++--------
 1 file changed, 53 insertions(+), 43 deletions(-)

diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala
index 52fd2379a42ac..b2373adba1fd4 100644
--- a/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala
@@ -25,8 +25,8 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.{SparkConf, SparkContext}
 
 /**
- * An example Power Iteration Clustering app. Takes an input of K concentric circles
- * with a total of "n" sampled points (total here means "across ALL of the circles").
+ * An example Power Iteration Clustering (http://www.icml2010.org/papers/387.pdf) app.
+ * Takes an input of K concentric circles and the number of points in the innermost circle.
  * The output should be K clusters - each cluster containing precisely the points associated
  * with each of the input circles.
  *
@@ -35,17 +35,17 @@ import org.apache.spark.{SparkConf, SparkContext}
  *   ./bin/run-example mllib.PowerIterationClusteringExample [options]
  *
  * Where options include:
- *   k:  Number of circles/ clusters
+ *   k:  Number of circles/clusters
 *   n:  Number of sampled points on innermost circle.. There are proportionally more points
 *      within the outer/larger circles
- *   numIterations:   Number of Power Iterations
+ *   maxIterations:   Maximum number of power iterations
 *   outerRadius:  radius of the outermost of the concentric circles
 * }}}
 *
 * Here is a sample run and output:
 *
 * ./bin/run-example mllib.PowerIterationClusteringExample
- *   -k 3 --n 30 --numIterations 15
+ *   -k 3 --n 30 --maxIterations 15
 *
 * Cluster assignments: 1 -> [0,1,2,3,4],2 -> [5,6,7,8,9,10,11,12,13,14],
 *   0 -> [15,16,17,18,19,20,21,22,23,24,25,26,27,28,29]
@@ -55,20 +55,31 @@ import org.apache.spark.{SparkConf, SparkContext}
  */
 object PowerIterationClusteringExample {
 
+  case class Params(
+      input: String = null,
+      k: Int = 3,
+      numPoints: Int = 5,
+      maxIterations: Int = 10,
+      outerRadius: Double = 3.0
+    ) extends AbstractParams[Params]
+
   def main(args: Array[String]) {
     val defaultParams = Params()
 
     val parser = new OptionParser[Params]("PIC Circles") {
       head("PowerIterationClusteringExample: an example PIC app using concentric circles.")
       opt[Int]('k', "k")
-          .text(s"number of circles (/clusters), default: ${defaultParams.k}")
-          .action((x, c) => c.copy(k = x))
+        .text(s"number of circles (/clusters), default: ${defaultParams.k}")
+        .action((x, c) => c.copy(k = x))
       opt[Int]('n', "n")
-          .text(s"number of points, default: ${defaultParams.numPoints}")
-          .action((x, c) => c.copy(numPoints = x))
-      opt[Int]("numIterations")
-          .text(s"number of iterations, default: ${defaultParams.numIterations}")
-          .action((x, c) => c.copy(numIterations = x))
+        .text(s"number of points in the smallest circle, default: ${defaultParams.numPoints}")
+        .action((x, c) => c.copy(numPoints = x))
+      opt[Int]("maxIterations")
+        .text(s"maximum number of iterations, default: ${defaultParams.maxIterations}")
+        .action((x, c) => c.copy(maxIterations = x))
+      opt[Double]('r', "r")
+        .text(s"radius of outermost circle, default: ${defaultParams.outerRadius}")
+        .action((x, c) => c.copy(outerRadius = x))
     }
 
     parser.parse(args, defaultParams).map { params =>
@@ -80,46 +91,53 @@ object PowerIterationClusteringExample {
   def run(params: Params) {
     val conf = new SparkConf()
-        .setMaster("local")
-        .setAppName(s"PowerIterationClustering with $params")
+      .setMaster("local")
+      .setAppName(s"PowerIterationClustering with $params")
     val sc = new SparkContext(conf)
 
     Logger.getRootLogger.setLevel(Level.WARN)
 
     val circlesRdd = generateCirclesRdd(sc, params.k, params.numPoints, params.outerRadius)
     val model = new PowerIterationClustering()
-        .setK(params.k)
-        .setMaxIterations(params.numIterations)
-        .run(circlesRdd)
+      .setK(params.k)
+      .setMaxIterations(params.maxIterations)
+      .run(circlesRdd)
 
     val clusters = model.assignments.collect.groupBy(_._2).mapValues(_.map(_._1))
     val assignments = clusters.toList.sortBy { case (k, v) => v.length}
     val assignmentsStr = assignments
-      .map { case (k, v) => s"$k -> ${v.sorted.mkString("[", ",", "]")}"}.mkString(",")
-    println(s"Cluster assignments: $assignmentsStr")
+      .map { case (k, v) =>
+        s"$k -> ${v.sorted.mkString("[", ",", "]")}"
+      }.mkString(",")
+    val sizesStr = assignments.map {
+      _._2.size
+    }.sorted.mkString("(", ",", ")")
+    println(s"Cluster assignments: $assignmentsStr\ncluster sizes: $sizesStr")
 
     sc.stop()
   }
 
+  def generateCircle(radius: Double, n: Int) = {
+    Seq.tabulate(n) { i =>
+      val theta = 2.0 * math.Pi * i / n
+      (radius * math.cos(theta), radius * math.sin(theta))
+    }
+  }
+
   def generateCirclesRdd(sc: SparkContext,
       nCircles: Int = 3,
       nPoints: Int = 30,
       outerRadius: Double): RDD[(Long, Long, Double)] = {
 
-    val radii = for (cx <- 0 until nCircles) yield outerRadius / (nCircles-cx)
-    val groupSizes = for (cx <- 0 until nCircles) yield (cx + 1) * nPoints
-    var ix = 0
-    val points = for (cx <- 0 until nCircles;
-                      px <- 0 until groupSizes(cx)) yield {
-      val theta = 2.0 * math.Pi * px / groupSizes(cx)
-      val out = (ix, (radii(cx) * math.cos(theta), radii(cx) * math.sin(theta)))
-      ix += 1
-      out
-    }
+    val radii = Array.tabulate(nCircles) { cx => outerRadius / (nCircles - cx)}
+    val groupSizes = Array.tabulate(nCircles) { cx => (cx + 1) * nPoints}
+    val points = (0 until nCircles).flatMap { cx =>
+      generateCircle(radii(cx), groupSizes(cx))
+    }.zipWithIndex
     val rdd = sc.parallelize(points)
-    val distancesRdd = rdd.cartesian(rdd).flatMap { case ((i0, (x0, y0)), (i1, (x1, y1))) =>
+    val distancesRdd = rdd.cartesian(rdd).flatMap { case (((x0, y0), i0), ((x1, y1), i1)) =>
       if (i0 < i1) {
-        Some((i0.toLong, i1.toLong, similarity((x0, y0), (x1, y1))))
+        Some((i0.toLong, i1.toLong, gaussianSimilarity((x0, y0), (x1, y1), 1.0)))
       } else {
        None
      }
@@ -127,23 +145,15 @@ object PowerIterationClusteringExample {
     }
     distancesRdd
   }
 
-  private[mllib] def similarity(p1: (Double, Double), p2: (Double, Double)) = {
-    gaussianSimilarity(p1, p2, 1.0)
-  }
-
   /**
    * Gaussian Similarity: http://en.wikipedia.org/wiki/Radial_basis_function_kernel
    */
   def gaussianSimilarity(p1: (Double, Double), p2: (Double, Double), sigma: Double) = {
-    math.exp((p1._1 - p2._1) * (p1._1 - p2._1) + (p1._2 - p2._2) * (p1._2 - p2._2))
+    val coeff = 1.0 / (math.sqrt(2.0 * math.Pi) * sigma)
+    val expCoeff = -1.0 / (2.0 * math.pow(sigma, 2.0))
+    val ssquares = (p1._1 - p2._1) * (p1._1 - p2._1) + (p1._2 - p2._2) * (p1._2 - p2._2)
+    coeff * math.exp(expCoeff * ssquares)
   }
 
-  case class Params(
-    input: String = null,
-    k: Int = 3,
-    numPoints: Int = 5,
-    numIterations: Int = 10,
-    outerRadius: Double = 3.0
-  ) extends AbstractParams[Params]
 }
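
Notes on the changes above (standalone Scala sketches, not part of the patch):

The gaussianSimilarity rewrite implements the RBF kernel
k(p1, p2) = exp(-||p1 - p2||^2 / (2 * sigma^2)): the exponent divides by
2 * sigma^2, hence expCoeff = -1.0 / (2.0 * math.pow(sigma, 2.0)). The
1 / (sqrt(2 * pi) * sigma) factor rescales every affinity by the same
constant, which PIC's row normalization cancels out. A minimal sanity check
of the kernel (object name and test values are illustrative):

    object GaussianSimilarityCheck {
      // RBF kernel: exp(-||p1 - p2||^2 / (2 * sigma^2)), with the constant
      // Gaussian normalization factor in front.
      def gaussianSimilarity(p1: (Double, Double), p2: (Double, Double), sigma: Double): Double = {
        val coeff = 1.0 / (math.sqrt(2.0 * math.Pi) * sigma)
        val expCoeff = -1.0 / (2.0 * math.pow(sigma, 2.0))
        val ssquares = (p1._1 - p2._1) * (p1._1 - p2._1) + (p1._2 - p2._2) * (p1._2 - p2._2)
        coeff * math.exp(expCoeff * ssquares)
      }

      def main(args: Array[String]): Unit = {
        val origin = (0.0, 0.0)
        // Similarity must decay monotonically as points move apart:
        println(gaussianSimilarity(origin, (1.0, 0.0), 1.0)) // ~0.2420
        println(gaussianSimilarity(origin, (2.0, 0.0), 1.0)) // ~0.0540
        println(gaussianSimilarity(origin, (3.0, 0.0), 1.0)) // ~0.0044
      }
    }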
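
generateCircle places n points at evenly spaced angles on a circle of the
given radius. With the defaults (k = 3, n = 5, outerRadius = 3.0),
generateCirclesRdd samples 5, 10, and 15 points on circles of radius 1.0,
1.5, and 3.0, which matches the cluster sizes 5, 10, and 15 in the sample
output quoted in the scaladoc. A small worked example (values rounded; cos
and sin actually return values like 6.1e-17 rather than exactly 0.0):

    // generateCircle(1.0, 4) puts points at theta = 0, pi/2, pi, 3*pi/2:
    val circle = Seq.tabulate(4) { i =>
      val theta = 2.0 * math.Pi * i / 4
      (math.cos(theta), math.sin(theta))
    }
    // circle ~= Seq((1.0, 0.0), (0.0, 1.0), (-1.0, 0.0), (0.0, -1.0))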
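
The input contract for PowerIterationClustering is an RDD[(Long, Long, Double)]
of (srcId, dstId, similarity) triples; the i0 < i1 filter in generateCirclesRdd
keeps each unordered pair exactly once. A minimal sketch of that contract on a
hand-built affinity graph (object name and edge weights are illustrative, and
the element type of model.assignments varies across early Spark releases, so
the sketch only prints it):

    import org.apache.spark.mllib.clustering.PowerIterationClustering
    import org.apache.spark.{SparkConf, SparkContext}

    object PicAffinitySketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setMaster("local").setAppName("PICAffinitySketch"))
        // Two tightly linked pairs, (0, 1) and (2, 3), with weak cross links;
        // PIC with k = 2 should typically separate the two pairs.
        val affinities = sc.parallelize(Seq(
          (0L, 1L, 1.0), (2L, 3L, 1.0),
          (0L, 2L, 0.1), (1L, 3L, 0.1)))
        val model = new PowerIterationClustering()
          .setK(2)
          .setMaxIterations(10)
          .run(affinities)
        model.assignments.collect().foreach(println)
        sc.stop()
      }
    }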