Skip to content

Commit

Permalink
Update regression, classification and clustering models for sparse data
Browse files Browse the repository at this point in the history
Also updated all the doc tests for it
  • Loading branch information
mateiz committed Apr 15, 2014
1 parent 2abbb44 commit eaee759
Show file tree
Hide file tree
Showing 5 changed files with 185 additions and 10 deletions.
27 changes: 26 additions & 1 deletion python/pyspark/mllib/_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ def _get_initial_weights(initial_weights, data):
+ initial_weights.ndim + " dimensions, which is not 1")
initial_weights = numpy.ones([initial_weights.shape[0] - 1])
elif type(initial_weights) == SparseVector:
initial_weights = numpy.ones(initial_weights.size - 1)
initial_weights = numpy.ones([initial_weights.size - 1])
else:
raise TypeError("At least one data element has type "
+ type(initial_weights).__name__ + " which is not a vector")
Expand Down Expand Up @@ -310,6 +310,31 @@ def _serialize_tuple(t):
intpart[0], intpart[1] = t
return ba

def _squared_distance(v1, v2):
"""
Squared distance of two NumPy or sparse vectors.
>>> dense1 = array([1., 2.])
>>> sparse1 = SparseVector(2, [0, 1], [1., 2.])
>>> dense2 = array([2., 1.])
>>> sparse2 = SparseVector(2, [0, 1], [2., 1.])
>>> _squared_distance(dense1, dense2)
2.0
>>> _squared_distance(dense1, sparse2)
2.0
>>> _squared_distance(sparse1, dense2)
2.0
>>> _squared_distance(sparse1, sparse2)
2.0
"""
if type(v1) == ndarray and type(v2) == ndarray:
diff = v1 - v2
return diff.dot(diff)
elif type(v1) == ndarray:
return v2.squared_distance(v1)
else:
return v1.squared_distance(v2)

