Support scipy.sparse matrices in all our algorithms and models
mateiz committed Apr 15, 2014
1 parent ab244d1 commit 889dde8
Showing 7 changed files with 285 additions and 21 deletions.
python/epydoc.conf: 3 changes (2 additions & 1 deletion)
@@ -33,5 +33,6 @@ target: docs/
private: no

exclude: pyspark.cloudpickle pyspark.worker pyspark.join
-         pyspark.java_gateway pyspark.examples pyspark.shell pyspark.test
+         pyspark.java_gateway pyspark.examples pyspark.shell pyspark.tests
          pyspark.rddsampler pyspark.daemon pyspark.mllib._common
+         pyspark.mllib.tests
python/pyspark/mllib/_common.py: 69 changes (64 additions & 5 deletions)
@@ -22,6 +22,29 @@
from pyspark.mllib.linalg import SparseVector
from pyspark.serializers import Serializer

"""
Common utilities shared throughout MLlib, primarily for dealing with
different data types. These include:
- Serialization utilities to / from byte arrays that Java can handle
- Serializers for other data types, like ALS Rating objects
- Common methods for linear models
- Methods to deal with the different vector types we support, such as
SparseVector and scipy.sparse matrices.
"""

# Check whether we have SciPy. MLlib works without it too, but if we have it, some methods,
# such as _dot and _serialize_double_vector, start to support scipy.sparse matrices.

_have_scipy = False
_scipy_issparse = None
try:
    import scipy.sparse
    _have_scipy = True
    _scipy_issparse = scipy.sparse.issparse
except:
    # No SciPy in environment, but that's okay
    pass

# Dense double vector format:
#
# [1-byte 1] [4-byte length] [length*8 bytes of data]
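
To make that layout concrete, here is a rough sketch of packing it by hand (illustrative only, not the commit's serializer; the helper name, the use of struct, and the native byte order are all assumptions):

import struct
import numpy

def pack_dense_vector_example(v):
    # Hypothetical re-creation of the layout described above:
    # [1-byte 1] [4-byte length] [length*8 bytes of float64 data]
    v = numpy.asarray(v, dtype=numpy.float64)
    return struct.pack("=bi", 1, v.shape[0]) + v.tobytes()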
@@ -67,6 +90,7 @@ def _serialize_double_vector(v):
    >>> array_equal(y, array([1.0, 2.0, 3.0]))
    True
    """
+    v = _convert_vector(v)
    if type(v) == ndarray:
        return _serialize_dense_vector(v)
    elif type(v) == SparseVector:
@@ -201,6 +225,7 @@ def _deserialize_double_matrix(ba):
def _linear_predictor_typecheck(x, coeffs):
    """Check that x is a one-dimensional vector of the right shape.
    This is a temporary hackaround until I actually implement bulk predict."""
+    x = _convert_vector(x)
    if type(x) == ndarray:
        if x.ndim == 1:
            if x.shape != coeffs.shape:
@@ -245,23 +270,20 @@ def predict(self, x):
"""Predict the value of the dependent variable given a vector x"""
"""containing values for the independent variables."""
_linear_predictor_typecheck(x, self._coeff)
return x.dot(self._coeff) + self._intercept
return _dot(x, self._coeff) + self._intercept

# If we weren't given initial weights, take a zero vector of the appropriate
# length.
def _get_initial_weights(initial_weights, data):
if initial_weights is None:
initial_weights = data.first()
initial_weights = _convert_vector(data.first())
if type(initial_weights) == ndarray:
if initial_weights.ndim != 1:
raise TypeError("At least one data element has "
+ initial_weights.ndim + " dimensions, which is not 1")
initial_weights = numpy.ones([initial_weights.shape[0] - 1])
elif type(initial_weights) == SparseVector:
initial_weights = numpy.ones([initial_weights.size - 1])
else:
raise TypeError("At least one data element has type "
+ type(initial_weights).__name__ + " which is not a vector")
return initial_weights
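
(Note on the "- 1" above: the training rows handed to these algorithms appear to carry the label as their first element, with the features after it, so the initial weight vector is one element shorter than a full row: shape[0] - 1 for dense rows, size - 1 for sparse ones.)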

# train_func should take two parameters, namely data and initial_weights, and
@@ -327,6 +349,8 @@ def _squared_distance(v1, v2):
    >>> _squared_distance(sparse1, sparse2)
    2.0
    """
+    v1 = _convert_vector(v1)
+    v2 = _convert_vector(v2)
    if type(v1) == ndarray and type(v2) == ndarray:
        diff = v1 - v2
        return diff.dot(diff)
@@ -335,6 +359,41 @@ def _squared_distance(v1, v2):
    else:
        return v1.squared_distance(v2)

def _convert_vector(vec):
    """
    Convert a vector to a format we support internally. This does
    the following:
    * For dense NumPy vectors (ndarray), returns them as is
    * For our SparseVector class, returns that as is
    * For scipy.sparse.*_matrix column vectors, converts them to
      our own SparseVector type.
    This should be called before passing any data to our algorithms
    or attempting to serialize it to Java.
    """
    if type(vec) == ndarray or type(vec) == SparseVector:
        return vec
    elif _have_scipy:
        if _scipy_issparse(vec):
            assert vec.shape[1] == 1, "Expected column vector"
            csc = vec.tocsc()
            return SparseVector(vec.shape[0], csc.indices, csc.data)
    raise TypeError("Expected NumPy array, SparseVector, or scipy.sparse matrix")
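
A quick usage sketch (illustrative, assuming SciPy is installed; lil_matrix is just one sparse type that is convenient to fill by hand):

from scipy.sparse import lil_matrix

col = lil_matrix((4, 1))      # a 4 x 1 sparse column vector
col[1, 0] = 1.0
col[3, 0] = 5.5
sv = _convert_vector(col)     # a SparseVector of size 4 with entries {1: 1.0, 3: 5.5}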

def _dot(vec, target):
    """
    Compute the dot product of a vector of the types we support
    (Numpy dense, SparseVector, or SciPy sparse) and a target NumPy
    array that is either 1- or 2-dimensional. Equivalent to calling
    numpy.dot of the two vectors, but for SciPy ones, we have to
    transpose them because they're column vectors.
    """
    if type(vec) == ndarray or type(vec) == SparseVector:
        return vec.dot(target)
    else:
        return vec.transpose().dot(target)[0]
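
A sketch of why the transpose branch exists (illustrative, assuming SciPy): a SciPy column vector has shape (n, 1), so it is flipped to (1, n) before multiplying against the 1-D target:

import numpy
from scipy.sparse import csc_matrix

w = numpy.array([0.5, 0.25, 0.125])
col = csc_matrix(numpy.array([[1.0], [0.0], [8.0]]))   # shape (3, 1)
_dot(col, w)   # col.transpose().dot(w)[0] == 1.0 * 0.5 + 8.0 * 0.125 == 1.5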

def _test():
    import doctest
    globs = globals().copy()
python/pyspark/mllib/classification.py: 8 changes (4 additions & 4 deletions)
@@ -20,7 +20,7 @@
from numpy import array, shape
from pyspark import SparkContext
from pyspark.mllib._common import \
-    _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
+    _dot, _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
    _serialize_double_matrix, _deserialize_double_matrix, \
    _serialize_double_vector, _deserialize_double_vector, \
    _get_initial_weights, _serialize_rating, _regression_train_wrapper, \
@@ -55,7 +55,7 @@ class LogisticRegressionModel(LinearModel):
"""
def predict(self, x):
_linear_predictor_typecheck(x, self._coeff)
margin = x.dot(self._coeff) + self._intercept
margin = _dot(x, self._coeff) + self._intercept
prob = 1/(1 + exp(-margin))
return 1 if prob > 0.5 else 0
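
With _dot in place, predict accepts any supported vector type. A hypothetical check (the model here is constructed by hand purely for illustration; in practice it comes from LogisticRegressionWithSGD.train):

import numpy
from scipy.sparse import csc_matrix

model = LogisticRegressionModel(numpy.array([1.0, -1.0]), 0.0)
model.predict(numpy.array([2.0, 1.0]))                  # dense input: margin = 1.0, predicts 1
model.predict(csc_matrix(numpy.array([[2.0], [1.0]])))  # sparse column vector: also predicts 1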

@@ -91,7 +91,7 @@ class SVMModel(LinearModel):
"""
def predict(self, x):
_linear_predictor_typecheck(x, self._coeff)
margin = x.dot(self._coeff) + self._intercept
margin = _dot(x, self._coeff) + self._intercept
return 1 if margin >= 0 else 0

class SVMWithSGD(object):
@@ -138,7 +138,7 @@ def __init__(self, labels, pi, theta):

    def predict(self, x):
        """Return the most likely class for a data vector x"""
-        return self.labels[numpy.argmax(self.pi + x.dot(self.theta))]
+        return self.labels[numpy.argmax(self.pi + _dot(x, self.theta))]
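
A note on what this computes: pi holds the classes' log prior probabilities and theta their log conditional feature probabilities, so pi + _dot(x, theta) is a vector of unnormalized log posteriors, one entry per class, and argmax picks the most likely label. Routing through _dot means x may be a dense NumPy array, a SparseVector, or a SciPy sparse column vector.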

class NaiveBayes(object):
    @classmethod
python/pyspark/mllib/clustering.py: 2 changes (1 addition & 1 deletion)
@@ -57,7 +57,7 @@ def __init__(self, centers_):
    def predict(self, x):
        """Find the cluster to which x belongs in this model."""
        best = 0
-        best_distance = 1e75
+        best_distance = float("inf")
        for i in range(0, self.centers.shape[0]):
            distance = _squared_distance(x, self.centers[i])
            if distance < best_distance:
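
Side note: float("inf") is the natural starting sentinel here, since any real squared distance compares below it and the first center examined always becomes the initial best; 1e75 was merely an arbitrary large finite stand-in.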
python/pyspark/mllib/linalg.py: 21 changes (13 additions & 8 deletions)
@@ -16,7 +16,7 @@
#

"""
-MLlib utilities for working with vectors. For dense vectors, MLlib
+MLlib utilities for linear algebra. For dense vectors, MLlib
uses the NumPy C{array} type, so you can simply pass NumPy arrays
around. For sparse vectors, users can construct a L{SparseVector}
object from MLlib or pass SciPy C{scipy.sparse} column vectors if
@@ -29,7 +29,7 @@
class SparseVector(object):
    """
    A simple sparse vector class for passing data to MLlib. Users may
-    alternatively pass use SciPy's {scipy.sparse} data types.
+    alternatively pass SciPy's {scipy.sparse} data types.
    """

    def __init__(self, size, *args):
@@ -40,7 +40,7 @@ def __init__(self, size, *args):
        @param size: Size of the vector.
        @param args: Non-zero entries, as a dictionary, list of tuples,
                     or two sorted lists containing indices and values.

        >>> print SparseVector(4, {1: 1.0, 3: 5.5})
        [1: 1.0, 3: 5.5]
@@ -115,7 +115,7 @@ def squared_distance(self, other):
        >>> a.squared_distance(a)
        0.0
        >>> a.squared_distance(array([1., 2., 3., 4.]))
-        1.0
+        11.0
        >>> b = SparseVector(4, [2, 4], [1.0, 2.0])
        >>> a.squared_distance(b)
        30.0
@@ -125,9 +125,14 @@ def squared_distance(self, other):
        if type(other) == ndarray:
            if other.ndim == 1:
                result = 0.0
-                for i in xrange(len(self.indices)):
-                    diff = self.values[i] - other[self.indices[i]]
-                    result += diff * diff
+                j = 0   # index into our own array
+                for i in xrange(other.shape[0]):
+                    if j < len(self.indices) and self.indices[j] == i:
+                        diff = self.values[j] - other[i]
+                        result += diff * diff
+                        j += 1
+                    else:
+                        result += other[i] * other[i]
                return result
            else:
                raise Exception("Cannot call squared_distance with %d-dimensional array" %
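
Why the expected doctest value above changed from 1.0 to 11.0: the old loop iterated only over the sparse vector's own indices, so dense entries at positions the sparse vector lacked were silently ignored; the corrected merge walks every dense position. A worked check (assuming a = SparseVector(4, [1, 3], [3.0, 4.0]), which is consistent with the 0.0 and 30.0 doctest results above):

from numpy import array

a = SparseVector(4, [1, 3], [3.0, 4.0])       # dense form: [0., 3., 0., 4.]
a.squared_distance(array([1., 2., 3., 4.]))
# (0-1)**2 + (3-2)**2 + (0-3)**2 + (4-4)**2 = 1 + 1 + 9 + 0 = 11.0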
@@ -191,7 +196,7 @@ def __ne__(self, other):

class Vectors(object):
    """
-    Factory methods to create MLlib vectors. Note that dense vectors
+    Factory methods for working with vectors. Note that dense vectors
    are simply represented as NumPy array objects, so there is no need
    to convert them for use in MLlib. For sparse vectors, the factory
    methods in this class create an MLlib-compatible type, or users
python/pyspark/mllib/regression.py: 4 changes (2 additions & 2 deletions)
@@ -18,7 +18,7 @@
from numpy import array, dot
from pyspark import SparkContext
from pyspark.mllib._common import \
-    _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
+    _dot, _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
    _serialize_double_matrix, _deserialize_double_matrix, \
    _serialize_double_vector, _deserialize_double_vector, \
    _get_initial_weights, _serialize_rating, _regression_train_wrapper, \
@@ -44,7 +44,7 @@ def predict(self, x):
"""Predict the value of the dependent variable given a vector x"""
"""containing values for the independent variables."""
_linear_predictor_typecheck(x, self._coeff)
return x.dot(self._coeff) + self._intercept
return _dot(x, self._coeff) + self._intercept

class LinearRegressionModel(LinearRegressionModelBase):
    """A linear regression model derived from a least-squares fit.
