Add SparseVector support to the PySpark MLlib Python API.

Summary of the hunks below:
  * _common.py: wrap the SparseVector initial-weights shape in a list and
    add the _squared_distance(v1, v2) helper for dense/sparse operands.
  * classification.py: doctests exercising sparse input.
  * clustering.py: KMeansModel.predict compares squared distances computed
    by _squared_distance, so centers and points may be dense or sparse.
  * linalg.py: SparseVector.dot accepts 2-dimensional arrays; new
    SparseVector.squared_distance.
  * regression.py: predict calls x.dot(coeff) so x may be sparse; doctests
    exercising sparse input.

Changes relative to the previous revision of this patch:
  * SparseVector.squared_distance(ndarray) now walks every position of the
    dense operand. Previously only the sparse vector's stored indices were
    visited, dropping other[i]**2 wherever the sparse vector is implicitly
    zero (the doctest expected 1.0 where the true value is 11.0).
  * LassoModel / RidgeRegressionModel sparse doctests train LassoWithSGD /
    RidgeRegressionWithSGD instead of LinearRegressionWithSGD.
  * "Numpy" -> "NumPy" and "other[:,i]" -> "other[:, i]" in linalg.py.
  * The "index" lines of linalg.py and regression.py still carry the blob
    ids of the previous revision.

diff --git a/python/pyspark/mllib/_common.py b/python/pyspark/mllib/_common.py
index 52d5529c2d6d9..dc8764b6ccce6 100644
--- a/python/pyspark/mllib/_common.py
+++ b/python/pyspark/mllib/_common.py
@@ -258,7 +258,7 @@ def _get_initial_weights(initial_weights, data):
                                 + initial_weights.ndim + " dimensions, which is not 1")
             initial_weights = numpy.ones([initial_weights.shape[0] - 1])
         elif type(initial_weights) == SparseVector:
-            initial_weights = numpy.ones(initial_weights.size - 1)
+            initial_weights = numpy.ones([initial_weights.size - 1])
         else:
             raise TypeError("At least one data element has type "
                             + type(initial_weights).__name__ + " which is not a vector")
@@ -310,6 +310,31 @@ def _serialize_tuple(t):
     intpart[0], intpart[1] = t
     return ba
 
+def _squared_distance(v1, v2):
+    """
+    Squared distance of two NumPy or sparse vectors.
+
+    >>> dense1 = array([1., 2.])
+    >>> sparse1 = SparseVector(2, [0, 1], [1., 2.])
+    >>> dense2 = array([2., 1.])
+    >>> sparse2 = SparseVector(2, [0, 1], [2., 1.])
+    >>> _squared_distance(dense1, dense2)
+    2.0
+    >>> _squared_distance(dense1, sparse2)
+    2.0
+    >>> _squared_distance(sparse1, dense2)
+    2.0
+    >>> _squared_distance(sparse1, sparse2)
+    2.0
+    """
+    if type(v1) == ndarray and type(v2) == ndarray:
+        diff = v1 - v2
+        return diff.dot(diff)
+    elif type(v1) == ndarray:
+        return v2.squared_distance(v1)
+    else:
+        return v1.squared_distance(v2)
+
 def _test():
     import doctest
     globs = globals().copy()
diff --git a/python/pyspark/mllib/classification.py b/python/pyspark/mllib/classification.py
index a8f1e07ea0028..42504299f2375 100644
--- a/python/pyspark/mllib/classification.py
+++ b/python/pyspark/mllib/classification.py
@@ -50,6 +50,8 @@ class LogisticRegressionModel(LinearModel):
     True
     >>> lrm.predict(SparseVector(2, [1], [1.0])) > 0
     True
