Better tests and minor fixes
MechCoder committed Jun 18, 2015
1 parent c80e451 commit a9817df
Showing 2 changed files with 50 additions and 21 deletions.
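In short: the public `model` attribute of `StreamingKMeans` is renamed to `latestModel` in `clustering.py` and throughout the tests, an error message that tried to format a `type` object with `%d` (which would itself raise a `TypeError`) now uses `%s`, the test helper `_ssc_wait` becomes a `@staticmethod`, and the tests gain a `streamingKMeansDataGenerator` helper plus an accuracy test for a single streaming k-means center.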
python/pyspark/mllib/clustering.py (16 changes: 8 additions & 8 deletions)
@@ -384,17 +384,17 @@ def __init__(self, k=2, decayFactor=1.0, timeUnit="batches"):
             raise ValueError(
                 "timeUnit should be 'batches' or 'points', got %s." % timeUnit)
         self._timeUnit = timeUnit
-        self.model = None
+        self.latestModel = None

     def _validate(self, dstream):
-        if self.model is None:
+        if self.latestModel is None:
             raise ValueError(
                 "Initial centers should be set either by setInitialCenters "
                 "or setRandomCenters.")
         if not isinstance(dstream, DStream):
             raise TypeError(
                 "Expected dstream to be of type DStream, "
-                "got type %d" % type(dstream))
+                "got type %s" % type(dstream))

     def setK(self, k):
         """Set number of clusters."""
@@ -416,7 +416,7 @@ def setHalfLife(self, halfLife, timeUnit):
         return self

     def setInitialCenters(self, centers, weights):
-        self.model = StreamingKMeansModel(centers, weights)
+        self.latestModel = StreamingKMeansModel(centers, weights)
         return self

     def setRandomCenters(self, dim, weight, seed):
@@ -427,15 +427,15 @@ def setRandomCenters(self, dim, weight, seed):
         rng = random.RandomState(seed)
         clusterCenters = rng.randn(self._k, dim)
         clusterWeights = tile(weight, self._k)
-        self.model = StreamingKMeansModel(clusterCenters, clusterWeights)
+        self.latestModel = StreamingKMeansModel(clusterCenters, clusterWeights)
         return self

     def trainOn(self, dstream):
         """Train the model on the incoming dstream."""
         self._validate(dstream)

         def update(rdd):
-            self.model.update(rdd, self._decayFactor, self._timeUnit)
+            self.latestModel.update(rdd, self._decayFactor, self._timeUnit)

         dstream.foreachRDD(update)
@@ -445,15 +445,15 @@ def predictOn(self, dstream):
         Returns a transformed dstream object
         """
         self._validate(dstream)
-        return dstream.map(self.model.predict)
+        return dstream.map(self.latestModel.predict)

     def predictOnValues(self, dstream):
         """
         Make predictions on a keyed dstream.
         Returns a transformed dstream object.
         """
         self._validate(dstream)
-        return dstream.mapValues(self.model.predict)
+        return dstream.mapValues(self.latestModel.predict)


 def _test():
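To see the renamed attribute in context, here is a minimal end-to-end sketch. It is not part of the commit: the toy `batches` data is invented for illustration, and it assumes a local Spark installation from this era (the 1.4 line).

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.mllib.clustering import StreamingKMeans

    sc = SparkContext("local[2]", "StreamingKMeansSketch")
    ssc = StreamingContext(sc, 1)  # 1-second batches

    stkm = StreamingKMeans(k=2, decayFactor=1.0, timeUnit="batches")
    stkm.setInitialCenters([[0.0, 0.0], [1.0, 1.0]], [1.0, 1.0])

    # Toy data: two batches of 2-D points near the two initial centers.
    batches = [[[0.1, 0.0], [0.9, 1.1]], [[-0.1, 0.1], [1.1, 0.9]]]
    stream = ssc.queueStream([sc.parallelize(batch, 1) for batch in batches])

    stkm.trainOn(stream)  # each incoming RDD updates stkm.latestModel in place

    ssc.start()
    ssc.awaitTermination(5)  # give the queued batches time to drain
    print(stkm.latestModel.centers)  # was stkm.model.centers before this commit
    ssc.stop()

The rename signals that the attribute always holds the model as of the most recently processed batch, which `trainOn` keeps updating behind the scenes.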
python/pyspark/mllib/tests.py (55 changes: 42 additions & 13 deletions)
@@ -25,7 +25,7 @@
 import array as pyarray
 from time import time, sleep

-from numpy import array, array_equal, zeros, inf, all
+from numpy import array, array_equal, zeros, inf, all, random, sum
 from py4j.protocol import Py4JJavaError

 if sys.version_info[:2] <= (2, 6):
@@ -883,19 +883,50 @@ def test_model_params(self):
         self.assertEquals(stkm._decayFactor, 0.0)

         # Model not set yet.
-        self.assertIsNone(stkm.model)
+        self.assertIsNone(stkm.latestModel)
         self.assertRaises(ValueError, stkm.trainOn, [0.0, 1.0])

         stkm.setInitialCenters([[0.0, 0.0], [1.0, 1.0]], [1.0, 1.0])
-        self.assertEquals(stkm.model.centers, [[0.0, 0.0], [1.0, 1.0]])
-        self.assertEquals(stkm.model.getClusterWeights, [1.0, 1.0])
+        self.assertEquals(stkm.latestModel.centers, [[0.0, 0.0], [1.0, 1.0]])
+        self.assertEquals(stkm.latestModel.getClusterWeights, [1.0, 1.0])

-    def _ssc_wait(self, start_time, end_time, sleep_time):
+    @staticmethod
+    def _ssc_wait(start_time, end_time, sleep_time):
         while time() - start_time < end_time:
             sleep(0.01)

+    def test_accuracy_for_single_center(self):
+        numBatches, numPoints, k, d, r, seed = 5, 5, 1, 5, 0.1, 0
+        centers, batches = self.streamingKMeansDataGenerator(
+            numBatches, numPoints, k, d, r, seed)
+        stkm = StreamingKMeans(1)
+        stkm.setInitialCenters([[0., 0., 0., 0., 0.]], [0.])
+        input_stream = self.ssc.queueStream(
+            [self.sc.parallelize(batch, 1) for batch in batches])
+        stkm.trainOn(input_stream)
+        t = time()
+        self.ssc.start()
+        self._ssc_wait(t, 10.0, 0.01)
+        self.assertEquals(stkm.latestModel.getClusterWeights, [25.0])
+        realCenters = sum(array(centers), axis=0)
+        for i in range(d):
+            modelCenters = stkm.latestModel.centers[0][i]
+            self.assertAlmostEqual(centers[0][i], modelCenters, 1)
+            self.assertAlmostEqual(realCenters[i], modelCenters, 1)
+
+    def streamingKMeansDataGenerator(self, batches, numPoints,
+                                     k, d, r, seed, centers=None):
+        rng = random.RandomState(seed)
+
+        # Generate centers.
+        centers = [rng.randn(d) for i in range(k)]
+
+        return centers, [[Vectors.dense(centers[j % k] + r * rng.randn(d))
+                          for j in range(numPoints)]
+                         for i in range(batches)]
+
     def test_trainOn_model(self):
-        # Test the model on toy data.
+        # Test the model on toy data with four clusters.
         stkm = StreamingKMeans()
         initCenters = [[1.0, 1.0], [-1.0, 1.0], [-1.0, -1.0], [1.0, -1.0]]
         weights = [1.0, 1.0, 1.0, 1.0]
@@ -916,16 +916,15 @@ def test_trainOn_model(self):

         # Give enough time to train the model.
         self._ssc_wait(t, 6.0, 0.01)
-        finalModel = stkm.model
+        finalModel = stkm.latestModel
         self.assertTrue(all(finalModel.centers == array(initCenters)))
         self.assertEquals(finalModel.getClusterWeights, [5.0, 5.0, 5.0, 5.0])

     def test_predictOn_model(self):
         initCenters = [[1.0, 1.0], [-1.0, 1.0], [-1.0, -1.0], [1.0, -1.0]]
         weights = [1.0, 1.0, 1.0, 1.0]
-        model = StreamingKMeansModel(initCenters, weights)
         stkm = StreamingKMeans()
-        stkm.model = model
+        stkm.latestModel = StreamingKMeansModel(initCenters, weights)

         predict_data = [[[1.5, 1.5]], [[-1.5, 1.5]], [[-1.5, -1.5]], [[1.5, -1.5]]]
         predict_data = [sc.parallelize(batch, 1) for batch in predict_data]
@@ -935,10 +935,9 @@ def test_predictOn_model(self):
         result = []

         def update(rdd):
-            if rdd:
-                rdd_collect = rdd.collect()
-                if rdd_collect:
-                    result.append(rdd_collect)
+            rdd_collect = rdd.collect()
+            if rdd_collect:
+                result.append(rdd_collect)

         predict_val.foreachRDD(update)
         t = time()
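The arithmetic behind the new single-center test can be checked without Spark: five batches of five points each all update one cluster, so with the default decay factor of 1.0 the final weight is 5 × 5 = 25, and the trained center is the running mean of all points, which lands close to the true center. The sketch below mirrors the generator logic in plain NumPy; it is illustrative rather than the committed code, and it guards the `centers` parameter, which `streamingKMeansDataGenerator` accepts but then overwrites unconditionally. Two further quirks worth noting in the committed tests: `_ssc_wait` always sleeps 0.01 s and ignores its `sleep_time` argument, and the `if rdd:` guard removed from `update` was effectively always true (RDD objects define no truthiness), so only the check on the collected list carries weight.

    import numpy as np

    def streaming_kmeans_data(batches, num_points, k, d, r, seed, centers=None):
        """Draw points as center + r * Gaussian noise, cycling through the centers."""
        rng = np.random.RandomState(seed)
        if centers is None:  # unlike the committed helper, honor a supplied `centers`
            centers = [rng.randn(d) for _ in range(k)]
        data = [[centers[j % k] + r * rng.randn(d) for j in range(num_points)]
                for _ in range(batches)]
        return centers, data

    centers, data = streaming_kmeans_data(batches=5, num_points=5, k=1, d=5, r=0.1, seed=0)
    points = np.vstack([p for batch in data for p in batch])

    print(points.shape[0])  # 25 points in total, matching the asserted weight of [25.0]
    # With decayFactor=1.0 nothing is forgotten, so the model center is the mean of
    # all points seen, within about r / sqrt(25) of the true center per coordinate.
    print(np.abs(points.mean(axis=0) - centers[0]).max())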