def _test():
import doctest
globs = globals().copy()
Expand Down
23 changes: 23 additions & 0 deletions python/pyspark/mllib/classification.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ class LogisticRegressionModel(LinearModel):
True
>>> lrm.predict(SparseVector(2, [1], [1.0])) > 0
True
>>> lrm.predict(SparseVector(2, [1], [0.0])) <= 0
True
"""
def predict(self, x):
_linear_predictor_typecheck(x, self._coeff)
Expand All @@ -75,6 +77,17 @@ class SVMModel(LinearModel):
>>> svm = SVMWithSGD.train(sc.parallelize(data))
>>> svm.predict(array([1.0])) > 0
True
>>> sparse_data = [
... SparseVector(3, [0, 1], [0.0, 0.0]),
... SparseVector(3, [0, 2], [1.0, 1.0]),
... SparseVector(3, [0, 1], [0.0, 0.0]),
... SparseVector(3, [0, 2], [1.0, 2.0])
... ]
>>> svm = SVMWithSGD.train(sc.parallelize(sparse_data))
>>> svm.predict(SparseVector(2, [1], [1.0])) > 0
True
>>> svm.predict(SparseVector(2, [1], [0.0])) <= 0
True
"""
def predict(self, x):
_linear_predictor_typecheck(x, self._coeff)
Expand Down Expand Up @@ -106,6 +119,16 @@ class NaiveBayesModel(object):
0.0
>>> model.predict(array([1.0, 0.0]))
1.0
>>> sparse_data = [
... SparseVector(3, [0, 2], [0.0, 1.0]),
... SparseVector(3, [0, 2], [0.0, 2.0]),
... SparseVector(3, [0, 1], [1.0, 1.0])
... ]
>>> model = NaiveBayes.train(sc.parallelize(sparse_data))
>>> model.predict(SparseVector(2, [1], [1.0]))
0.0
>>> model.predict(SparseVector(2, [0], [1.0]))
1.0
"""

def __init__(self, labels, pi, theta):
Expand Down
21 changes: 18 additions & 3 deletions python/pyspark/mllib/clustering.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@
from math import sqrt
from pyspark import SparkContext
from pyspark.mllib._common import \
_get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
_get_unmangled_rdd, _get_unmangled_double_vector_rdd, _squared_distance, \
_serialize_double_matrix, _deserialize_double_matrix, \
_serialize_double_vector, _deserialize_double_vector, \
_get_initial_weights, _serialize_rating, _regression_train_wrapper
from pyspark.mllib.linalg import SparseVector

class KMeansModel(object):
"""A clustering model derived from the k-means method.
Expand All @@ -34,6 +35,21 @@ class KMeansModel(object):
>>> clusters.predict(array([8.0, 9.0])) == clusters.predict(array([9.0, 8.0]))
True
>>> clusters = KMeans.train(sc.parallelize(data), 2)
>>> sparse_data = [
... SparseVector(3, [0, 1], [0.0, 1.0]),
... SparseVector(3, [0, 1], [0.0, 1.1]),
... SparseVector(3, [0, 2], [0.0, 1.0]),
... SparseVector(3, [0, 2], [0.0, 1.1])
... ]
>>> clusters = KMeans.train(sc.parallelize(sparse_data), 2, initializationMode="k-means||")
>>> clusters.predict(array([0., 1., 0.])) == clusters.predict(array([0, 1.1, 0.]))
True
>>> clusters.predict(array([0., 0., 1.])) == clusters.predict(array([0, 0, 1.1]))
True
>>> clusters.predict(sparse_data[0]) == clusters.predict(sparse_data[1])
True
>>> clusters.predict(sparse_data[2]) == clusters.predict(sparse_data[3])
True
"""
def __init__(self, centers_):
self.centers = centers_
Expand All @@ -43,8 +59,7 @@ def predict(self, x):
best = 0
best_distance = 1e75
for i in range(0, self.centers.shape[0]):
diff = x - self.centers[i]
distance = sqrt(dot(diff, diff))
distance = _squared_distance(x, self.centers[i])
if distance < best_distance:
best = i
best_distance = distance
Expand Down
68 changes: 63 additions & 5 deletions python/pyspark/mllib/linalg.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def __init__(self, size, *args):

def dot(self, other):
"""
Dot product with another SparseVector or NumPy array.
Dot product with a SparseVector or 1- or 2-dimensional Numpy array.
>>> a = SparseVector(4, [1, 3], [3.0, 4.0])
>>> a.dot(a)
Expand All @@ -69,12 +69,22 @@ def dot(self, other):
>>> b = SparseVector(4, [2, 4], [1.0, 2.0])
>>> a.dot(b)
0.0
>>> a.dot(array([[1, 1], [2, 2], [3, 3], [4, 4]]))
array([ 22., 22.])
"""
result = 0.0
if type(other) == ndarray:
for i in xrange(len(self.indices)):
result += self.values[i] * other[self.indices[i]]
if other.ndim == 1:
result = 0.0
for i in xrange(len(self.indices)):
result += self.values[i] * other[self.indices[i]]
return result
elif other.ndim == 2:
results = [self.dot(other[:,i]) for i in xrange(other.shape[1])]
return array(results)
else:
raise Exception("Cannot call dot with %d-dimensional array" % other.ndim)
else:
result = 0.0
i, j = 0, 0
while i < len(self.indices) and j < len(other.indices):
if self.indices[i] == other.indices[j]:
Expand All @@ -85,7 +95,55 @@ def dot(self, other):
i += 1
else:
j += 1
return result
return result

def squared_distance(self, other):
"""
Squared distance from a SparseVector or 1-dimensional NumPy array.
>>> a = SparseVector(4, [1, 3], [3.0, 4.0])
>>> a.squared_distance(a)
0.0
>>> a.squared_distance(array([1., 2., 3., 4.]))
1.0
>>> b = SparseVector(4, [2, 4], [1.0, 2.0])
>>> a.squared_distance(b)
30.0
>>> b.squared_distance(a)
30.0
"""
if type(other) == ndarray:
if other.ndim == 1:
result = 0.0
for i in xrange(len(self.indices)):
diff = self.values[i] - other[self.indices[i]]
result += diff * diff
return result
else:
raise Exception("Cannot call squared_distance with %d-dimensional array" %
other.ndim)
else:
result = 0.0
i, j = 0, 0
while i < len(self.indices) and j < len(other.indices):
if self.indices[i] == other.indices[j]:
diff = self.values[i] - other.values[j]
result += diff * diff
i += 1
j += 1
elif self.indices[i] < other.indices[j]:
result += self.values[i] * self.values[i]
i += 1
else:
result += other.values[j] * other.values[j]
j += 1
while i < len(self.indices):
result += self.values[i] * self.values[i]
i += 1
while j < len(other.indices):
result += other.values[j] * other.values[j]
j += 1
return result

def __str__(self):
inds = self.indices
Expand Down
56 changes: 55 additions & 1 deletion python/pyspark/mllib/regression.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
_serialize_double_vector, _deserialize_double_vector, \
_get_initial_weights, _serialize_rating, _regression_train_wrapper, \
_linear_predictor_typecheck
from pyspark.mllib.linalg import SparseVector

class LinearModel(object):
"""Something that has a vector of coefficients and an intercept."""
Expand All @@ -36,18 +37,37 @@ class LinearRegressionModelBase(LinearModel):
>>> lrmb = LinearRegressionModelBase(array([1.0, 2.0]), 0.1)
>>> abs(lrmb.predict(array([-1.03, 7.777])) - 14.624) < 1e-6
True
>>> abs(lrmb.predict(SparseVector(2, [0, 1], [-1.03, 7.777])) - 14.624) < 1e-6
True
"""
def predict(self, x):
"""Predict the value of the dependent variable given a vector x"""
"""containing values for the independent variables."""
_linear_predictor_typecheck(x, self._coeff)
return dot(self._coeff, x) + self._intercept
return x.dot(self._coeff) + self._intercept