+    >>> lrm.predict(SparseVector(2, [1], [0.0])) <= 0
+    True
     """
     def predict(self, x):
         _linear_predictor_typecheck(x, self._coeff)
@@ -75,6 +77,17 @@ class SVMModel(LinearModel):
     >>> svm = SVMWithSGD.train(sc.parallelize(data))
     >>> svm.predict(array([1.0])) > 0
     True
+    >>> sparse_data = [
+    ...     SparseVector(3, [0, 1], [0.0, 0.0]),
+    ...     SparseVector(3, [0, 2], [1.0, 1.0]),
+    ...     SparseVector(3, [0, 1], [0.0, 0.0]),
+    ...     SparseVector(3, [0, 2], [1.0, 2.0])
+    ... ]
+    >>> svm = SVMWithSGD.train(sc.parallelize(sparse_data))
+    >>> svm.predict(SparseVector(2, [1], [1.0])) > 0
+    True
+    >>> svm.predict(SparseVector(2, [1], [0.0])) <= 0
+    True
     """
     def predict(self, x):
         _linear_predictor_typecheck(x, self._coeff)
@@ -106,6 +119,16 @@ class NaiveBayesModel(object):
     0.0
     >>> model.predict(array([1.0, 0.0]))
     1.0
+    >>> sparse_data = [
+    ...     SparseVector(3, [0, 2], [0.0, 1.0]),
+    ...     SparseVector(3, [0, 2], [0.0, 2.0]),
+    ...     SparseVector(3, [0, 1], [1.0, 1.0])
+    ... ]
+    >>> model = NaiveBayes.train(sc.parallelize(sparse_data))
+    >>> model.predict(SparseVector(2, [1], [1.0]))
+    0.0
+    >>> model.predict(SparseVector(2, [0], [1.0]))
+    1.0
     """
 
     def __init__(self, labels, pi, theta):
diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py
index 30862918c3f86..bd1f9394ec854 100644
--- a/python/pyspark/mllib/clustering.py
+++ b/python/pyspark/mllib/clustering.py
@@ -19,10 +19,11 @@
 from math import sqrt
 from pyspark import SparkContext
 from pyspark.mllib._common import \
-    _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
+    _get_unmangled_rdd, _get_unmangled_double_vector_rdd, _squared_distance, \
     _serialize_double_matrix, _deserialize_double_matrix, \
     _serialize_double_vector, _deserialize_double_vector, \
     _get_initial_weights, _serialize_rating, _regression_train_wrapper
+from pyspark.mllib.linalg import SparseVector
 
 class KMeansModel(object):
     """A clustering model derived from the k-means method.
@@ -34,6 +35,21 @@ class KMeansModel(object):
     >>> clusters.predict(array([8.0, 9.0])) == clusters.predict(array([9.0, 8.0]))
     True
     >>> clusters = KMeans.train(sc.parallelize(data), 2)
+    >>> sparse_data = [
+    ...     SparseVector(3, [0, 1], [0.0, 1.0]),
+    ...     SparseVector(3, [0, 1], [0.0, 1.1]),
+    ...     SparseVector(3, [0, 2], [0.0, 1.0]),
+    ...     SparseVector(3, [0, 2], [0.0, 1.1])
+    ... ]
+    >>> clusters = KMeans.train(sc.parallelize(sparse_data), 2, initializationMode="k-means||")
+    >>> clusters.predict(array([0., 1., 0.])) == clusters.predict(array([0, 1.1, 0.]))
+    True
+    >>> clusters.predict(array([0., 0., 1.])) == clusters.predict(array([0, 0, 1.1]))
+    True
+    >>> clusters.predict(sparse_data[0]) == clusters.predict(sparse_data[1])
+    True
+    >>> clusters.predict(sparse_data[2]) == clusters.predict(sparse_data[3])
+    True
     """
     def __init__(self, centers_):
         self.centers = centers_
@@ -43,8 +59,7 @@ def predict(self, x):
         best = 0
         best_distance = 1e75
         for i in range(0, self.centers.shape[0]):
-            diff = x - self.centers[i]
-            distance = sqrt(dot(diff, diff))
+            distance = _squared_distance(x, self.centers[i])
             if distance < best_distance:
                 best = i
                 best_distance = distance
diff --git a/python/pyspark/mllib/linalg.py b/python/pyspark/mllib/linalg.py
index 43c7c799bc860..8eebbcad6b77b 100644
--- a/python/pyspark/mllib/linalg.py
+++ b/python/pyspark/mllib/linalg.py
@@ -59,7 +59,7 @@ def __init__(self, size, *args):
 
     def dot(self, other):
         """
