diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index b884d62044c38..e8d11870fdc02 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -23,7 +23,7 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.mllib.classification._
 import org.apache.spark.mllib.clustering._
-import org.apache.spark.mllib.linalg.Vectors
+import org.apache.spark.mllib.linalg.{SparseVector, Vector, Vectors}
 import org.apache.spark.mllib.recommendation._
 import org.apache.spark.mllib.regression._
 import org.apache.spark.rdd.RDD
@@ -36,43 +36,92 @@ import org.apache.spark.rdd.RDD
  */
 @DeveloperApi
 class PythonMLLibAPI extends Serializable {
-  private val DENSE_VECTOR_MAGIC = 1
-  private val SPARSE_VECTOR_MAGIC = 2
-  private val DENSE_MATRIX_MAGIC = 3
-
-  private def deserializeDoubleVector(bytes: Array[Byte]): Array[Double] = {
-    val packetLength = bytes.length
-    if (packetLength < 5) {
-      throw new IllegalArgumentException("Byte array too short.")
+  private val DENSE_VECTOR_MAGIC: Byte = 1
+  private val SPARSE_VECTOR_MAGIC: Byte = 2
+  private val DENSE_MATRIX_MAGIC: Byte = 3
+
+  private def deserializeDoubleVector(bytes: Array[Byte]): Vector = {
+    require(bytes.length >= 5, "Byte array too short")
+    val magic = bytes(0)
+    if (magic == DENSE_VECTOR_MAGIC) {
+      deserializeDenseVector(bytes)
+    } else if (magic == SPARSE_VECTOR_MAGIC) {
+      deserializeSparseVector(bytes)
+    } else {
+      throw new IllegalArgumentException("Magic " + magic + " is wrong.")
     }
+  }
+
+  private def deserializeDenseVector(bytes: Array[Byte]): Vector = {
+    val packetLength = bytes.length
+    require(packetLength >= 5, "Byte array too short")
     val bb = ByteBuffer.wrap(bytes)
     bb.order(ByteOrder.nativeOrder())
     val magic = bb.get()
-    if (magic != DENSE_VECTOR_MAGIC) {
-      throw new IllegalArgumentException("Magic " + magic + " is wrong.")
-    }
+    require(magic == DENSE_VECTOR_MAGIC, "Invalid magic: " + magic)
     val length = bb.getInt()
-    if (packetLength != 5 + 8 * length) {
-      throw new IllegalArgumentException("Length " + length + " is wrong.")
-    }
+    require(packetLength == 5 + 8 * length, "Invalid packet length: " + packetLength)
     val db = bb.asDoubleBuffer()
     val ans = new Array[Double](length.toInt)
     db.get(ans)
-    ans
+    Vectors.dense(ans)
+  }
+
+  private def deserializeSparseVector(bytes: Array[Byte]): Vector = {
+    val packetLength = bytes.length
+    require(packetLength >= 9, "Byte array too short")
+    val bb = ByteBuffer.wrap(bytes)
+    bb.order(ByteOrder.nativeOrder())
+    val magic = bb.get()
+    require(magic == SPARSE_VECTOR_MAGIC, "Invalid magic: " + magic)
+    val size = bb.getInt()
+    val nonZeros = bb.getInt()
+    require(packetLength == 9 + 12 * nonZeros, "Invalid packet length: " + packetLength)
+    val ib = bb.asIntBuffer()
+    val indices = new Array[Int](nonZeros)
+    ib.get(indices)
+    bb.position(bb.position() + 4 * nonZeros)
+    val db = bb.asDoubleBuffer()
+    val values = new Array[Double](nonZeros)
+    db.get(values)
+    Vectors.sparse(size, indices, values)
   }
 
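# Editorial note (not part of the patch): a sketch of the byte layout the two
# deserializers above expect, written out with Python's struct module for
# reference. Field order and the magic values mirror the Scala code; '=' means
# native byte order with no padding, matching ByteOrder.nativeOrder().
import struct

# Dense:  [magic: 1 byte][length: int32][length x float64]
dense = struct.pack("=bi3d", 1, 3, 1.0, 2.0, 3.0)          # 1 == DENSE_VECTOR_MAGIC
assert len(dense) == 5 + 8 * 3

# Sparse: [magic: 1 byte][size: int32][nonzeros: int32]
#         [nonzeros x int32 indices][nonzeros x float64 values]
sparse = struct.pack("=bii2i2d", 2, 4, 2, 1, 3, 1.0, 5.5)  # 2 == SPARSE_VECTOR_MAGIC
assert len(sparse) == 9 + 12 * 2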
-  private def serializeDoubleVector(doubles: Array[Double]): Array[Byte] = {
+  private def serializeDenseVector(doubles: Array[Double]): Array[Byte] = {
     val len = doubles.length
     val bytes = new Array[Byte](5 + 8 * len)
     val bb = ByteBuffer.wrap(bytes)
     bb.order(ByteOrder.nativeOrder())
-    bb.put(1: Byte)
+    bb.put(DENSE_VECTOR_MAGIC)
     bb.putInt(len)
     val db = bb.asDoubleBuffer()
     db.put(doubles)
     bytes
   }
 
+  private def serializeSparseVector(vector: SparseVector): Array[Byte] = {
+    val nonZeros = vector.indices.length
+    val bytes = new Array[Byte](9 + 12 * nonZeros)
+    val bb = ByteBuffer.wrap(bytes)
+    bb.order(ByteOrder.nativeOrder())
+    bb.put(SPARSE_VECTOR_MAGIC)
+    bb.putInt(vector.size)
+    bb.putInt(nonZeros)
+    val ib = bb.asIntBuffer()
+    ib.put(vector.indices)
+    bb.position(bb.position() + 4 * nonZeros)
+    val db = bb.asDoubleBuffer()
+    db.put(vector.values)
+    bytes
+  }
+
+  private def serializeDoubleVector(vector: Vector): Array[Byte] = vector match {
+    case s: SparseVector =>
+      serializeSparseVector(s)
+    case _ =>
+      serializeDenseVector(vector.toArray)
+  }
+
   private def deserializeDoubleMatrix(bytes: Array[Byte]): Array[Array[Double]] = {
     val packetLength = bytes.length
     if (packetLength < 9) {
@@ -107,7 +156,7 @@ class PythonMLLibAPI extends Serializable {
     val bytes = new Array[Byte](9 + 8 * rows * cols)
     val bb = ByteBuffer.wrap(bytes)
     bb.order(ByteOrder.nativeOrder())
-    bb.put(2: Byte)
+    bb.put(DENSE_MATRIX_MAGIC)
     bb.putInt(rows)
     bb.putInt(cols)
     val db = bb.asDoubleBuffer()
@@ -118,17 +167,17 @@ class PythonMLLibAPI extends Serializable {
   }
 
   private def trainRegressionModel(
-      trainFunc: (RDD[LabeledPoint], Array[Double]) => GeneralizedLinearModel,
+      trainFunc: (RDD[LabeledPoint], Vector) => GeneralizedLinearModel,
       dataBytesJRDD: JavaRDD[Array[Byte]],
       initialWeightsBA: Array[Byte]): java.util.LinkedList[java.lang.Object] = {
     val data = dataBytesJRDD.rdd.map(xBytes => {
-      val x = deserializeDoubleVector(xBytes)
+      val x = deserializeDoubleVector(xBytes).toArray // TODO: deal with sparse vectors here!
       LabeledPoint(x(0), Vectors.dense(x.slice(1, x.length)))
     })
     val initialWeights = deserializeDoubleVector(initialWeightsBA)
     val model = trainFunc(data, initialWeights)
     val ret = new java.util.LinkedList[java.lang.Object]()
-    ret.add(serializeDoubleVector(model.weights.toArray))
+    ret.add(serializeDoubleVector(model.weights))
     ret.add(model.intercept: java.lang.Double)
     ret
   }
@@ -149,7 +198,7 @@ class PythonMLLibAPI extends Serializable {
           numIterations,
           stepSize,
           miniBatchFraction,
-          Vectors.dense(initialWeights)),
+          initialWeights),
       dataBytesJRDD,
       initialWeightsBA)
   }
@@ -172,7 +221,7 @@ class PythonMLLibAPI extends Serializable {
           stepSize,
           regParam,
           miniBatchFraction,
-          Vectors.dense(initialWeights)),
+          initialWeights),
       dataBytesJRDD,
       initialWeightsBA)
   }
@@ -195,7 +244,7 @@ class PythonMLLibAPI extends Serializable {
           stepSize,
           regParam,
           miniBatchFraction,
-          Vectors.dense(initialWeights)),
+          initialWeights),
       dataBytesJRDD,
       initialWeightsBA)
   }
@@ -218,7 +267,7 @@ class PythonMLLibAPI extends Serializable {
           stepSize,
           regParam,
           miniBatchFraction,
-          Vectors.dense(initialWeights)),
+          initialWeights),
       dataBytesJRDD,
       initialWeightsBA)
   }
@@ -239,7 +288,7 @@ class PythonMLLibAPI extends Serializable {
           numIterations,
           stepSize,
           miniBatchFraction,
-          Vectors.dense(initialWeights)),
+          initialWeights),
       dataBytesJRDD,
       initialWeightsBA)
   }
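# Editorial note (not part of the patch): the .toArray TODO above exists
# because each labeled example arrives as a single double vector whose first
# entry is the label, which is why the Scala code slices x(0) / x.slice(1,
# x.length). A minimal dense-only sketch of that convention:
from numpy import array

def _split_labeled_vector(x):
    # x[0] is the label; the remaining entries are the feature vector
    return float(x[0]), x[1:]

label, features = _split_labeled_vector(array([1.0, 0.5, -0.5]))
assert label == 1.0 and list(features) == [0.5, -0.5]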
@@ -251,13 +300,13 @@ class PythonMLLibAPI extends Serializable {
       dataBytesJRDD: JavaRDD[Array[Byte]],
       lambda: Double): java.util.List[java.lang.Object] = {
     val data = dataBytesJRDD.rdd.map(xBytes => {
-      val x = deserializeDoubleVector(xBytes)
+      val x = deserializeDoubleVector(xBytes).toArray // TODO: make this efficient for sparse vecs
      LabeledPoint(x(0), Vectors.dense(x.slice(1, x.length)))
     })
     val model = NaiveBayes.train(data, lambda)
     val ret = new java.util.LinkedList[java.lang.Object]()
-    ret.add(serializeDoubleVector(model.labels))
-    ret.add(serializeDoubleVector(model.pi))
+    ret.add(serializeDoubleVector(Vectors.dense(model.labels)))
+    ret.add(serializeDoubleVector(Vectors.dense(model.pi)))
     ret.add(serializeDoubleMatrix(model.theta))
     ret
   }
@@ -271,7 +320,7 @@ class PythonMLLibAPI extends Serializable {
       maxIterations: Int,
       runs: Int,
       initializationMode: String): java.util.List[java.lang.Object] = {
-    val data = dataBytesJRDD.rdd.map(xBytes => Vectors.dense(deserializeDoubleVector(xBytes)))
+    val data = dataBytesJRDD.rdd.map(deserializeDoubleVector)
     val model = KMeans.train(data, k, maxIterations, runs, initializationMode)
     val ret = new java.util.LinkedList[java.lang.Object]()
     ret.add(serializeDoubleMatrix(model.clusterCenters.map(_.toArray)))
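# Editorial note (not part of the patch): the intended round trip through the
# Python helpers changed below, assuming the patched pyspark modules are on
# the path -- dense vectors come back as NumPy arrays, sparse ones as
# pyspark.mllib.linalg.SparseVector. This mirrors the doctests added in the
# hunks that follow.
from numpy import array, array_equal
from pyspark.mllib._common import _serialize_double_vector, _deserialize_double_vector
from pyspark.mllib.linalg import SparseVector

d = array([1.0, 2.0, 3.0])
assert array_equal(d, _deserialize_double_vector(_serialize_double_vector(d)))

s = SparseVector(4, [1, 3], [3.0, 5.5])
assert s == _deserialize_double_vector(_serialize_double_vector(s))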
diff --git a/python/pyspark/mllib/_common.py b/python/pyspark/mllib/_common.py
index b22e9c5f43eac..52d5529c2d6d9 100644
--- a/python/pyspark/mllib/_common.py
+++ b/python/pyspark/mllib/_common.py
@@ -41,9 +41,9 @@
 SPARSE_VECTOR_MAGIC = 2
 DENSE_MATRIX_MAGIC = 3
 
-def _deserialize_numpy_array(shape, ba, offset):
+def _deserialize_numpy_array(shape, ba, offset, dtype='float64'):
     """
-    Deserialize a numpy array of float64s from a given offset in
+    Deserialize a numpy array of the given type from an offset in
     bytearray ba, assigning it the given shape.
 
     >>> x = array([1.0, 2.0, 3.0, 4.0, 5.0])
@@ -52,8 +52,11 @@ def _deserialize_numpy_array(shape, ba, offset):
     >>> x = array([1.0, 2.0, 3.0, 4.0]).reshape(2,2)
     >>> array_equal(x, _deserialize_numpy_array(x.shape, x.data, 0))
     True
+    >>> x = array([1, 2, 3], dtype='int32')
+    >>> array_equal(x, _deserialize_numpy_array(x.shape, x.data, 0, dtype='int32'))
+    True
     """
-    ar = ndarray(shape=shape, buffer=ba, offset=offset, dtype="float64", order='C')
+    ar = ndarray(shape=shape, buffer=ba, offset=offset, dtype=dtype, order='C')
     return ar.copy()
 
 def _serialize_double_vector(v):
@@ -64,9 +67,16 @@ def _serialize_double_vector(v):
     """Serialize a double vector into a mutually understood format.
 
     >>> x = array([1.0, 2.0, 3.0])
     >>> y = _deserialize_double_vector(_serialize_double_vector(x))
     >>> array_equal(y, array([1.0, 2.0, 3.0]))
     True
     """
-    if type(v) != ndarray:
+    if type(v) == ndarray:
+        return _serialize_dense_vector(v)
+    elif type(v) == SparseVector:
+        return _serialize_sparse_vector(v)
+    else:
         raise TypeError("_serialize_double_vector called on a %s; "
-                        "wanted ndarray" % type(v))
+                        "wanted ndarray or SparseVector" % type(v))
+
+def _serialize_dense_vector(v):
+    """Serialize a dense vector given as a NumPy array."""
     if v.ndim != 1:
         raise TypeError("_serialize_double_vector called on a %ddarray; "
                         "wanted a 1darray" % v.ndim)
@@ -84,12 +94,28 @@ def _serialize_double_vector(v):
     arr_mid[...] = v
     return ba
 
+def _serialize_sparse_vector(v):
+    """Serialize a pyspark.mllib.linalg.SparseVector."""
+    nonzeros = len(v.indices)
+    ba = bytearray(9 + 12 * nonzeros)
+    ba[0] = SPARSE_VECTOR_MAGIC
+    header = ndarray(shape=[2], buffer=ba, offset=1, dtype="int32")
+    header[0] = v.size
+    header[1] = nonzeros
+    copyto(ndarray(shape=[nonzeros], buffer=ba, offset=9, dtype="int32"), v.indices)
+    values_offset = 9 + 4 * nonzeros
+    copyto(ndarray(shape=[nonzeros], buffer=ba, offset=values_offset, dtype="float64"), v.values)
+    return ba
+
 def _deserialize_double_vector(ba):
     """Deserialize a double vector from a mutually understood format.
 
     >>> x = array([1.0, 2.0, 3.0, 4.0, -1.0, 0.0, -0.0])
     >>> array_equal(x, _deserialize_double_vector(_serialize_double_vector(x)))
     True
+    >>> s = SparseVector(4, [1, 3], [3.0, 5.5])
+    >>> s == _deserialize_double_vector(_serialize_double_vector(s))
+    True
     """
     if type(ba) != bytearray:
         raise TypeError("_deserialize_double_vector called on a %s; "
@@ -97,15 +123,40 @@ def _deserialize_double_vector(ba):
     if len(ba) < 5:
         raise TypeError("_deserialize_double_vector called on a %d-byte array, "
                         "which is too short" % len(ba))
-    if ba[0] != DENSE_VECTOR_MAGIC:
+    if ba[0] == DENSE_VECTOR_MAGIC:
+        return _deserialize_dense_vector(ba)
+    elif ba[0] == SPARSE_VECTOR_MAGIC:
+        return _deserialize_sparse_vector(ba)
+    else:
         raise TypeError("_deserialize_double_vector called on bytearray "
                         "with wrong magic")
+
+def _deserialize_dense_vector(ba):
+    """Deserialize a dense vector into a numpy array."""
+    if len(ba) < 5:
+        raise TypeError("_deserialize_dense_vector called on a %d-byte array, "
+                        "which is too short" % len(ba))
     length = ndarray(shape=[1], buffer=ba, offset=1, dtype="int32")[0]
-    if len(ba) != 8*length + 5:
-        raise TypeError("_deserialize_double_vector called on bytearray "
+    if len(ba) != 8 * length + 5:
+        raise TypeError("_deserialize_dense_vector called on bytearray "
                         "with wrong length")
     return _deserialize_numpy_array([length], ba, 5)
 
+def _deserialize_sparse_vector(ba):
+    """Deserialize a sparse vector into an MLlib SparseVector object."""
+    if len(ba) < 9:
+        raise TypeError("_deserialize_sparse_vector called on a %d-byte array, "
+                        "which is too short" % len(ba))
+    header = ndarray(shape=[2], buffer=ba, offset=1, dtype="int32")
+    size = header[0]
+    nonzeros = header[1]
+    if len(ba) != 9 + 12 * nonzeros:
+        raise TypeError("_deserialize_sparse_vector called on bytearray "
+                        "with wrong length")
+    indices = _deserialize_numpy_array([nonzeros], ba, 9, dtype='int32')
+    values = _deserialize_numpy_array([nonzeros], ba, 9 + 4 * nonzeros, dtype='float64')
+    return SparseVector(int(size), indices, values)
+
 def _serialize_double_matrix(m):
     """Serialize a double matrix into a mutually understood format."""
     if (type(m) == ndarray and m.ndim == 2):
@@ -152,13 +203,15 @@ def _linear_predictor_typecheck(x, coeffs):
     This is a temporary hackaround until I actually implement bulk predict."""
     if type(x) == ndarray:
         if x.ndim == 1:
-            if x.shape == coeffs.shape:
-                pass
-            else:
+            if x.shape != coeffs.shape:
                 raise RuntimeError("Got array of %d elements; wanted %d"
-                                   % (numpy.shape(x)[0], numpy.shape(coeffs)[0]))
+                                   % (numpy.shape(x)[0], coeffs.shape[0]))
         else:
             raise RuntimeError("Bulk predict not yet supported.")
+    elif type(x) == SparseVector:
+        if x.size != coeffs.shape[0]:
+            raise RuntimeError("Got sparse vector of size %d; wanted %d"
+                               % (x.size, coeffs.shape[0]))
     elif (type(x) == RDD):
         raise RuntimeError("Bulk predict not yet supported.")
     else:
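# Editorial note (not part of the patch): what the new SparseVector branch of
# _linear_predictor_typecheck above accepts -- a SparseVector carries its
# logical size, so it is compared against len(coeffs) rather than
# len(x.indices). A sketch, assuming the patched pyspark modules are on the path:
from numpy import array
from pyspark.mllib.linalg import SparseVector

coeffs = array([0.5, -0.5, 2.0])
ok = SparseVector(3, [0, 2], [1.0, 4.0])   # size 3 matches the 3 coefficients
bad = SparseVector(2, [1], [1.0])          # size 2 would raise RuntimeError
assert ok.size == coeffs.shape[0]
assert bad.size != coeffs.shape[0]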
@@ -192,20 +245,23 @@ def predict(self, x):
         """Predict the value of the dependent variable given a vector x"""
         """containing values for the independent variables."""
         _linear_predictor_typecheck(x, self._coeff)
-        return numpy.dot(self._coeff, x) + self._intercept
+        return x.dot(self._coeff) + self._intercept
 
 # If we weren't given initial weights, take a zero vector of the appropriate
 # length.
 def _get_initial_weights(initial_weights, data):
     if initial_weights is None:
         initial_weights = data.first()
-        if type(initial_weights) != ndarray:
+        if type(initial_weights) == ndarray:
+            if initial_weights.ndim != 1:
+                raise TypeError("At least one data element has "
+                                + str(initial_weights.ndim) + " dimensions, which is not 1")
+            initial_weights = numpy.ones([initial_weights.shape[0] - 1])
+        elif type(initial_weights) == SparseVector:
+            initial_weights = numpy.ones(initial_weights.size - 1)
+        else:
             raise TypeError("At least one data element has type "
-                            + type(initial_weights).__name__ + " which is not ndarray")
-        if initial_weights.ndim != 1:
-            raise TypeError("At least one data element has "
-                            + initial_weights.ndim + " dimensions, which is not 1")
-        initial_weights = numpy.ones([initial_weights.shape[0] - 1])
+                            + type(initial_weights).__name__ + " which is not a vector")
     return initial_weights
 
 # train_func should take two parameters, namely data and initial_weights, and
diff --git a/python/pyspark/mllib/classification.py b/python/pyspark/mllib/classification.py
index d2f9cdb3f4298..a8f1e07ea0028 100644
--- a/python/pyspark/mllib/classification.py
+++ b/python/pyspark/mllib/classification.py
@@ -17,7 +17,7 @@
 
 import numpy
 
-from numpy import array, dot, shape
+from numpy import array, shape
 from pyspark import SparkContext
 from pyspark.mllib._common import \
     _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
@@ -25,6 +25,7 @@
     _serialize_double_vector, _deserialize_double_vector, \
     _get_initial_weights, _serialize_rating, _regression_train_wrapper, \
     LinearModel, _linear_predictor_typecheck
+from pyspark.mllib.linalg import SparseVector
 from math import exp, log
 
 class LogisticRegressionModel(LinearModel):
@@ -34,10 +35,25 @@ class LogisticRegressionModel(LinearModel):
     >>> lrm = LogisticRegressionWithSGD.train(sc.parallelize(data))
     >>> lrm.predict(array([1.0])) > 0
     True
+    >>> lrm.predict(array([0.0])) <= 0
+    True
+    >>> sparse_data = [
+    ...     SparseVector(3, [0, 1], [0.0, 0.0]),
+    ...     SparseVector(3, [0, 2], [1.0, 1.0]),
+    ...     SparseVector(3, [0, 1], [0.0, 0.0]),
+    ...     SparseVector(3, [0, 2], [1.0, 2.0])
+    ... ]
+    >>> lrm = LogisticRegressionWithSGD.train(sc.parallelize(sparse_data))
+    >>> lrm.predict(array([0.0, 1.0])) > 0
+    True
+    >>> lrm.predict(array([0.0, 0.0])) <= 0
+    True
+    >>> lrm.predict(SparseVector(2, [1], [1.0])) > 0
+    True
     """
     def predict(self, x):
         _linear_predictor_typecheck(x, self._coeff)
-        margin = dot(x, self._coeff) + self._intercept
+        margin = x.dot(self._coeff) + self._intercept
         prob = 1/(1 + exp(-margin))
         return 1 if prob > 0.5 else 0
 
@@ -62,7 +78,7 @@ class SVMModel(LinearModel):
     """
     def predict(self, x):
         _linear_predictor_typecheck(x, self._coeff)
-        margin = dot(x, self._coeff) + self._intercept
+        margin = x.dot(self._coeff) + self._intercept
         return 1 if margin >= 0 else 0
 
 class SVMWithSGD(object):
@@ -99,7 +115,7 @@ def __init__(self, labels, pi, theta):
 
     def predict(self, x):
         """Return the most likely class for a data vector x"""
-        return self.labels[numpy.argmax(self.pi + dot(x, self.theta))]
+        return self.labels[numpy.argmax(self.pi + x.dot(self.theta))]
 
 class NaiveBayes(object):
     @classmethod
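# Editorial note (not part of the patch): why the predict methods above switch
# from numpy.dot(x, coeff) to x.dot(coeff) -- the same expression now works
# whether x is a NumPy array or a SparseVector, which defines its own dot
# (added in linalg.py below). A sketch, assuming the patched pyspark modules
# are importable:
from numpy import array
from pyspark.mllib.linalg import SparseVector

coeff, intercept = array([1.0, -2.0]), 0.5
m_dense = array([2.0, 1.0]).dot(coeff) + intercept             # 2.0 - 2.0 + 0.5
m_sparse = SparseVector(2, [0], [2.0]).dot(coeff) + intercept  # 2.0 + 0.5
assert m_dense == 0.5 and m_sparse == 2.5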
""" -from numpy import array +from numpy import array, array_equal, ndarray class SparseVector(object): @@ -35,23 +35,57 @@ class SparseVector(object): def __init__(self, size, *args): """ Create a sparse vector, using either an array of (index, value) pairs - or two separate arrays of indices and values. + or two separate arrays of indices and values (sorted by index). >>> print SparseVector(4, [(1, 1.0), (3, 5.5)]) [1: 1.0, 3: 5.5] >>> print SparseVector(4, [1, 3], [1.0, 5.5]) [1: 1.0, 3: 5.5] """ + assert type(size) == int, "first argument must be an int" self.size = size assert 1 <= len(args) <= 2, "must pass either 1 or 2 arguments" if len(args) == 1: - pairs = args[0] + pairs = sorted(args[0]) self.indices = array([p[0] for p in pairs], dtype='int32') self.values = array([p[1] for p in pairs], dtype='float64') else: assert len(args[0]) == len(args[1]), "index and value arrays not same length" self.indices = array(args[0], dtype='int32') self.values = array(args[1], dtype='float64') + for i in xrange(len(self.indices) - 1): + if self.indices[i] >= self.indices[i + 1]: + raise TypeError("indices array must be sorted") + + def dot(self, other): + """ + Dot product with another SparseVector or NumPy array. + + >>> a = SparseVector(4, [1, 3], [3.0, 4.0]) + >>> a.dot(a) + 25.0 + >>> a.dot(array([1., 2., 3., 4.])) + 22.0 + >>> b = SparseVector(4, [2, 4], [1.0, 2.0]) + >>> a.dot(b) + 0.0 + """ + result = 0.0 + if type(other) == ndarray: + for i in xrange(len(self.indices)): + result += self.values[i] * other[self.indices[i]] + else: + i, j = 0, 0 + while i < len(self.indices) and j < len(other.indices): + if self.indices[i] == other.indices[j]: + result += self.values[i] * other.values[j] + i += 1 + j += 1 + elif self.indices[i] < other.indices[j]: + i += 1 + else: + j += 1 + return result def __str__(self): inds = self.indices @@ -65,6 +99,26 @@ def __repr__(self): entries = ", ".join(["({0}, {1})".format(inds[i], vals[i]) for i in xrange(len(inds))]) return "SparseVector({0}, [{1}])".format(self.size, entries) + def __eq__(self, other): + """ + Test SparseVectors for equality. + + >>> v1 = SparseVector(4, [(1, 1.0), (3, 5.5)]) + >>> v2 = SparseVector(4, [(1, 1.0), (3, 5.5)]) + >>> v1 == v2 + True + >>> v1 != v2 + False + """ + + return (isinstance(other, self.__class__) + and other.size == self.size + and array_equal(other.indices, self.indices) + and array_equal(other.values, self.values)) + + def __ne__(self, other): + return not self.__eq__(other) + class Vectors(object): @@ -80,7 +134,7 @@ class Vectors(object): def sparse(size, *args): """ Create a sparse vector, using either an array of (index, value) pairs - or two separate arrays of indices and values. + or two separate arrays of indices and values (sorted by index). >>> print Vectors.sparse(4, [(1, 1.0), (3, 5.5)]) [1: 1.0, 3: 5.5]