class LinearRegressionModel(LinearRegressionModelBase):
"""A linear regression model derived from a least-squares fit.
>>> data = array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4,2)
>>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
>>> abs(lrm.predict(array([0.0])) - 0) < 0.5
True
>>> abs(lrm.predict(array([1.0])) - 1) < 0.5
True
>>> abs(lrm.predict(SparseVector(1, [0], [1.0])) - 1) < 0.5
True
>>> data = [
... SparseVector(2, [0, 1], [0.0, 0.0]),
... SparseVector(2, [0, 1], [1.0, 1.0]),
... SparseVector(2, [0, 1], [3.0, 2.0]),
... SparseVector(2, [0, 1], [2.0, 3.0])
... ]
>>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
>>> abs(lrm.predict(array([0.0])) - 0) < 0.5
True
>>> abs(lrm.predict(SparseVector(1, [0], [1.0])) - 1) < 0.5
True
"""

class LinearRegressionWithSGD(object):
Expand All @@ -67,6 +87,23 @@ class LassoModel(LinearRegressionModelBase):
>>> data = array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4,2)
>>> lrm = LassoWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
>>> abs(lrm.predict(array([0.0])) - 0) < 0.5
True
>>> abs(lrm.predict(array([1.0])) - 1) < 0.5
True
>>> abs(lrm.predict(SparseVector(1, [0], [1.0])) - 1) < 0.5
True
>>> data = [
... SparseVector(2, [0, 1], [0.0, 0.0]),
... SparseVector(2, [0, 1], [1.0, 1.0]),
... SparseVector(2, [0, 1], [3.0, 2.0]),
... SparseVector(2, [0, 1], [2.0, 3.0])
... ]
>>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
>>> abs(lrm.predict(array([0.0])) - 0) < 0.5
True
>>> abs(lrm.predict(SparseVector(1, [0], [1.0])) - 1) < 0.5
True
"""

class LassoWithSGD(object):
Expand All @@ -86,6 +123,23 @@ class RidgeRegressionModel(LinearRegressionModelBase):
>>> data = array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4,2)
>>> lrm = RidgeRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
>>> abs(lrm.predict(array([0.0])) - 0) < 0.5
True
>>> abs(lrm.predict(array([1.0])) - 1) < 0.5
True
>>> abs(lrm.predict(SparseVector(1, [0], [1.0])) - 1) < 0.5
True
>>> data = [
... SparseVector(2, [0, 1], [0.0, 0.0]),
... SparseVector(2, [0, 1], [1.0, 1.0]),
... SparseVector(2, [0, 1], [3.0, 2.0]),
... SparseVector(2, [0, 1], [2.0, 3.0])
... ]
>>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
>>> abs(lrm.predict(array([0.0])) - 0) < 0.5
True
>>> abs(lrm.predict(SparseVector(1, [0], [1.0])) - 1) < 0.5
True
"""

class RidgeRegressionWithSGD(object):
Expand Down

0 comments on commit eaee759

Please sign in to comment.