-        Dot product with another SparseVector or NumPy array.
+        Dot product with a SparseVector or 1- or 2-dimensional NumPy array.
 
         >>> a = SparseVector(4, [1, 3], [3.0, 4.0])
         >>> a.dot(a)
@@ -69,12 +69,22 @@ def dot(self, other):
         >>> b = SparseVector(4, [2, 4], [1.0, 2.0])
         >>> a.dot(b)
         0.0
+        >>> a.dot(array([[1, 1], [2, 2], [3, 3], [4, 4]]))
+        array([ 22.,  22.])
         """
-        result = 0.0
         if type(other) == ndarray:
-            for i in xrange(len(self.indices)):
-                result += self.values[i] * other[self.indices[i]]
+            if other.ndim == 1:
+                result = 0.0
+                for i in xrange(len(self.indices)):
+                    result += self.values[i] * other[self.indices[i]]
+                return result
+            elif other.ndim == 2:
+                results = [self.dot(other[:, i]) for i in xrange(other.shape[1])]
+                return array(results)
+            else:
+                raise Exception("Cannot call dot with %d-dimensional array" % other.ndim)
         else:
+            result = 0.0
             i, j = 0, 0
             while i < len(self.indices) and j < len(other.indices):
                 if self.indices[i] == other.indices[j]:
@@ -85,7 +95,62 @@ def dot(self, other):
                     i += 1
                 else:
                     j += 1
-        return result
+            return result
+
+    def squared_distance(self, other):
+        """
+        Squared distance from a SparseVector or 1-dimensional NumPy array.
+
+        >>> a = SparseVector(4, [1, 3], [3.0, 4.0])
+        >>> a.squared_distance(a)
+        0.0
+        >>> a.squared_distance(array([1., 2., 3., 4.]))
+        11.0
+        >>> b = SparseVector(4, [2, 4], [1.0, 2.0])
+        >>> a.squared_distance(b)
+        30.0
+        >>> b.squared_distance(a)
+        30.0
+        """
+        if type(other) == ndarray:
+            if other.ndim == 1:
+                result = 0.0
+                # j is the position in self.indices of the next stored entry
+                j = 0
+                for i in xrange(other.shape[0]):
+                    if j < len(self.indices) and self.indices[j] == i:
+                        diff = self.values[j] - other[i]
+                        result += diff * diff
+                        j += 1
+                    else:
+                        # No stored entry here: the sparse value is zero,
+                        # so the dense value contributes on its own.
+                        result += other[i] * other[i]
+                return result
+            else:
+                raise Exception("Cannot call squared_distance with %d-dimensional array" %
+                                other.ndim)
+        else:
+            result = 0.0
+            i, j = 0, 0
+            while i < len(self.indices) and j < len(other.indices):
+                if self.indices[i] == other.indices[j]:
+                    diff = self.values[i] - other.values[j]
+                    result += diff * diff
+                    i += 1
+                    j += 1
+                elif self.indices[i] < other.indices[j]:
+                    result += self.values[i] * self.values[i]
+                    i += 1
+                else:
+                    result += other.values[j] * other.values[j]
+                    j += 1
+            while i < len(self.indices):
+                result += self.values[i] * self.values[i]
+                i += 1
+            while j < len(other.indices):
+                result += other.values[j] * other.values[j]
+                j += 1
+            return result
 
     def __str__(self):
         inds = self.indices
diff --git a/python/pyspark/mllib/regression.py b/python/pyspark/mllib/regression.py
index 7656db07f61cc..c665eda01a430 100644
--- a/python/pyspark/mllib/regression.py
+++ b/python/pyspark/mllib/regression.py
@@ -23,6 +23,7 @@
     _serialize_double_vector, _deserialize_double_vector, \
     _get_initial_weights, _serialize_rating, _regression_train_wrapper, \
     _linear_predictor_typecheck
+from pyspark.mllib.linalg import SparseVector
 
 class LinearModel(object):
     """Something that has a vector of coefficients and an intercept."""
@@ -36,18 +37,37 @@ class LinearRegressionModelBase(LinearModel):
     >>> lrmb = LinearRegressionModelBase(array([1.0, 2.0]), 0.1)
     >>> abs(lrmb.predict(array([-1.03, 7.777])) - 14.624) < 1e-6
     True
+    >>> abs(lrmb.predict(SparseVector(2, [0, 1], [-1.03, 7.777])) - 14.624) < 1e-6
+    True
     """
     def predict(self, x):
         """Predict the value of the dependent variable given a vector x"""
         """containing values for the independent variables."""
         _linear_predictor_typecheck(x, self._coeff)
