[SPARK-4749] [mllib]: Allow initializing KMeans clusters using a seed
This implements the functionality for SPARK-4749 and provides unit tests in Scala and PySpark.

Author: nate.crosswhite <[email protected]>
Author: nxwhite-str <[email protected]>
Author: Xiangrui Meng <[email protected]>

Closes apache#3610 from nxwhite-str/master and squashes the following commits:

a2ebbd3 [nxwhite-str] Merge pull request #1 from mengxr/SPARK-4749-kmeans-seed
7668124 [Xiangrui Meng] minor updates
f8d5928 [nate.crosswhite] Addressing PR issues
277d367 [nate.crosswhite] Merge remote-tracking branch 'upstream/master'
9156a57 [nate.crosswhite] Merge remote-tracking branch 'upstream/master'
5d087b4 [nate.crosswhite] Adding KMeans train with seed and Scala unit test
616d111 [nate.crosswhite] Merge remote-tracking branch 'upstream/master'
35c1884 [nate.crosswhite] Add kmeans initial seed to pyspark API
str-janus authored and mengxr committed Jan 21, 2015
1 parent aa1e22b commit 7450a99
Showing 5 changed files with 84 additions and 12 deletions.
mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -266,12 +266,16 @@ class PythonMLLibAPI extends Serializable {
       k: Int,
       maxIterations: Int,
       runs: Int,
-      initializationMode: String): KMeansModel = {
+      initializationMode: String,
+      seed: java.lang.Long): KMeansModel = {
     val kMeansAlg = new KMeans()
       .setK(k)
       .setMaxIterations(maxIterations)
       .setRuns(runs)
       .setInitializationMode(initializationMode)
+
+    if (seed != null) kMeansAlg.setSeed(seed)
+
     try {
       kMeansAlg.run(data.rdd.persist(StorageLevel.MEMORY_AND_DISK))
     } finally {
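Note the signature: seed is a boxed `java.lang.Long` rather than a Scala `Long`, so that Py4J can hand over `null` when the Python caller leaves `seed=None`; the null check above then keeps the randomized default. A minimal sketch of the same idiom in isolation (the `newGenerator` helper is hypothetical, for illustration only):

import scala.util.Random

// Hypothetical helper mirroring the nullable-seed idiom above:
// null means "no seed supplied", so keep the nondeterministic default.
def newGenerator(seed: java.lang.Long): Random =
  if (seed != null) new Random(seed) else new Random()

val pinned = newGenerator(42L)  // Predef boxes the Scala Long automatically
val free = newGenerator(null)   // what Python's seed=None arrives as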
mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
@@ -19,14 +19,14 @@ package org.apache.spark.mllib.clustering
 
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.spark.annotation.Experimental
 import org.apache.spark.Logging
 import org.apache.spark.SparkContext._
+import org.apache.spark.annotation.Experimental
 import org.apache.spark.mllib.linalg.{Vector, Vectors}
 import org.apache.spark.mllib.linalg.BLAS.{axpy, scal}
-import org.apache.spark.mllib.util.MLUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.Utils
 import org.apache.spark.util.random.XORShiftRandom
 
 /**
@@ -43,13 +43,14 @@ class KMeans private (
     private var runs: Int,
     private var initializationMode: String,
     private var initializationSteps: Int,
-    private var epsilon: Double) extends Serializable with Logging {
+    private var epsilon: Double,
+    private var seed: Long) extends Serializable with Logging {
 
   /**
    * Constructs a KMeans instance with default parameters: {k: 2, maxIterations: 20, runs: 1,
-   * initializationMode: "k-means||", initializationSteps: 5, epsilon: 1e-4}.
+   * initializationMode: "k-means||", initializationSteps: 5, epsilon: 1e-4, seed: random}.
    */
-  def this() = this(2, 20, 1, KMeans.K_MEANS_PARALLEL, 5, 1e-4)
+  def this() = this(2, 20, 1, KMeans.K_MEANS_PARALLEL, 5, 1e-4, Utils.random.nextLong())
 
   /** Set the number of clusters to create (k). Default: 2. */
   def setK(k: Int): this.type = {
@@ -112,6 +113,12 @@
     this
   }
 
+  /** Set the random seed for cluster initialization. */
+  def setSeed(seed: Long): this.type = {
+    this.seed = seed
+    this
+  }
+
   /**
    * Train a K-means model on the given set of points; `data` should be cached for high
    * performance, because this is an iterative algorithm.
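The new setter composes with the existing builder chain, and the `Utils.random.nextLong()` default in the constructor above means unseeded callers keep the old randomized behavior. A small usage sketch, assuming a cached `data: RDD[Vector]` is already in scope:

import org.apache.spark.mllib.clustering.KMeans

// Pinning the seed makes cluster initialization, and hence the fit,
// repeatable for a fixed dataset and parameter set.
val model = new KMeans()
  .setK(5)
  .setMaxIterations(20)
  .setInitializationMode(KMeans.K_MEANS_PARALLEL)
  .setSeed(4749L)
  .run(data)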
@@ -255,7 +262,7 @@
   private def initRandom(data: RDD[VectorWithNorm])
     : Array[Array[VectorWithNorm]] = {
     // Sample all the cluster centers in one pass to avoid repeated scans
-    val sample = data.takeSample(true, runs * k, new XORShiftRandom().nextInt()).toSeq
+    val sample = data.takeSample(true, runs * k, new XORShiftRandom(this.seed).nextInt()).toSeq
     Array.tabulate(runs)(r => sample.slice(r * k, (r + 1) * k).map { v =>
       new VectorWithNorm(Vectors.dense(v.vector.toArray), v.norm)
     }.toArray)
Expand All @@ -273,7 +280,7 @@ class KMeans private (
private def initKMeansParallel(data: RDD[VectorWithNorm])
: Array[Array[VectorWithNorm]] = {
// Initialize each run's center to a random point
val seed = new XORShiftRandom().nextInt()
val seed = new XORShiftRandom(this.seed).nextInt()
val sample = data.takeSample(true, runs, seed).toSeq
val centers = Array.tabulate(runs)(r => ArrayBuffer(sample(r).toDense))

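Both initialization paths now derive the sampler's seed from `this.seed` instead of building an unseeded `XORShiftRandom`, so two identically configured instances draw identical samples. Since `XORShiftRandom` is Spark-internal, the idea is sketched here with plain `scala.util.Random`:

import scala.util.Random

// One user-visible seed deterministically derives the seed handed to
// the sampler, so repeated runs sample the same initial centers.
def samplerSeed(userSeed: Long): Int = new Random(userSeed).nextInt()

assert(samplerSeed(42L) == samplerSeed(42L))  // same value every call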
@@ -333,7 +340,32 @@ object KMeans {
   /**
    * Trains a k-means model using the given set of parameters.
    *
-   * @param data training points stored as `RDD[Array[Double]]`
+   * @param data training points stored as `RDD[Vector]`
+   * @param k number of clusters
+   * @param maxIterations max number of iterations
+   * @param runs number of parallel runs, defaults to 1. The best model is returned.
+   * @param initializationMode initialization mode, either "random" or "k-means||" (default).
+   * @param seed random seed value for cluster initialization
+   */
+  def train(
+      data: RDD[Vector],
+      k: Int,
+      maxIterations: Int,
+      runs: Int,
+      initializationMode: String,
+      seed: Long): KMeansModel = {
+    new KMeans().setK(k)
+      .setMaxIterations(maxIterations)
+      .setRuns(runs)
+      .setInitializationMode(initializationMode)
+      .setSeed(seed)
+      .run(data)
+  }
+
+  /**
+   * Trains a k-means model using the given set of parameters.
+   *
+   * @param data training points stored as `RDD[Vector]`
    * @param k number of clusters
    * @param maxIterations max number of iterations
    * @param runs number of parallel runs, defaults to 1. The best model is returned.
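Called twice with the same seed on the same data, the new overload should return models with matching cluster centers, which is exactly what the Scala and Python tests below verify. A usage sketch, assuming a live `SparkContext` named `sc`:

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

val points = sc.parallelize((0 until 100).map(i => Vectors.dense(i.toDouble, i.toDouble)))
val m1 = KMeans.train(points, k = 3, maxIterations = 10, runs = 1,
  initializationMode = "k-means||", seed = 42L)
val m2 = KMeans.train(points, k = 3, maxIterations = 10, runs = 1,
  initializationMode = "k-means||", seed = 42L)

// Same seed, same data, same parameters: the centers should agree
// (the suite below compares them with absTol 1e-14).
m1.clusterCenters.zip(m2.clusterCenters).foreach { case (c1, c2) =>
  println(s"$c1  ==  $c2")
}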
mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala
@@ -90,6 +90,27 @@ class KMeansSuite extends FunSuite with MLlibTestSparkContext {
     assert(model.clusterCenters.size === 3)
   }
 
+  test("deterministic initialization") {
+    // Create a large-ish set of points for clustering
+    val points = List.tabulate(1000)(n => Vectors.dense(n, n))
+    val rdd = sc.parallelize(points, 3)
+
+    for (initMode <- Seq(RANDOM, K_MEANS_PARALLEL)) {
+      // Create two deterministic models and compare cluster means
+      val model1 = KMeans.train(rdd, k = 10, maxIterations = 2, runs = 1,
+        initializationMode = initMode, seed = 42)
+      val centers1 = model1.clusterCenters
+
+      val model2 = KMeans.train(rdd, k = 10, maxIterations = 2, runs = 1,
+        initializationMode = initMode, seed = 42)
+      val centers2 = model2.clusterCenters
+
+      centers1.zip(centers2).foreach { case (c1, c2) =>
+        assert(c1 ~== c2 absTol 1E-14)
+      }
+    }
+  }
+
   test("single cluster with big dataset") {
     val smallData = Array(
       Vectors.dense(1.0, 2.0, 6.0),
4 changes: 2 additions & 2 deletions python/pyspark/mllib/clustering.py
@@ -78,10 +78,10 @@ def predict(self, x):
 class KMeans(object):
 
     @classmethod
-    def train(cls, rdd, k, maxIterations=100, runs=1, initializationMode="k-means||"):
+    def train(cls, rdd, k, maxIterations=100, runs=1, initializationMode="k-means||", seed=None):
         """Train a k-means clustering model."""
         model = callMLlibFunc("trainKMeansModel", rdd.map(_convert_to_vector), k, maxIterations,
-                              runs, initializationMode)
+                              runs, initializationMode, seed)
         centers = callJavaFunc(rdd.context, model.clusterCenters)
         return KMeansModel([c.toArray() for c in centers])
 
17 changes: 16 additions & 1 deletion python/pyspark/mllib/tests.py
@@ -140,7 +140,7 @@ class ListTests(PySparkTestCase):
     as NumPy arrays.
     """
 
-    def test_clustering(self):
+    def test_kmeans(self):
         from pyspark.mllib.clustering import KMeans
         data = [
             [0, 1.1],
@@ -152,6 +152,21 @@ def test_clustering(self):
         self.assertEquals(clusters.predict(data[0]), clusters.predict(data[1]))
         self.assertEquals(clusters.predict(data[2]), clusters.predict(data[3]))
 
+    def test_kmeans_deterministic(self):
+        from pyspark.mllib.clustering import KMeans
+        X = range(0, 100, 10)
+        Y = range(0, 100, 10)
+        data = [[x, y] for x, y in zip(X, Y)]
+        clusters1 = KMeans.train(self.sc.parallelize(data),
+                                 3, initializationMode="k-means||", seed=42)
+        clusters2 = KMeans.train(self.sc.parallelize(data),
+                                 3, initializationMode="k-means||", seed=42)
+        centers1 = clusters1.centers
+        centers2 = clusters2.centers
+        for c1, c2 in zip(centers1, centers2):
+            # TODO: Allow small numeric difference.
+            self.assertTrue(array_equal(c1, c2))
+
     def test_classification(self):
         from pyspark.mllib.classification import LogisticRegressionWithSGD, SVMWithSGD, NaiveBayes
         from pyspark.mllib.tree import DecisionTree
