From 5864d4ada20f7b087e56741d0f39a36844f3a073 Mon Sep 17 00:00:00 2001 From: sboeschhuawei Date: Tue, 3 Feb 2015 13:11:08 -0800 Subject: [PATCH 01/10] placeholder for pic examples --- .../PowerIterationClusteringExample.scala | 109 ++++++++++++++++++ 1 file changed, 109 insertions(+) create mode 100644 examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala new file mode 100644 index 0000000000000..7e7b2e26053bc --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.mllib + +import org.apache.log4j.{Level, Logger} +import scopt.OptionParser + +import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.mllib.clustering.KMeans +import org.apache.spark.mllib.linalg.Vectors + +/** + * An example k-means app. Run with + * {{{ + * ./bin/run-example org.apache.spark.examples.mllib.DenseKMeans [options] + * }}} + * If you use it as a template to create your own app, please use `spark-submit` to submit your app. 
+ */ +object PowerIterationClusteringExample { + + object InitializationMode extends Enumeration { + type InitializationMode = Value + val Random, Parallel = Value + } + + import InitializationMode._ + + case class Params( + input: String = null, + k: Int = -1, + numIterations: Int = 10, + initializationMode: InitializationMode = Parallel) extends AbstractParams[Params] + + def main(args: Array[String]) { + val defaultParams = Params() + + val parser = new OptionParser[Params]("DenseKMeans") { + head("DenseKMeans: an example k-means app for dense data.") + opt[Int]('k', "k") + .required() + .text(s"number of clusters, required") + .action((x, c) => c.copy(k = x)) + opt[Int]("numIterations") + .text(s"number of iterations, default; ${defaultParams.numIterations}") + .action((x, c) => c.copy(numIterations = x)) + opt[String]("initMode") + .text(s"initialization mode (${InitializationMode.values.mkString(",")}), " + + s"default: ${defaultParams.initializationMode}") + .action((x, c) => c.copy(initializationMode = InitializationMode.withName(x))) + arg[String]("") + .text("input paths to examples") + .required() + .action((x, c) => c.copy(input = x)) + } + + parser.parse(args, defaultParams).map { params => + run(params) + }.getOrElse { + sys.exit(1) + } + } + + def run(params: Params) { + val conf = new SparkConf().setAppName(s"DenseKMeans with $params") + val sc = new SparkContext(conf) + + Logger.getRootLogger.setLevel(Level.WARN) + + val examples = sc.textFile(params.input).map { line => + Vectors.dense(line.split(' ').map(_.toDouble)) + }.cache() + + val numExamples = examples.count() + + println(s"numExamples = $numExamples.") + + val initMode = params.initializationMode match { + case Random => KMeans.RANDOM + case Parallel => KMeans.K_MEANS_PARALLEL + } + + val model = new KMeans() + .setInitializationMode(initMode) + .setK(params.k) + .setMaxIterations(params.numIterations) + .run(examples) + + val cost = model.computeCost(examples) + + println(s"Total cost = $cost.") + + sc.stop() + } +} From c50913012654372da9b0b6a4d1b7e4e410f9afda Mon Sep 17 00:00:00 2001 From: sboeschhuawei Date: Tue, 3 Feb 2015 13:14:36 -0800 Subject: [PATCH 02/10] placeholder for pic examples --- .../examples/mllib/PowerIterationClusteringExample.scala | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala index 7e7b2e26053bc..194a6a1827847 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala @@ -18,11 +18,10 @@ package org.apache.spark.examples.mllib import org.apache.log4j.{Level, Logger} -import scopt.OptionParser - -import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.mllib.clustering.KMeans import org.apache.spark.mllib.linalg.Vectors +import org.apache.spark.{SparkConf, SparkContext} +import scopt.OptionParser /** * An example k-means app. 
Run with @@ -38,7 +37,7 @@ object PowerIterationClusteringExample { val Random, Parallel = Value } - import InitializationMode._ + import org.apache.spark.examples.mllib.PowerIterationClusteringExample.InitializationMode._ case class Params( input: String = null, From 03e8de460da07a6451bd20493dc4653c20cbe952 Mon Sep 17 00:00:00 2001 From: sboeschhuawei Date: Mon, 9 Feb 2015 16:57:10 -0800 Subject: [PATCH 03/10] Added PICExample --- .../PowerIterationClusteringExample.scala | 139 ++++++++++++------ 1 file changed, 96 insertions(+), 43 deletions(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala index 194a6a1827847..aed0c5d583bca 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala @@ -18,53 +18,44 @@ package org.apache.spark.examples.mllib import org.apache.log4j.{Level, Logger} -import org.apache.spark.mllib.clustering.KMeans +import org.apache.spark.mllib.clustering.PowerIterationClustering import org.apache.spark.mllib.linalg.Vectors +import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} import scopt.OptionParser /** - * An example k-means app. Run with + * An example Power Iteration Clustering app. Run with * {{{ - * ./bin/run-example org.apache.spark.examples.mllib.DenseKMeans [options] + * ./bin/run-example org.apache.spark.examples.mllib.PowerIterationClusteringExample + * [options] * }}} * If you use it as a template to create your own app, please use `spark-submit` to submit your app. */ object PowerIterationClusteringExample { - object InitializationMode extends Enumeration { - type InitializationMode = Value - val Random, Parallel = Value - } - - import org.apache.spark.examples.mllib.PowerIterationClusteringExample.InitializationMode._ - case class Params( - input: String = null, - k: Int = -1, - numIterations: Int = 10, - initializationMode: InitializationMode = Parallel) extends AbstractParams[Params] + input: String = null, + k: Int = 3, + numPoints: Int = 30, + numIterations: Int = 10, + outerRadius: Double = 3.0 + ) extends AbstractParams[Params] def main(args: Array[String]) { val defaultParams = Params() - val parser = new OptionParser[Params]("DenseKMeans") { - head("DenseKMeans: an example k-means app for dense data.") + val parser = new OptionParser[Params]("PIC Circles") { + head("PowerIterationClusteringExample: an example PIC app using concentric circles.") opt[Int]('k', "k") - .required() - .text(s"number of clusters, required") + .text(s"number of circles (/clusters), default: ${defaultParams.k}") .action((x, c) => c.copy(k = x)) + opt[Int]('n', "n") + .text(s"number of points, default: ${defaultParams.numPoints}") + .action((x, c) => c.copy(numPoints = x)) opt[Int]("numIterations") - .text(s"number of iterations, default; ${defaultParams.numIterations}") + .text(s"number of iterations, default: ${defaultParams.numIterations}") .action((x, c) => c.copy(numIterations = x)) - opt[String]("initMode") - .text(s"initialization mode (${InitializationMode.values.mkString(",")}), " + - s"default: ${defaultParams.initializationMode}") - .action((x, c) => c.copy(initializationMode = InitializationMode.withName(x))) - arg[String]("") - .text("input paths to examples") - .required() - .action((x, c) => c.copy(input = x)) } parser.parse(args, 
defaultParams).map { params => @@ -74,8 +65,78 @@ object PowerIterationClusteringExample { } } + def generateCircle(n: Int, r: Double): Array[(Double, Double)] = { + val pi2 = 2 * math.Pi + (0.0 until pi2 by pi2 / n).map { x => + (r * math.cos(x), r * math.sin(x)) + }.toArray + } + + def generateCirclesRdd(sc: SparkContext, nCircles: Int = 3, nTotalPoints: Int = 30, + outerRadius: Double): + RDD[(Long, Long, Double)] = { + // The circles are generated as follows: + // The Radii are equal to the largestRadius/(C - circleIndex) + // where C=Number of circles + // and the circleIndex is 0 for the innermost and (nCircles-1) for the outermost circle + // The number of points in each circle (and thus in each final cluster) is: + // x, 2x, .., nCircles*x + // Where x is found from x = N * C(C+1)/2 + // The # points in the LAST circle is adjusted downwards so that the total sum is equal + // to the nTotalPoints + + val smallestRad = math.ceil(nTotalPoints / (nCircles * (nCircles + 1) / 2.0)) + var groupSizes = (1 to nCircles).map(gs => (gs * smallestRad).toInt) + groupSizes.zipWithIndex.map { case (gs, ix) => + ix match { + case _ if ix == groupSizes.length => gs - (groupSizes.sum - nTotalPoints) + case _ => gs + } + } + + val radii = for (cx <- 0 until nCircles) yield { + cx match { + case 0 => 0.1 * outerRadius / nCircles + case _ if cx == nCircles - 1 => outerRadius + case _ => outerRadius * cx / (nCircles - 1) + } + } + var ix = 0 + val points = for (cx <- 0 until nCircles; + px <- 0 until groupSizes(cx)) yield { + val theta = 2.0 * math.Pi * px / groupSizes(cx) + val out = (ix, (radii(cx) * math.cos(theta), radii(cx) * math.sin(theta))) + ix += 1 + out + } + val rdd = sc.parallelize(points) + val distancesRdd = rdd.cartesian(rdd).flatMap { case ((i0, (x0, y0)), (i1, (x1, y1))) => + if (i0 < i1) { + val sim = Some((i0.toLong, i1.toLong, similarity((x0, y0), (x1, y1)))) + sim + } else { + None + } + } + val coll = distancesRdd.collect + distancesRdd + } + + def gaussianSimilarity(p1: (Double, Double), p2: (Double, Double), sigma: Double) = { + val sim = (1.0 / + (math.sqrt(2.0 * math.Pi) * sigma)) * math.exp((-1.0 / (2.0 * math.pow(sigma, 2.0)) + * (math.pow(p1._1 - p2._1, 2) + math.pow(p1._2 - p2._2, 2)))) + sim + } + + private[mllib] def similarity(p1: (Double, Double), p2: (Double, Double)) = { + gaussianSimilarity(p1, p2, 1.0) + } + def run(params: Params) { - val conf = new SparkConf().setAppName(s"DenseKMeans with $params") + val conf = new SparkConf() + .setMaster("local") + .setAppName(s"PowerIterationClustering with $params") val sc = new SparkContext(conf) Logger.getRootLogger.setLevel(Level.WARN) @@ -84,24 +145,16 @@ object PowerIterationClusteringExample { Vectors.dense(line.split(' ').map(_.toDouble)) }.cache() - val numExamples = examples.count() - - println(s"numExamples = $numExamples.") - - val initMode = params.initializationMode match { - case Random => KMeans.RANDOM - case Parallel => KMeans.K_MEANS_PARALLEL - } - - val model = new KMeans() - .setInitializationMode(initMode) + val circlesRdd = generateCirclesRdd(sc, params.k, params.numPoints, params.outerRadius) + val model = new PowerIterationClustering() .setK(params.k) .setMaxIterations(params.numIterations) - .run(examples) - - val cost = model.computeCost(examples) + .run(circlesRdd) - println(s"Total cost = $cost.") + val clusters = model.assignments.collect.groupBy(_._2).mapValues(_.map(_._1)) + println(s"Cluster assignments: " + + s"${clusters.map { case (k, v) => s"$k -> ${v.sorted.mkString("[", ",", "]")}"} + 
.mkString(",")}") sc.stop() } From efeec458cb08b39bee4f026a2e5233f3bfc2d0c2 Mon Sep 17 00:00:00 2001 From: sboeschhuawei Date: Mon, 9 Feb 2015 22:24:48 -0800 Subject: [PATCH 04/10] Update to PICExample from Xiangrui's comments --- .../PowerIterationClusteringExample.scala | 55 ++++++++++++------- 1 file changed, 35 insertions(+), 20 deletions(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala index aed0c5d583bca..1acabda19bc69 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala @@ -19,28 +19,47 @@ package org.apache.spark.examples.mllib import org.apache.log4j.{Level, Logger} import org.apache.spark.mllib.clustering.PowerIterationClustering -import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} import scopt.OptionParser /** - * An example Power Iteration Clustering app. Run with + * An example Power Iteration Clustering app. Takes an input of K concentric circles + * with a total of "n" sampled points (total here means "across ALL of the circles"). + * The output should be K clusters - each cluster containing precisely the points associated + * with each of the input circles. + * + * Run with * {{{ - * ./bin/run-example org.apache.spark.examples.mllib.PowerIterationClusteringExample - * [options] + * ./bin/run-example org.apache.spark.examples.mllib.PowerIterationClusteringExample [options] + * + * Where options include: + * k: Number of circles/ clusters + * n: Total number of sampled points. There are proportionally more points within the + * outer/larger circles + * numIterations: Number of Power Iterations + * outerRadius: radius of the outermost of the concentric circles * }}} + * + * Here is a sample run and output: + * + * ./bin/run-example org.apache.spark.examples.mllib.PowerIterationClusteringExample -k 3 --n 30 --numIterations 15 + * + * Cluster assignments: 1 -> [0,1,2,3,4],2 -> [5,6,7,8,9,10,11,12,13,14], + * 0 -> [15,16,17,18,19,20,21,22,23,24,25,26,27,28,29] + * + * * If you use it as a template to create your own app, please use `spark-submit` to submit your app. 
*/ object PowerIterationClusteringExample { case class Params( - input: String = null, - k: Int = 3, - numPoints: Int = 30, - numIterations: Int = 10, - outerRadius: Double = 3.0 - ) extends AbstractParams[Params] + input: String = null, + k: Int = 3, + numPoints: Int = 30, + numIterations: Int = 10, + outerRadius: Double = 3.0 + ) extends AbstractParams[Params] def main(args: Array[String]) { val defaultParams = Params() @@ -112,21 +131,21 @@ object PowerIterationClusteringExample { val rdd = sc.parallelize(points) val distancesRdd = rdd.cartesian(rdd).flatMap { case ((i0, (x0, y0)), (i1, (x1, y1))) => if (i0 < i1) { - val sim = Some((i0.toLong, i1.toLong, similarity((x0, y0), (x1, y1)))) - sim + Some((i0.toLong, i1.toLong, similarity((x0, y0), (x1, y1)))) } else { None } } - val coll = distancesRdd.collect distancesRdd } + /** + * Gaussian Similarity: http://www.stat.wisc.edu/~mchung/teaching/MIA/reading/diffusion.gaussian.kernel.pdf + */ def gaussianSimilarity(p1: (Double, Double), p2: (Double, Double), sigma: Double) = { - val sim = (1.0 / + (1.0 / (math.sqrt(2.0 * math.Pi) * sigma)) * math.exp((-1.0 / (2.0 * math.pow(sigma, 2.0)) * (math.pow(p1._1 - p2._1, 2) + math.pow(p1._2 - p2._2, 2)))) - sim } private[mllib] def similarity(p1: (Double, Double), p2: (Double, Double)) = { @@ -141,10 +160,6 @@ object PowerIterationClusteringExample { Logger.getRootLogger.setLevel(Level.WARN) - val examples = sc.textFile(params.input).map { line => - Vectors.dense(line.split(' ').map(_.toDouble)) - }.cache() - val circlesRdd = generateCirclesRdd(sc, params.k, params.numPoints, params.outerRadius) val model = new PowerIterationClustering() .setK(params.k) @@ -153,7 +168,7 @@ object PowerIterationClusteringExample { val clusters = model.assignments.collect.groupBy(_._2).mapValues(_.map(_._1)) println(s"Cluster assignments: " - + s"${clusters.map { case (k, v) => s"$k -> ${v.sorted.mkString("[", ",", "]")}"} + + s"${clusters.toList.sortBy{ case (k,v) => v.length}.map { case (k, v) => s"$k -> ${v.sorted.mkString("[", ",", "]")}"} .mkString(",")}") sc.stop() From f7ff43d94d3a3ac02f2aa3c9521990a96e28d252 Mon Sep 17 00:00:00 2001 From: sboeschhuawei Date: Mon, 9 Feb 2015 22:45:32 -0800 Subject: [PATCH 05/10] Update to PICExample from Xiangrui's comments --- .../examples/mllib/PowerIterationClusteringExample.scala | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala index 1acabda19bc69..06c7c80720b6a 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala @@ -43,7 +43,8 @@ import scopt.OptionParser * * Here is a sample run and output: * - * ./bin/run-example org.apache.spark.examples.mllib.PowerIterationClusteringExample -k 3 --n 30 --numIterations 15 + * ./bin/run-example org.apache.spark.examples.mllib.PowerIterationClusteringExample + * -k 3 --n 30 --numIterations 15 * * Cluster assignments: 1 -> [0,1,2,3,4],2 -> [5,6,7,8,9,10,11,12,13,14], * 0 -> [15,16,17,18,19,20,21,22,23,24,25,26,27,28,29] @@ -140,7 +141,8 @@ object PowerIterationClusteringExample { } /** - * Gaussian Similarity: http://www.stat.wisc.edu/~mchung/teaching/MIA/reading/diffusion.gaussian.kernel.pdf + * Gaussian Similarity: + * 
http://www.stat.wisc.edu/~mchung/teaching/MIA/reading/diffusion.gaussian.kernel.pdf */ def gaussianSimilarity(p1: (Double, Double), p2: (Double, Double), sigma: Double) = { (1.0 / @@ -168,7 +170,8 @@ object PowerIterationClusteringExample { val clusters = model.assignments.collect.groupBy(_._2).mapValues(_.map(_._1)) println(s"Cluster assignments: " - + s"${clusters.toList.sortBy{ case (k,v) => v.length}.map { case (k, v) => s"$k -> ${v.sorted.mkString("[", ",", "]")}"} + + s"${clusters.toList.sortBy{ case (k,v) => v.length} + .map { case (k, v) => s"$k -> ${v.sorted.mkString("[", ",", "]")}"} .mkString(",")}") sc.stop() From cef28f4b88ba1ab8dc08bac174a234bdcbaf42b6 Mon Sep 17 00:00:00 2001 From: sboeschhuawei Date: Mon, 9 Feb 2015 23:18:04 -0800 Subject: [PATCH 06/10] Further updates to PICExample from Xiangrui's comments --- .../mllib/PowerIterationClusteringExample.scala | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala index 06c7c80720b6a..72391a405b325 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala @@ -141,13 +141,10 @@ object PowerIterationClusteringExample { } /** - * Gaussian Similarity: - * http://www.stat.wisc.edu/~mchung/teaching/MIA/reading/diffusion.gaussian.kernel.pdf + * Gaussian Similarity: http://en.wikipedia.org/wiki/Radial_basis_function_kernel */ def gaussianSimilarity(p1: (Double, Double), p2: (Double, Double), sigma: Double) = { - (1.0 / - (math.sqrt(2.0 * math.Pi) * sigma)) * math.exp((-1.0 / (2.0 * math.pow(sigma, 2.0)) - * (math.pow(p1._1 - p2._1, 2) + math.pow(p1._2 - p2._2, 2)))) + math.exp((p1._1 - p2._1)*(p1._1 - p2._1) + (p1._2 - p2._2)*(p1._2 - p2._2)) } private[mllib] def similarity(p1: (Double, Double), p2: (Double, Double)) = { @@ -169,10 +166,10 @@ object PowerIterationClusteringExample { .run(circlesRdd) val clusters = model.assignments.collect.groupBy(_._2).mapValues(_.map(_._1)) - println(s"Cluster assignments: " - + s"${clusters.toList.sortBy{ case (k,v) => v.length} - .map { case (k, v) => s"$k -> ${v.sorted.mkString("[", ",", "]")}"} - .mkString(",")}") + val assignments =clusters.toList.sortBy{ case (k,v) => v.length} + val assignmentsStr = assignments + .map { case (k, v) => s"$k -> ${v.sorted.mkString("[", ",", "]")}"}.mkString(",") + println(s"Cluster assignments: $assignmentsStr") sc.stop() } From d7f0cbadcdc8c217f8ebe2be86048b9d9c24bfca Mon Sep 17 00:00:00 2001 From: sboeschhuawei Date: Tue, 10 Feb 2015 12:12:46 -0800 Subject: [PATCH 07/10] Updates to PICExample from Xiangrui's comments round 3 --- .../PowerIterationClusteringExample.scala | 125 ++++++++---------- 1 file changed, 52 insertions(+), 73 deletions(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala index 72391a405b325..ac2bb7caec0d1 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala @@ -18,10 +18,11 @@ package org.apache.spark.examples.mllib import org.apache.log4j.{Level, Logger} +import 
scopt.OptionParser
+
 import org.apache.spark.mllib.clustering.PowerIterationClustering
 import org.apache.spark.rdd.RDD
 import org.apache.spark.{SparkConf, SparkContext}
-import scopt.OptionParser
 
 /**
  * An example Power Iteration Clustering app. Takes an input of K concentric circles
@@ -31,51 +32,43 @@ import scopt.OptionParser
  *
  * Run with
  * {{{
- * ./bin/run-example org.apache.spark.examples.mllib.PowerIterationClusteringExample [options]
+ * ./bin/run-example mllib.PowerIterationClusteringExample [options]
  *
  * Where options include:
  *   k: Number of circles/ clusters
- *   n: Total number of sampled points. There are proportionally more points within the
- *   outer/larger circles
+ *   n: Number of sampled points on innermost circle. There are proportionally more points
+ *   within the outer/larger circles
  *   numIterations: Number of Power Iterations
  *   outerRadius: radius of the outermost of the concentric circles
  * }}}
 *
 * Here is a sample run and output:
 *
- * ./bin/run-example org.apache.spark.examples.mllib.PowerIterationClusteringExample
- *   -k 3 --n 30 --numIterations 15
+ * ./bin/run-example mllib.PowerIterationClusteringExample
+ *   -k 3 --n 30 --numIterations 15
 *
 * Cluster assignments: 1 -> [0,1,2,3,4],2 -> [5,6,7,8,9,10,11,12,13,14],
- * 0 -> [15,16,17,18,19,20,21,22,23,24,25,26,27,28,29]
+ *   0 -> [15,16,17,18,19,20,21,22,23,24,25,26,27,28,29]
 *
 *
 * If you use it as a template to create your own app, please use `spark-submit` to submit your app.
 */
 object PowerIterationClusteringExample {
 
-  case class Params(
-      input: String = null,
-      k: Int = 3,
-      numPoints: Int = 30,
-      numIterations: Int = 10,
-      outerRadius: Double = 3.0
-    ) extends AbstractParams[Params]
-
   def main(args: Array[String]) {
     val defaultParams = Params()
 
     val parser = new OptionParser[Params]("PIC Circles") {
       head("PowerIterationClusteringExample: an example PIC app using concentric circles.")
       opt[Int]('k', "k")
-        .text(s"number of circles (/clusters), default: ${defaultParams.k}")
-        .action((x, c) => c.copy(k = x))
+      .text(s"number of circles (/clusters), default: ${defaultParams.k}")
+      .action((x, c) => c.copy(k = x))
       opt[Int]('n', "n")
-        .text(s"number of points, default: ${defaultParams.numPoints}")
-        .action((x, c) => c.copy(numPoints = x))
+      .text(s"number of points, default: ${defaultParams.numPoints}")
+      .action((x, c) => c.copy(numPoints = x))
       opt[Int]("numIterations")
-        .text(s"number of iterations, default: ${defaultParams.numIterations}")
-        .action((x, c) => c.copy(numIterations = x))
+      .text(s"number of iterations, default: ${defaultParams.numIterations}")
+      .action((x, c) => c.copy(numIterations = x))
     }
 
     parser.parse(args, defaultParams).map { params =>
@@ -85,34 +78,33 @@ object PowerIterationClusteringExample {
     }
   }
 
-  def generateCircle(n: Int, r: Double): Array[(Double, Double)] = {
-    val pi2 = 2 * math.Pi
-    (0.0 until pi2 by pi2 / n).map { x =>
-      (r * math.cos(x), r * math.sin(x))
-    }.toArray
+  def run(params: Params) {
+    val conf = new SparkConf()
+      .setMaster("local")
+      .setAppName(s"PowerIterationClustering with $params")
+    val sc = new SparkContext(conf)
+
+    Logger.getRootLogger.setLevel(Level.WARN)
+
+    val circlesRdd = generateCirclesRdd(sc, params.k, params.numPoints, params.outerRadius)
+    val model = new PowerIterationClustering()
+      .setK(params.k)
+      .setMaxIterations(params.numIterations)
+      .run(circlesRdd)
+
+    val clusters = model.assignments.collect.groupBy(_._2).mapValues(_.map(_._1))
+    val assignments = clusters.toList.sortBy { case (k, v) => v.length}
+    val assignmentsStr = assignments
+      .map 
{ case (k, v) => s"$k -> ${v.sorted.mkString("[", ",", "]")}"}.mkString(",") + println(s"Cluster assignments: $assignmentsStr") + + sc.stop() } - def generateCirclesRdd(sc: SparkContext, nCircles: Int = 3, nTotalPoints: Int = 30, - outerRadius: Double): - RDD[(Long, Long, Double)] = { - // The circles are generated as follows: - // The Radii are equal to the largestRadius/(C - circleIndex) - // where C=Number of circles - // and the circleIndex is 0 for the innermost and (nCircles-1) for the outermost circle - // The number of points in each circle (and thus in each final cluster) is: - // x, 2x, .., nCircles*x - // Where x is found from x = N * C(C+1)/2 - // The # points in the LAST circle is adjusted downwards so that the total sum is equal - // to the nTotalPoints - - val smallestRad = math.ceil(nTotalPoints / (nCircles * (nCircles + 1) / 2.0)) - var groupSizes = (1 to nCircles).map(gs => (gs * smallestRad).toInt) - groupSizes.zipWithIndex.map { case (gs, ix) => - ix match { - case _ if ix == groupSizes.length => gs - (groupSizes.sum - nTotalPoints) - case _ => gs - } - } + def generateCirclesRdd(sc: SparkContext, + nCircles: Int = 3, + nPoints: Int = 30, + outerRadius: Double): RDD[(Long, Long, Double)] = { val radii = for (cx <- 0 until nCircles) yield { cx match { @@ -121,6 +113,7 @@ object PowerIterationClusteringExample { case _ => outerRadius * cx / (nCircles - 1) } } + val groupSizes = for (cx <- 0 until nCircles) yield (cx+1) * nPoints var ix = 0 val points = for (cx <- 0 until nCircles; px <- 0 until groupSizes(cx)) yield { @@ -140,37 +133,23 @@ object PowerIterationClusteringExample { distancesRdd } + private[mllib] def similarity(p1: (Double, Double), p2: (Double, Double)) = { + gaussianSimilarity(p1, p2, 1.0) + } + /** * Gaussian Similarity: http://en.wikipedia.org/wiki/Radial_basis_function_kernel */ def gaussianSimilarity(p1: (Double, Double), p2: (Double, Double), sigma: Double) = { - math.exp((p1._1 - p2._1)*(p1._1 - p2._1) + (p1._2 - p2._2)*(p1._2 - p2._2)) - } - - private[mllib] def similarity(p1: (Double, Double), p2: (Double, Double)) = { - gaussianSimilarity(p1, p2, 1.0) + math.exp((p1._1 - p2._1) * (p1._1 - p2._1) + (p1._2 - p2._2) * (p1._2 - p2._2)) } - def run(params: Params) { - val conf = new SparkConf() - .setMaster("local") - .setAppName(s"PowerIterationClustering with $params") - val sc = new SparkContext(conf) - - Logger.getRootLogger.setLevel(Level.WARN) - - val circlesRdd = generateCirclesRdd(sc, params.k, params.numPoints, params.outerRadius) - val model = new PowerIterationClustering() - .setK(params.k) - .setMaxIterations(params.numIterations) - .run(circlesRdd) - - val clusters = model.assignments.collect.groupBy(_._2).mapValues(_.map(_._1)) - val assignments =clusters.toList.sortBy{ case (k,v) => v.length} - val assignmentsStr = assignments - .map { case (k, v) => s"$k -> ${v.sorted.mkString("[", ",", "]")}"}.mkString(",") - println(s"Cluster assignments: $assignmentsStr") + case class Params( + input: String = null, + k: Int = 3, + numPoints: Int = 5, + numIterations: Int = 10, + outerRadius: Double = 3.0 + ) extends AbstractParams[Params] - sc.stop() - } } From d7ac350d3927f08ac80fd474d474981841b32df5 Mon Sep 17 00:00:00 2001 From: sboeschhuawei Date: Tue, 10 Feb 2015 12:15:36 -0800 Subject: [PATCH 08/10] Updates to PICExample from Xiangrui's comments round 3 --- .../spark/examples/mllib/PowerIterationClusteringExample.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala index ac2bb7caec0d1..365e4b3aa4b4f 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala @@ -113,7 +113,7 @@ object PowerIterationClusteringExample { case _ => outerRadius * cx / (nCircles - 1) } } - val groupSizes = for (cx <- 0 until nCircles) yield (cx+1) * nPoints + val groupSizes = for (cx <- 0 until nCircles) yield (cx + 1) * nPoints var ix = 0 val points = for (cx <- 0 until nCircles; px <- 0 until groupSizes(cx)) yield { From 287867513c60709917a2fdbe275f389e1b382d5a Mon Sep 17 00:00:00 2001 From: sboeschhuawei Date: Tue, 10 Feb 2015 16:00:30 -0800 Subject: [PATCH 09/10] Fourth round with xiangrui on PICExample --- .../examples/mllib/PowerIterationClusteringExample.scala | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala index 365e4b3aa4b4f..52fd2379a42ac 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala @@ -106,13 +106,7 @@ object PowerIterationClusteringExample { nPoints: Int = 30, outerRadius: Double): RDD[(Long, Long, Double)] = { - val radii = for (cx <- 0 until nCircles) yield { - cx match { - case 0 => 0.1 * outerRadius / nCircles - case _ if cx == nCircles - 1 => outerRadius - case _ => outerRadius * cx / (nCircles - 1) - } - } + val radii = for (cx <- 0 until nCircles) yield outerRadius / (nCircles-cx) val groupSizes = for (cx <- 0 until nCircles) yield (cx + 1) * nPoints var ix = 0 val points = for (cx <- 0 until nCircles; From 3c84b14096a141944387b1af0c32eee6c51e1a62 Mon Sep 17 00:00:00 2001 From: sboeschhuawei Date: Wed, 11 Feb 2015 14:37:51 -0800 Subject: [PATCH 10/10] PIC Examples updates from Xiangrui's comments round 5 --- .../PowerIterationClusteringExample.scala | 97 +++++++++++-------- 1 file changed, 54 insertions(+), 43 deletions(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala index 52fd2379a42ac..b2373adba1fd4 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala @@ -25,8 +25,8 @@ import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /** - * An example Power Iteration Clustering app. Takes an input of K concentric circles - * with a total of "n" sampled points (total here means "across ALL of the circles"). + * An example Power Iteration Clustering http://www.icml2010.org/papers/387.pdf app. + * Takes an input of K concentric circles and the number of points in the innermost circle. * The output should be K clusters - each cluster containing precisely the points associated * with each of the input circles. 
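+ * Every pair of sampled points is linked by a Gaussian-kernel similarity weight,
+ * so the affinity graph handed to PIC is fully connected.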
 *
 * Run with
 * {{{
 * ./bin/run-example mllib.PowerIterationClusteringExample [options]
 *
 * Where options include:
- *   k: Number of circles/ clusters
+ *   k: Number of circles/clusters
 *   n: Number of sampled points on innermost circle. There are proportionally more points
 *   within the outer/larger circles
- *   numIterations: Number of Power Iterations
+ *   maxIterations: Number of Power Iterations
 *   outerRadius: radius of the outermost of the concentric circles
 * }}}
 *
 * Here is a sample run and output:
 *
 * ./bin/run-example mllib.PowerIterationClusteringExample
- *   -k 3 --n 30 --numIterations 15
+ *   -k 3 --n 30 --maxIterations 15
 *
 * Cluster assignments: 1 -> [0,1,2,3,4],2 -> [5,6,7,8,9,10,11,12,13,14],
 *   0 -> [15,16,17,18,19,20,21,22,23,24,25,26,27,28,29]
@@ -55,20 +55,31 @@ import org.apache.spark.{SparkConf, SparkContext}
  */
 object PowerIterationClusteringExample {
 
+  case class Params(
+      input: String = null,
+      k: Int = 3,
+      numPoints: Int = 5,
+      maxIterations: Int = 10,
+      outerRadius: Double = 3.0
+    ) extends AbstractParams[Params]
+
   def main(args: Array[String]) {
     val defaultParams = Params()
 
     val parser = new OptionParser[Params]("PIC Circles") {
       head("PowerIterationClusteringExample: an example PIC app using concentric circles.")
       opt[Int]('k', "k")
-      .text(s"number of circles (/clusters), default: ${defaultParams.k}")
-      .action((x, c) => c.copy(k = x))
+        .text(s"number of circles (/clusters), default: ${defaultParams.k}")
+        .action((x, c) => c.copy(k = x))
       opt[Int]('n', "n")
-      .text(s"number of points, default: ${defaultParams.numPoints}")
-      .action((x, c) => c.copy(numPoints = x))
-      opt[Int]("numIterations")
-      .text(s"number of iterations, default: ${defaultParams.numIterations}")
-      .action((x, c) => c.copy(numIterations = x))
+        .text(s"number of points in smallest circle, default: ${defaultParams.numPoints}")
+        .action((x, c) => c.copy(numPoints = x))
+      opt[Int]("maxIterations")
+        .text(s"number of iterations, default: ${defaultParams.maxIterations}")
+        .action((x, c) => c.copy(maxIterations = x))
+      opt[Double]('r', "r")
+        .text(s"radius of outermost circle, default: ${defaultParams.outerRadius}")
+        .action((x, c) => c.copy(outerRadius = x))
     }
 
     parser.parse(args, defaultParams).map { params =>
@@ -80,46 +91,55 @@ object PowerIterationClusteringExample {
     }
   }
 
   def run(params: Params) {
     val conf = new SparkConf()
-      .setMaster("local")
-      .setAppName(s"PowerIterationClustering with $params")
+      .setMaster("local")
+      .setAppName(s"PowerIterationClustering with $params")
     val sc = new SparkContext(conf)
 
     Logger.getRootLogger.setLevel(Level.WARN)
 
     val circlesRdd = generateCirclesRdd(sc, params.k, params.numPoints, params.outerRadius)
     val model = new PowerIterationClustering()
-      .setK(params.k)
-      .setMaxIterations(params.numIterations)
-      .run(circlesRdd)
+      .setK(params.k)
+      .setMaxIterations(params.maxIterations)
+      .run(circlesRdd)
 
     val clusters = model.assignments.collect.groupBy(_._2).mapValues(_.map(_._1))
     val assignments = clusters.toList.sortBy { case (k, v) => v.length}
     val assignmentsStr = assignments
-      .map { case (k, v) => s"$k -> ${v.sorted.mkString("[", ",", "]")}"}.mkString(",")
-    println(s"Cluster assignments: $assignmentsStr")
+      .map { case (k, v) =>
+        s"$k -> ${v.sorted.mkString("[", ",", "]")}"
+      }.mkString(",")
+    val sizesStr = assignments.map {
+      _._2.size
+    }.sorted.mkString("(", ",", ")")
+    println(s"Cluster assignments: $assignmentsStr\ncluster sizes: $sizesStr")
 
     sc.stop()
   }
 
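+  // Return the (x, y) coordinates of n points spaced evenly around a circle
+  // of the given radius, centered on the origin.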
+  def generateCircle(radius: Double, n: Int) = {
+    Seq.tabulate(n) { i =>
+      val theta = 2.0 * math.Pi * i / n
+      (radius * math.cos(theta), radius * math.sin(theta))
+    }
+  }
+
   def generateCirclesRdd(sc: SparkContext,
     nCircles: Int = 3,
     nPoints: Int = 30,
     outerRadius: Double): RDD[(Long, Long, Double)] = {
 
-    val radii = for (cx <- 0 until nCircles) yield outerRadius / (nCircles-cx)
-    val groupSizes = for (cx <- 0 until nCircles) yield (cx + 1) * nPoints
-    var ix = 0
-    val points = for (cx <- 0 until nCircles;
-      px <- 0 until groupSizes(cx)) yield {
-      val theta = 2.0 * math.Pi * px / groupSizes(cx)
-      val out = (ix, (radii(cx) * math.cos(theta), radii(cx) * math.sin(theta)))
-      ix += 1
-      out
-    }
+    val radii = Array.tabulate(nCircles) { cx => outerRadius / (nCircles - cx)}
+    val groupSizes = Array.tabulate(nCircles) { cx => (cx + 1) * nPoints}
+    val points = (0 until nCircles).flatMap { cx =>
+      generateCircle(radii(cx), groupSizes(cx))
+    }.zipWithIndex
     val rdd = sc.parallelize(points)
-    val distancesRdd = rdd.cartesian(rdd).flatMap { case ((i0, (x0, y0)), (i1, (x1, y1))) =>
+    val distancesRdd = rdd.cartesian(rdd).flatMap { case (((x0, y0), i0), ((x1, y1), i1)) =>
       if (i0 < i1) {
-        Some((i0.toLong, i1.toLong, similarity((x0, y0), (x1, y1))))
+        Some((i0.toLong, i1.toLong, gaussianSimilarity((x0, y0), (x1, y1), 1.0)))
       } else {
         None
       }
@@ -127,23 +145,15 @@ object PowerIterationClusteringExample {
     distancesRdd
   }
 
-  private[mllib] def similarity(p1: (Double, Double), p2: (Double, Double)) = {
-    gaussianSimilarity(p1, p2, 1.0)
-  }
-
   /**
    * Gaussian Similarity: http://en.wikipedia.org/wiki/Radial_basis_function_kernel
   */
   def gaussianSimilarity(p1: (Double, Double), p2: (Double, Double), sigma: Double) = {
-    math.exp((p1._1 - p2._1) * (p1._1 - p2._1) + (p1._2 - p2._2) * (p1._2 - p2._2))
+    val coeff = 1.0 / (math.sqrt(2.0 * math.Pi) * sigma)
+    val expCoeff = -1.0 / (2.0 * math.pow(sigma, 2.0))
+    val ssquares = (p1._1 - p2._1) * (p1._1 - p2._1) + (p1._2 - p2._2) * (p1._2 - p2._2)
+    coeff * math.exp(expCoeff * ssquares)
  }
 
}
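
A closing note on the similarity function: the radial basis function kernel cited above is
K(p1, p2) = exp(-||p1 - p2||^2 / (2 * sigma^2)). The Gaussian-density prefactor kept in the
final revision only rescales every affinity by the same constant, and PIC's row
normalization cancels a constant rescaling, so it does not change the clustering. The
standalone sketch below sanity-checks the corrected kernel; the object name
GaussianSimilarityCheck is illustrative and not part of the patch series, and the expected
values are hand-computed for sigma = 1.0.

object GaussianSimilarityCheck {

  // Same expression as the final gaussianSimilarity above, with the exponent
  // parenthesized as -1 / (2 * sigma^2).
  def gaussianSimilarity(p1: (Double, Double), p2: (Double, Double), sigma: Double): Double = {
    val coeff = 1.0 / (math.sqrt(2.0 * math.Pi) * sigma)
    val expCoeff = -1.0 / (2.0 * math.pow(sigma, 2.0))
    val ssquares = (p1._1 - p2._1) * (p1._1 - p2._1) + (p1._2 - p2._2) * (p1._2 - p2._2)
    coeff * math.exp(expCoeff * ssquares)
  }

  def main(args: Array[String]): Unit = {
    // Coincident points: exp(0) = 1, so this prints the one-dimensional Gaussian
    // normalization constant 1 / sqrt(2 * pi), roughly 0.3989.
    println(gaussianSimilarity((0.0, 0.0), (0.0, 0.0), 1.0))
    // Points 3.0 apart: 0.3989 * exp(-4.5), roughly 0.0044. Affinity decays
    // monotonically with squared distance, which is what PIC expects.
    println(gaussianSimilarity((0.0, 0.0), (3.0, 0.0), 1.0))
  }
}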