From a9817df9c59919a754f05adda161c840173ccb22 Mon Sep 17 00:00:00 2001 From: MechCoder Date: Thu, 4 Jun 2015 12:22:58 +0530 Subject: [PATCH] Better tests and minor fixes --- python/pyspark/mllib/clustering.py | 16 ++++----- python/pyspark/mllib/tests.py | 55 +++++++++++++++++++++++------- 2 files changed, 50 insertions(+), 21 deletions(-) diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py index 8d0c8c7654934..1e148452d2cc2 100644 --- a/python/pyspark/mllib/clustering.py +++ b/python/pyspark/mllib/clustering.py @@ -384,17 +384,17 @@ def __init__(self, k=2, decayFactor=1.0, timeUnit="batches"): raise ValueError( "timeUnit should be 'batches' or 'points', got %s." % timeUnit) self._timeUnit = timeUnit - self.model = None + self.latestModel = None def _validate(self, dstream): - if self.model is None: + if self.latestModel is None: raise ValueError( "Initial centers should be set either by setInitialCenters " "or setRandomCenters.") if not isinstance(dstream, DStream): raise TypeError( "Expected dstream to be of type DStream, " - "got type %d" % type(dstream)) + "got type %s" % type(dstream)) def setK(self, k): """Set number of clusters.""" @@ -416,7 +416,7 @@ def setHalfLife(self, halfLife, timeUnit): return self def setInitialCenters(self, centers, weights): - self.model = StreamingKMeansModel(centers, weights) + self.latestModel = StreamingKMeansModel(centers, weights) return self def setRandomCenters(self, dim, weight, seed): @@ -427,7 +427,7 @@ def setRandomCenters(self, dim, weight, seed): rng = random.RandomState(seed) clusterCenters = rng.randn(self._k, dim) clusterWeights = tile(weight, self._k) - self.model = StreamingKMeansModel(clusterCenters, clusterWeights) + self.latestModel = StreamingKMeansModel(clusterCenters, clusterWeights) return self def trainOn(self, dstream): @@ -435,7 +435,7 @@ def trainOn(self, dstream): self._validate(dstream) def update(rdd): - self.model.update(rdd, self._decayFactor, self._timeUnit) + self.latestModel.update(rdd, self._decayFactor, self._timeUnit) dstream.foreachRDD(update) @@ -445,7 +445,7 @@ def predictOn(self, dstream): Returns a transformed dstream object """ self._validate(dstream) - return dstream.map(self.model.predict) + return dstream.map(self.latestModel.predict) def predictOnValues(self, dstream): """ @@ -453,7 +453,7 @@ def predictOnValues(self, dstream): Returns a transformed dstream object. """ self._validate(dstream) - return dstream.mapValues(self.model.predict) + return dstream.mapValues(self.latestModel.predict) def _test(): diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py index 5be26e286f00f..aaffeeefaf666 100644 --- a/python/pyspark/mllib/tests.py +++ b/python/pyspark/mllib/tests.py @@ -25,7 +25,7 @@ import array as pyarray from time import time, sleep -from numpy import array, array_equal, zeros, inf, all +from numpy import array, array_equal, zeros, inf, all, random, sum from py4j.protocol import Py4JJavaError if sys.version_info[:2] <= (2, 6): @@ -883,19 +883,50 @@ def test_model_params(self): self.assertEquals(stkm._decayFactor, 0.0) # Model not set yet. - self.assertIsNone(stkm.model) + self.assertIsNone(stkm.latestModel) self.assertRaises(ValueError, stkm.trainOn, [0.0, 1.0]) stkm.setInitialCenters([[0.0, 0.0], [1.0, 1.0]], [1.0, 1.0]) - self.assertEquals(stkm.model.centers, [[0.0, 0.0], [1.0, 1.0]]) - self.assertEquals(stkm.model.getClusterWeights, [1.0, 1.0]) + self.assertEquals(stkm.latestModel.centers, [[0.0, 0.0], [1.0, 1.0]]) + self.assertEquals(stkm.latestModel.getClusterWeights, [1.0, 1.0]) - def _ssc_wait(self, start_time, end_time, sleep_time): + @staticmethod + def _ssc_wait(start_time, end_time, sleep_time): while time() - start_time < end_time: sleep(0.01) + def test_accuracy_for_single_center(self): + numBatches, numPoints, k, d, r, seed = 5, 5, 1, 5, 0.1, 0 + centers, batches = self.streamingKMeansDataGenerator( + numBatches, numPoints, k, d, r, seed) + stkm = StreamingKMeans(1) + stkm.setInitialCenters([[0., 0., 0., 0., 0.]], [0.]) + input_stream = self.ssc.queueStream( + [self.sc.parallelize(batch, 1) for batch in batches]) + stkm.trainOn(input_stream) + t = time() + self.ssc.start() + self._ssc_wait(t, 10.0, 0.01) + self.assertEquals(stkm.latestModel.getClusterWeights, [25.0]) + realCenters = sum(array(centers), axis=0) + for i in range(d): + modelCenters = stkm.latestModel.centers[0][i] + self.assertAlmostEqual(centers[0][i], modelCenters, 1) + self.assertAlmostEqual(realCenters[i], modelCenters, 1) + + def streamingKMeansDataGenerator(self, batches, numPoints, + k, d, r, seed, centers=None): + rng = random.RandomState(seed) + + # Generate centers. + centers = [rng.randn(d) for i in range(k)] + + return centers, [[Vectors.dense(centers[j % k] + r * rng.randn(d)) + for j in range(numPoints)] + for i in range(batches)] + def test_trainOn_model(self): - # Test the model on toy data. + # Test the model on toy data with four clusters. stkm = StreamingKMeans() initCenters = [[1.0, 1.0], [-1.0, 1.0], [-1.0, -1.0], [1.0, -1.0]] weights = [1.0, 1.0, 1.0, 1.0] @@ -916,16 +947,15 @@ def test_trainOn_model(self): # Give enough time to train the model. self._ssc_wait(t, 6.0, 0.01) - finalModel = stkm.model + finalModel = stkm.latestModel self.assertTrue(all(finalModel.centers == array(initCenters))) self.assertEquals(finalModel.getClusterWeights, [5.0, 5.0, 5.0, 5.0]) def test_predictOn_model(self): initCenters = [[1.0, 1.0], [-1.0, 1.0], [-1.0, -1.0], [1.0, -1.0]] weights = [1.0, 1.0, 1.0, 1.0] - model = StreamingKMeansModel(initCenters, weights) stkm = StreamingKMeans() - stkm.model = model + stkm.latestModel = StreamingKMeansModel(initCenters, weights) predict_data = [[[1.5, 1.5]], [[-1.5, 1.5]], [[-1.5, -1.5]], [[1.5, -1.5]]] predict_data = [sc.parallelize(batch, 1) for batch in predict_data] @@ -935,10 +965,9 @@ def test_predictOn_model(self): result = [] def update(rdd): - if rdd: - rdd_collect = rdd.collect() - if rdd_collect: - result.append(rdd_collect) + rdd_collect = rdd.collect() + if rdd_collect: + result.append(rdd_collect) predict_val.foreachRDD(update) t = time()