Commit

Update tests, doc and examples

MechCoder committed Jun 18, 2015
1 parent 4b1481f commit ee8ce16
Showing 3 changed files with 182 additions and 24 deletions.
56 changes: 52 additions & 4 deletions docs/mllib-clustering.md
@@ -593,14 +593,62 @@ ssc.start()
ssc.awaitTermination()

{% endhighlight %}
</div>

<div data-lang="python" markdown="1">
First we import the necessary classes.

{% highlight python %}

from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.clustering import StreamingKMeans

{% endhighlight %}

Then we make an input stream of vectors for training, as well as a stream of labeled data
points for testing. We assume a StreamingContext `ssc` has been created; see the
[Spark Streaming Programming Guide](streaming-programming-guide.html#initializing) for more info.

{% highlight python %}

trainingData = ssc.textFileStream("/training/data/dir").map(Vectors.parse)
testData = ssc.textFileStream("/testing/data/dir").map(LabeledPoint.parse)

{% endhighlight %}

We create a model with random clusters and specify the number of clusters to find.

{% highlight python %}

numDimensions = 3
numClusters = 2
model = StreamingKMeans()
model.setK(numClusters)
model.setDecayFactor(1.0)
model.setRandomCenters(numDimensions, 0.0, 0)

{% endhighlight %}
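
Alternatively, the forgetting rate can be specified as a half-life via `setHalfLife`. Below is a minimal sketch of the same setup, reusing `numClusters` and `numDimensions` from above; the half-life of 5 batches is an arbitrary choice for illustration.

{% highlight python %}

model = StreamingKMeans()
model.setK(numClusters)
# A half-life of 5 batches means old data decays to half its weight after
# 5 batches; internally this sets the decay factor to exp(log(0.5) / 5).
model.setHalfLife(5.0, "batches")
model.setRandomCenters(numDimensions, 0.0, 0)

{% endhighlight %}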

Now register the streams for training and testing and start the job, printing
the predicted cluster assignments on new data points as they arrive.

{% highlight python %}

model.trainOn(trainingData)
result = model.predictOnValues(testData.map(lambda lp: (lp.label, lp.features)))
result.pprint()

ssc.start()
ssc.awaitTermination()

{% endhighlight %}
</div>

</div>

As you add new text files with data, the cluster centers will update. Each training
point should be formatted as `[x1, x2, x3]`, and each test data point
should be formatted as `(y, [x1, x2, x3])`, where `y` is some useful label or identifier
(e.g. a true category assignment). Anytime a text file is placed in `/training/data/dir`
the model will update. Anytime a text file is placed in `/testing/data/dir`
you will see predictions. With new data, the cluster centers will change!
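
For example, a minimal sketch of dropping one batch of records into the watched directories might look like the following; the file name and record values are made up for illustration, and the paths are the placeholders used above.

{% highlight python %}

import os

def write_batch(directory, records, name="batch-001.txt"):
    """Write records one per line, then rename so the file appears atomically."""
    if not os.path.isdir(directory):
        os.makedirs(directory)
    tmp_path = os.path.join(directory, "." + name + ".tmp")
    with open(tmp_path, "w") as f:
        f.write("\n".join(records) + "\n")
    os.rename(tmp_path, os.path.join(directory, name))

write_batch("/training/data/dir", ["[1.0, 2.0, 3.0]", "[0.9, 2.1, 2.9]"])
write_batch("/testing/data/dir", ["(7, [1.1, 1.9, 3.2])", "(4, [0.2, 5.0, 1.3])"])

{% endhighlight %}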

</div>

</div>
82 changes: 74 additions & 8 deletions python/pyspark/mllib/clustering.py
@@ -101,6 +101,9 @@ def predict(self, x):
"""Find the cluster to which x belongs in this model."""
best = 0
best_distance = float("inf")
if isinstance(x, RDD):
return x.map(self.predict)

x = _convert_to_vector(x)
for i in xrange(len(self.centers)):
distance = x.squared_distance(self.centers[i])
@@ -270,12 +273,30 @@ def train(cls, rdd, k, convergenceTol=1e-3, maxIterations=100, seed=None, initia
class StreamingKMeansModel(KMeansModel):
"""
.. note:: Experimental
Clustering model which can perform an online update of the centroids.
The update formula is given by
c_t+1 = [(c_t * n_t * a) + (x_t * m_t)] / [n_t * a + m_t]
n_t+1 = n_t * a + m_t
where
c_t: Centroid at the t-th iteration.
n_t: Weight (number of points) of the centroid at the t-th iteration.
x_t: Centroid of the new data closest to c_t.
m_t: Weight (number of points) of the new data closest to c_t.
c_t+1: New centroid.
n_t+1: New weight.
a: Decay factor, which gives the forgetfulness.
Note that if a is set to 1, the update is the weighted mean of the previous
and new data. If it is set to zero, the old centroids are completely
forgotten.
>>> initCenters, initWeights = [[0.0, 0.0], [1.0, 1.0]], [1.0, 1.0]
>>> stkm = StreamingKMeansModel(initCenters, initWeights)
>>> data = sc.parallelize([[-0.1, -0.1], [0.1, 0.1],
... [0.9, 0.9], [1.1, 1.1]])
>>> stkm = stkm.update(data, 1.0, "batches")
>>> stkm = stkm.update(data, 1.0, u"batches")
>>> stkm.centers
array([[ 0., 0.],
[ 1., 1.]])
@@ -287,7 +308,7 @@ class StreamingKMeansModel(KMeansModel):
[3.0, 3.0]
>>> decayFactor = 0.0
>>> data = sc.parallelize([DenseVector([1.5, 1.5]), DenseVector([0.2, 0.2])])
>>> stkm = stkm.update(data, 0.0, "batches")
>>> stkm = stkm.update(data, 0.0, u"batches")
>>> stkm.centers
array([[ 0.2, 0.2],
[ 1.5, 1.5]])
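
For intuition, here is a minimal standalone NumPy sketch of the update rule above (an illustration, not part of clustering.py), reproducing the second doctest batch where the decay factor is 0 and the old centroid is fully forgotten:

import numpy as np

def update_centroid(c_t, n_t, x_t, m_t, a):
    # c_t+1 = [(c_t * n_t * a) + (x_t * m_t)] / [n_t * a + m_t]
    # n_t+1 = n_t * a + m_t
    n_new = n_t * a + m_t
    c_new = (c_t * n_t * a + x_t * m_t) / n_new
    return c_new, n_new

# Old centroid [1., 1.] with weight 3.0, one new point [1.5, 1.5], decay 0.0:
c, n = update_centroid(np.array([1.0, 1.0]), 3.0, np.array([1.5, 1.5]), 1.0, 0.0)
# c == array([ 1.5, 1.5]) and n == 1.0, matching stkm.centers above.
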
@@ -304,9 +325,22 @@ def __init__(self, clusterCenters, clusterWeights):

@property
def getClusterWeights(self):
"""Convenience method to return the cluster weights."""
return self._clusterWeights

def update(self, data, decayFactor, timeUnit):
"""Update the centroids, according to data
Parameters
----------
data: Should be a RDD that represents the new data.
decayFactor: forgetfulness of the previous centroids.
timeUnit: Can be "batches" or "points"
If points, then the decay factor is raised to the power of
number of new points and if batches, it is used as it is.
"""
if not isinstance(data, RDD):
raise TypeError("data should be of a RDD, got %s." % type(data))
data = data.map(_convert_to_vector)
@@ -326,6 +360,21 @@ def update(self, data, decayFactor, timeUnit):
class StreamingKMeans(object):
"""
.. note:: Experimental
Provides methods to set k, decayFactor, and timeUnit to train and
predict on the incoming data.
Parameters
----------
k: int
Number of clusters.
decayFactor: float
Forgetfulness of the previous centroids.
timeUnit: str, "batches" or "points"
If points, the decay factor is raised to the power of the number
of new points.
"""
def __init__(self, k=2, decayFactor=1.0, timeUnit="batches"):
self._k = k
@@ -347,14 +396,20 @@ def _validate(self, dstream):
"got type %d" % type(dstream))

def setK(self, k):
"""Set number of clusters."""
self._k = k
return self

def setDecayFactor(self, decayFactor):
"""Set decay factor."""
self._decayFactor = decayFactor
return self

def setHalfLife(self, halfLife, timeUnit):
"""
Set the number of batches (or points, depending on timeUnit) after
which old data decays to half of its original weight.
"""
self._timeUnit = timeUnit
self._decayFactor = exp(log(0.5) / halfLife)
return self
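
As a quick standalone check of this choice (an illustration, not part of clustering.py), the resulting decay factor halves the weight of old data after `halfLife` time units:

from math import exp, log

halfLife = 10.0
decayFactor = exp(log(0.5) / halfLife)
# After halfLife time units the cumulative decay is decayFactor ** halfLife,
# i.e. old data retains half of its weight.
assert abs(decayFactor ** halfLife - 0.5) < 1e-12
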
@@ -364,29 +419,40 @@ def setInitialCenters(self, centers, weights):
return self

def setRandomCenters(self, dim, weight, seed):
"""
Set the initial centers to be random samples from
a Gaussian population with constant weights.
"""
rng = random.RandomState(seed)
clusterCenters = rng.randn(self._k, dim)
clusterWeights = tile(weight, self._k)
self.model = StreamingKMeansModel(clusterCenters, clusterWeights)
return self

def trainOn(self, dstream):
"""Train the model on the incoming dstream."""
self._validate(dstream)

def update(_, rdd):
if rdd:
self.model = self.model.update(rdd, self._decayFactor, self._timeUnit)
def update(rdd):
self.model.update(rdd, self._decayFactor, self._timeUnit)

dstream.foreachRDD(update)
return self

def predictOn(self, dstream):
"""
Make predictions on a dstream.
Returns a transformed dstream object.
"""
self._validate(dstream)
dstream.map(self.model.predict)
return dstream.map(self.model.predict)

def predictOnValues(self, dstream):
"""
Make predictions on a keyed dstream.
Returns a transformed dstream object.
"""
self._validate(dstream)
dstream.mapValues(self.model.predict)
return dstream.mapValues(self.model.predict)


def _test():
68 changes: 56 additions & 12 deletions python/pyspark/mllib/tests.py
@@ -23,8 +23,9 @@
import sys
import tempfile
import array as pyarray
from time import time, sleep

from numpy import array, array_equal, zeros, inf
from numpy import array, array_equal, zeros, inf, all
from py4j.protocol import Py4JJavaError

if sys.version_info[:2] <= (2, 6):
@@ -38,7 +39,7 @@

from pyspark import SparkContext
from pyspark.mllib.common import _to_java_object_rdd
from pyspark.mllib.clustering import StreamingKMeans
from pyspark.mllib.clustering import StreamingKMeans, StreamingKMeansModel
from pyspark.mllib.linalg import Vector, SparseVector, DenseVector, VectorUDT, _convert_to_vector,\
DenseMatrix, SparseMatrix, Vectors, Matrices, MatrixUDT
from pyspark.mllib.regression import LabeledPoint
@@ -69,6 +70,15 @@ def setUp(self):
self.sc = sc


class MLLibStreamingTestCase(unittest.TestCase):
def setUp(self):
self.sc = sc
self.ssc = StreamingContext(self.sc, 1.0)

def tearDown(self):
self.ssc.stop(False)


def _squared_distance(a, b):
if isinstance(a, Vector):
return a.squared_distance(b)
@@ -865,7 +875,7 @@ def test_model_transform(self):
eprod.transform(sparsevec), SparseVector(3, [0], [3]))


class StreamingKMeansTest(MLlibTestCase):
class StreamingKMeansTest(MLLibStreamingTestCase):
def test_model_params(self):
stkm = StreamingKMeans()
stkm.setK(5).setDecayFactor(0.0)
@@ -877,30 +887,64 @@ def test_model_params(self):
self.assertRaises(ValueError, stkm.trainOn, [0.0, 1.0])

stkm.setInitialCenters([[0.0, 0.0], [1.0, 1.0]], [1.0, 1.0])
self.assertEqual(stkm.model.centers, [[0.0, 0.0], [1.0, 1.0]])
self.assertEqual(stkm.model.getClusterWeights, [1.0, 1.0])
self.assertEquals(stkm.model.centers, [[0.0, 0.0], [1.0, 1.0]])
self.assertEquals(stkm.model.getClusterWeights, [1.0, 1.0])

def _ssc_wait(self, start_time, end_time, sleep_time):
while time() - start_time < end_time:
sleep(sleep_time)

def test_model(self):
def test_trainOn_model(self):
# Test the model on toy data.
stkm = StreamingKMeans()
initCenters = [[1.0, 1.0], [-1.0, 1.0], [-1.0, -1.0], [1.0, -1.0]]
weights = [1.0, 1.0, 1.0, 1.0]
stkm.setInitialCenters(initCenters, weights)

# Create a toy dataset by setting a tiny offset for each point.
offsets = [[0, 0.1], [0, -0.1], [0.1, 0], [-0.1, 0]]
batches = []

for offset in offsets:
batches.append([[offset[0] + center[0], offset[1] + center[1]]
for center in initCenters])

batches = [self.sc.parallelize(batch, 1) for batch in batches]
ssc = StreamingContext(self.sc, 2.0)
input_stream = ssc.queueStream(batches)
input_stream = self.ssc.queueStream(batches)
stkm.trainOn(input_stream)
ssc.start()
t = time()
self.ssc.start()

# Give enough time to train the model.
self._ssc_wait(t, 6.0, 0.01)
finalModel = stkm.model
self.assertEqual(finalModel.centers, initCenters)
# self.assertEqual(finalModel.getClusterWeights, [5.0, 5.0, 5.0, 5.0])
self.assertTrue(all(finalModel.centers == array(initCenters)))
self.assertEquals(finalModel.getClusterWeights, [5.0, 5.0, 5.0, 5.0])

def test_predictOn_model(self):
initCenters = [[1.0, 1.0], [-1.0, 1.0], [-1.0, -1.0], [1.0, -1.0]]
weights = [1.0, 1.0, 1.0, 1.0]
model = StreamingKMeansModel(initCenters, weights)
stkm = StreamingKMeans()
stkm.model = model

predict_data = [[[1.5, 1.5]], [[-1.5, 1.5]], [[-1.5, -1.5]], [[1.5, -1.5]]]
predict_data = [sc.parallelize(batch, 1) for batch in predict_data]
predict_stream = self.ssc.queueStream(predict_data)
predict_val = stkm.predictOn(predict_stream)

result = []

def update(rdd):
if rdd:
rdd_collect = rdd.collect()
if rdd_collect:
result.append(rdd_collect)

predict_val.foreachRDD(update)
t = time()
self.ssc.start()
self._ssc_wait(t, 6.0, 0.01)
self.assertEquals(result, [[0], [1], [2], [3]])


if __name__ == "__main__":