-        return dot(self._coeff, x) + self._intercept
+        return x.dot(self._coeff) + self._intercept
 
 class LinearRegressionModel(LinearRegressionModelBase):
     """A linear regression model derived from a least-squares fit.
 
     >>> data = array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4,2)
     >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
+    >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
+    True
+    >>> abs(lrm.predict(array([1.0])) - 1) < 0.5
+    True
+    >>> abs(lrm.predict(SparseVector(1, [0], [1.0])) - 1) < 0.5
+    True
+    >>> data = [
+    ...     SparseVector(2, [0, 1], [0.0, 0.0]),
+    ...     SparseVector(2, [0, 1], [1.0, 1.0]),
+    ...     SparseVector(2, [0, 1], [3.0, 2.0]),
+    ...     SparseVector(2, [0, 1], [2.0, 3.0])
+    ... ]
+    >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
+    >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
+    True
+    >>> abs(lrm.predict(SparseVector(1, [0], [1.0])) - 1) < 0.5
+    True
     """
 
 class LinearRegressionWithSGD(object):
@@ -67,6 +87,23 @@ class LassoModel(LinearRegressionModelBase):
 
     >>> data = array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4,2)
     >>> lrm = LassoWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
+    >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
+    True
+    >>> abs(lrm.predict(array([1.0])) - 1) < 0.5
+    True
+    >>> abs(lrm.predict(SparseVector(1, [0], [1.0])) - 1) < 0.5
+    True
+    >>> data = [
+    ...     SparseVector(2, [0, 1], [0.0, 0.0]),
+    ...     SparseVector(2, [0, 1], [1.0, 1.0]),
+    ...     SparseVector(2, [0, 1], [3.0, 2.0]),
+    ...     SparseVector(2, [0, 1], [2.0, 3.0])
+    ... ]
+    >>> lrm = LassoWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
+    >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
+    True
+    >>> abs(lrm.predict(SparseVector(1, [0], [1.0])) - 1) < 0.5
+    True
     """
 
 class LassoWithSGD(object):
@@ -86,6 +123,23 @@ class RidgeRegressionModel(LinearRegressionModelBase):
 
     >>> data = array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4,2)
     >>> lrm = RidgeRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
+    >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
+    True
+    >>> abs(lrm.predict(array([1.0])) - 1) < 0.5
+    True
+    >>> abs(lrm.predict(SparseVector(1, [0], [1.0])) - 1) < 0.5
+    True
+    >>> data = [
+    ...     SparseVector(2, [0, 1], [0.0, 0.0]),
+    ...     SparseVector(2, [0, 1], [1.0, 1.0]),
+    ...     SparseVector(2, [0, 1], [3.0, 2.0]),
+    ...     SparseVector(2, [0, 1], [2.0, 3.0])
+    ... ]
+    >>> lrm = RidgeRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
+    >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
+    True
+    >>> abs(lrm.predict(SparseVector(1, [0], [1.0])) - 1) < 0.5
+    True
     """
 
 class RidgeRegressionWithSGD(object):