Further work to get linear models working with sparse data
mateiz committed Apr 15, 2014
1 parent 154f45d commit 2abbb44
Showing 4 changed files with 233 additions and 58 deletions.
mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -23,7 +23,7 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.mllib.classification._
import org.apache.spark.mllib.clustering._
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.{SparseVector, Vector, Vectors}
import org.apache.spark.mllib.recommendation._
import org.apache.spark.mllib.regression._
import org.apache.spark.rdd.RDD
@@ -36,43 +36,92 @@ import org.apache.spark.rdd.RDD
*/
@DeveloperApi
class PythonMLLibAPI extends Serializable {
private val DENSE_VECTOR_MAGIC = 1
private val SPARSE_VECTOR_MAGIC = 2
private val DENSE_MATRIX_MAGIC = 3

private def deserializeDoubleVector(bytes: Array[Byte]): Array[Double] = {
val packetLength = bytes.length
if (packetLength < 5) {
throw new IllegalArgumentException("Byte array too short.")
private val DENSE_VECTOR_MAGIC: Byte = 1
private val SPARSE_VECTOR_MAGIC: Byte = 2
private val DENSE_MATRIX_MAGIC: Byte = 3

private def deserializeDoubleVector(bytes: Array[Byte]): Vector = {
require(bytes.length >= 5, "Byte array too short")
val magic = bytes(0)
if (magic == DENSE_VECTOR_MAGIC) {
deserializeDenseVector(bytes)
} else if (magic == SPARSE_VECTOR_MAGIC) {
deserializeSparseVector(bytes)
} else {
throw new IllegalArgumentException("Magic " + magic + " is wrong.")
}
}

private def deserializeDenseVector(bytes: Array[Byte]): Vector = {
val packetLength = bytes.length
require(packetLength >= 5, "Byte array too short")
val bb = ByteBuffer.wrap(bytes)
bb.order(ByteOrder.nativeOrder())
val magic = bb.get()
if (magic != DENSE_VECTOR_MAGIC) {
throw new IllegalArgumentException("Magic " + magic + " is wrong.")
}
require(magic == DENSE_VECTOR_MAGIC, "Invalid magic: " + magic)
val length = bb.getInt()
if (packetLength != 5 + 8 * length) {
throw new IllegalArgumentException("Length " + length + " is wrong.")
}
require(packetLength == 5 + 8 * length, "Invalid packet length: " + packetLength)
val db = bb.asDoubleBuffer()
val ans = new Array[Double](length.toInt)
db.get(ans)
ans
Vectors.dense(ans)
}

private def deserializeSparseVector(bytes: Array[Byte]): Vector = {
val packetLength = bytes.length
require(packetLength >= 9, "Byte array too short")
val bb = ByteBuffer.wrap(bytes)
bb.order(ByteOrder.nativeOrder())
val magic = bb.get()
require(magic == SPARSE_VECTOR_MAGIC, "Invalid magic: " + magic)
val size = bb.getInt()
val nonZeros = bb.getInt()
require(packetLength == 9 + 12 * nonZeros, "Invalid packet length: " + packetLength)
val ib = bb.asIntBuffer()
val indices = new Array[Int](nonZeros)
ib.get(indices)
bb.position(bb.position() + 4 * nonZeros)
val db = bb.asDoubleBuffer()
val values = new Array[Double](nonZeros)
db.get(values)
Vectors.sparse(size, indices, values)
}

private def serializeDoubleVector(doubles: Array[Double]): Array[Byte] = {
private def serializeDenseVector(doubles: Array[Double]): Array[Byte] = {
val len = doubles.length
val bytes = new Array[Byte](5 + 8 * len)
val bb = ByteBuffer.wrap(bytes)
bb.order(ByteOrder.nativeOrder())
bb.put(1: Byte)
bb.put(DENSE_VECTOR_MAGIC)
bb.putInt(len)
val db = bb.asDoubleBuffer()
db.put(doubles)
bytes
}

private def serializeSparseVector(vector: SparseVector): Array[Byte] = {
val nonZeros = vector.indices.length
val bytes = new Array[Byte](9 + 12 * nonZeros)
val bb = ByteBuffer.wrap(bytes)
bb.order(ByteOrder.nativeOrder())
bb.put(SPARSE_VECTOR_MAGIC)
bb.putInt(vector.size)
bb.putInt(nonZeros)
val ib = bb.asIntBuffer()
ib.put(vector.indices)
bb.position(bb.position() + 4 * nonZeros)
val db = bb.asDoubleBuffer()
db.put(vector.values)
bytes
}

private def serializeDoubleVector(vector: Vector): Array[Byte] = vector match {
case s: SparseVector =>
serializeSparseVector(s)
case _ =>
serializeDenseVector(vector.toArray)
}

private def deserializeDoubleMatrix(bytes: Array[Byte]): Array[Array[Double]] = {
val packetLength = bytes.length
if (packetLength < 9) {
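
For reference, the vector wire format both sides now agree on: a dense vector is a magic byte (1), an int32 length, then the float64 values; a sparse vector is a magic byte (2), an int32 size, an int32 nonzero count, the int32 indices, then the float64 values, everything in native byte order. A minimal sketch of the sparse layout using Python's struct module, where '=' stands in for ByteOrder.nativeOrder():

import struct

# Sparse vector [0.0, 3.0, 0.0, 5.5]: magic 2, size 4, nnz 2 -> 9 + 12*2 = 33 bytes.
buf = struct.pack('=bii2i2d', 2, 4, 2, 1, 3, 3.0, 5.5)
magic, size, nnz = struct.unpack_from('=bii', buf, 0)
indices = struct.unpack_from('=%di' % nnz, buf, 9)           # int32 indices
values = struct.unpack_from('=%dd' % nnz, buf, 9 + 4 * nnz)  # float64 values
assert (magic, size, nnz) == (2, 4, 2)
assert indices == (1, 3) and values == (3.0, 5.5)
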
@@ -107,7 +156,7 @@ class PythonMLLibAPI extends Serializable {
val bytes = new Array[Byte](9 + 8 * rows * cols)
val bb = ByteBuffer.wrap(bytes)
bb.order(ByteOrder.nativeOrder())
bb.put(2: Byte)
bb.put(DENSE_MATRIX_MAGIC)
bb.putInt(rows)
bb.putInt(cols)
val db = bb.asDoubleBuffer()
@@ -118,17 +167,17 @@ }
}

private def trainRegressionModel(
trainFunc: (RDD[LabeledPoint], Array[Double]) => GeneralizedLinearModel,
trainFunc: (RDD[LabeledPoint], Vector) => GeneralizedLinearModel,
dataBytesJRDD: JavaRDD[Array[Byte]],
initialWeightsBA: Array[Byte]): java.util.LinkedList[java.lang.Object] = {
val data = dataBytesJRDD.rdd.map(xBytes => {
val x = deserializeDoubleVector(xBytes)
val x = deserializeDoubleVector(xBytes).toArray // TODO: deal with sparse vectors here!
LabeledPoint(x(0), Vectors.dense(x.slice(1, x.length)))
})
val initialWeights = deserializeDoubleVector(initialWeightsBA)
val model = trainFunc(data, initialWeights)
val ret = new java.util.LinkedList[java.lang.Object]()
ret.add(serializeDoubleVector(model.weights.toArray))
ret.add(serializeDoubleVector(model.weights))
ret.add(model.intercept: java.lang.Double)
ret
}
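
Note the row convention trainRegressionModel depends on: each training example arrives as a single dense vector with the label in slot 0 and the features after it, which is why the Scala code splits x(0) from x.slice(1, x.length). A sketch of the Python half of that convention (the helper name is hypothetical):

import numpy

def _pack_labeled_point(label, features):
    # Hypothetical helper: prepend the label, then ship the row as one
    # dense vector in the format trainRegressionModel expects.
    return _serialize_double_vector(numpy.append(label, features))
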
@@ -149,7 +198,7 @@ class PythonMLLibAPI extends Serializable {
numIterations,
stepSize,
miniBatchFraction,
Vectors.dense(initialWeights)),
initialWeights),
dataBytesJRDD,
initialWeightsBA)
}
@@ -172,7 +221,7 @@ class PythonMLLibAPI extends Serializable {
stepSize,
regParam,
miniBatchFraction,
Vectors.dense(initialWeights)),
initialWeights),
dataBytesJRDD,
initialWeightsBA)
}
@@ -195,7 +244,7 @@ class PythonMLLibAPI extends Serializable {
stepSize,
regParam,
miniBatchFraction,
Vectors.dense(initialWeights)),
initialWeights),
dataBytesJRDD,
initialWeightsBA)
}
@@ -218,7 +267,7 @@ class PythonMLLibAPI extends Serializable {
stepSize,
regParam,
miniBatchFraction,
Vectors.dense(initialWeights)),
initialWeights),
dataBytesJRDD,
initialWeightsBA)
}
@@ -239,7 +288,7 @@ class PythonMLLibAPI extends Serializable {
numIterations,
stepSize,
miniBatchFraction,
Vectors.dense(initialWeights)),
initialWeights),
dataBytesJRDD,
initialWeightsBA)
}
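
All five trainers above now pass the deserialized initial-weights Vector straight through instead of re-wrapping a double array with Vectors.dense. A hypothetical end-to-end call from Python over the Py4J gateway (the real wrapper lives in pyspark.mllib; argument order follows the Scala signature above, and sc, data_bytes, and initial_weights are assumed to exist):

api = sc._jvm.PythonMLLibAPI()
ans = api.trainLinearRegressionModelWithSGD(
    data_bytes._jrdd, numIterations, stepSize, miniBatchFraction,
    _serialize_double_vector(initial_weights))
# Py4J maps the returned byte[] to a bytearray, so it can be fed
# straight back into the deserializer.
weights = _deserialize_double_vector(ans[0])
intercept = ans[1]  # boxed Double
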
@@ -251,13 +300,13 @@ class PythonMLLibAPI extends Serializable {
dataBytesJRDD: JavaRDD[Array[Byte]],
lambda: Double): java.util.List[java.lang.Object] = {
val data = dataBytesJRDD.rdd.map(xBytes => {
val x = deserializeDoubleVector(xBytes)
val x = deserializeDoubleVector(xBytes).toArray // TODO: make this efficient for sparse vecs
LabeledPoint(x(0), Vectors.dense(x.slice(1, x.length)))
})
val model = NaiveBayes.train(data, lambda)
val ret = new java.util.LinkedList[java.lang.Object]()
ret.add(serializeDoubleVector(model.labels))
ret.add(serializeDoubleVector(model.pi))
ret.add(serializeDoubleVector(Vectors.dense(model.labels)))
ret.add(serializeDoubleVector(Vectors.dense(model.pi)))
ret.add(serializeDoubleMatrix(model.theta))
ret
}
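
trainNaiveBayes returns a three-element list: the class labels and the log class priors (pi) as serialized vectors, and theta, the matrix of log conditional probabilities, in the dense matrix format. A sketch of the unpacking on the Python side (variable names assumed):

labels = _deserialize_double_vector(ans[0])
pi = _deserialize_double_vector(ans[1])      # log class priors
theta = _deserialize_double_matrix(ans[2])   # log conditional probabilities
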
@@ -271,7 +320,7 @@ class PythonMLLibAPI extends Serializable {
maxIterations: Int,
runs: Int,
initializationMode: String): java.util.List[java.lang.Object] = {
val data = dataBytesJRDD.rdd.map(xBytes => Vectors.dense(deserializeDoubleVector(xBytes)))
val data = dataBytesJRDD.rdd.map(deserializeDoubleVector)
val model = KMeans.train(data, k, maxIterations, runs, initializationMode)
val ret = new java.util.LinkedList[java.lang.Object]()
ret.add(serializeDoubleMatrix(model.clusterCenters.map(_.toArray)))
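
Because trainKMeansModel now maps deserializeDoubleVector directly over the incoming byte arrays, sparse points reach the Scala trainer without being densified. A usage sketch (assuming a live SparkContext sc):

from pyspark.mllib.clustering import KMeans
from pyspark.mllib.linalg import SparseVector

# Two clusters over 3-dimensional points with mostly-zero entries.
points = sc.parallelize([
    SparseVector(3, [0], [1.0]),
    SparseVector(3, [0], [1.1]),
    SparseVector(3, [2], [4.0]),
])
model = KMeans.train(points, 2, maxIterations=10)
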
94 changes: 75 additions & 19 deletions python/pyspark/mllib/_common.py
@@ -41,9 +41,9 @@
SPARSE_VECTOR_MAGIC = 2
DENSE_MATRIX_MAGIC = 3

def _deserialize_numpy_array(shape, ba, offset):
def _deserialize_numpy_array(shape, ba, offset, dtype='float64'):
"""
Deserialize a numpy array of float64s from a given offset in
Deserialize a numpy array of the given type from an offset in
bytearray ba, assigning it the given shape.
>>> x = array([1.0, 2.0, 3.0, 4.0, 5.0])
@@ -52,8 +52,11 @@ def _deserialize_numpy_array(shape, ba, offset):
>>> x = array([1.0, 2.0, 3.0, 4.0]).reshape(2,2)
>>> array_equal(x, _deserialize_numpy_array(x.shape, x.data, 0))
True
>>> x = array([1, 2, 3], dtype='int32')
>>> array_equal(x, _deserialize_numpy_array(x.shape, x.data, 0, dtype='int32'))
True
"""
ar = ndarray(shape=shape, buffer=ba, offset=offset, dtype="float64", order='C')
ar = ndarray(shape=shape, buffer=ba, offset=offset, dtype=dtype, order='C')
return ar.copy()

def _serialize_double_vector(v):
@@ -64,9 +67,16 @@ def _serialize_double_vector(v):
>>> array_equal(y, array([1.0, 2.0, 3.0]))
True
"""
if type(v) != ndarray:
if type(v) == ndarray:
return _serialize_dense_vector(v)
elif type(v) == SparseVector:
return _serialize_sparse_vector(v)
else:
raise TypeError("_serialize_double_vector called on a %s; "
"wanted ndarray" % type(v))
"wanted ndarray or SparseVector" % type(v))

def _serialize_dense_vector(v):
"""Serialize a dense vector given as a NumPy array."""
if v.ndim != 1:
raise TypeError("_serialize_double_vector called on a %ddarray; "
"wanted a 1darray" % v.ndim)
@@ -84,28 +94,69 @@ def _serialize_double_vector(v):
arr_mid[...] = v
return ba

def _serialize_sparse_vector(v):
"""Serialize a pyspark.mllib.linalg.SparseVector."""
nonzeros = len(v.indices)
ba = bytearray(9 + 12 * nonzeros)
ba[0] = SPARSE_VECTOR_MAGIC
header = ndarray(shape=[2], buffer=ba, offset=1, dtype="int32")
header[0] = v.size
header[1] = nonzeros
copyto(ndarray(shape=[nonzeros], buffer=ba, offset=9, dtype="int32"), v.indices)
values_offset = 9 + 4 * nonzeros
copyto(ndarray(shape=[nonzeros], buffer=ba, offset=values_offset, dtype="float64"), v.values)
return ba

def _deserialize_double_vector(ba):
"""Deserialize a double vector from a mutually understood format.
>>> x = array([1.0, 2.0, 3.0, 4.0, -1.0, 0.0, -0.0])
>>> array_equal(x, _deserialize_double_vector(_serialize_double_vector(x)))
True
>>> s = SparseVector(4, [1, 3], [3.0, 5.5])
>>> s == _deserialize_double_vector(_serialize_double_vector(s))
True
"""
if type(ba) != bytearray:
raise TypeError("_deserialize_double_vector called on a %s; "
"wanted bytearray" % type(ba))
if len(ba) < 5:
raise TypeError("_deserialize_double_vector called on a %d-byte array, "
"which is too short" % len(ba))
if ba[0] != DENSE_VECTOR_MAGIC:
if ba[0] == DENSE_VECTOR_MAGIC:
return _deserialize_dense_vector(ba)
elif ba[0] == SPARSE_VECTOR_MAGIC:
return _deserialize_sparse_vector(ba)
else:
raise TypeError("_deserialize_double_vector called on bytearray "
"with wrong magic")

def _deserialize_dense_vector(ba):
"""Deserialize a dense vector into a numpy array."""
if len(ba) < 5:
raise TypeError("_deserialize_dense_vector called on a %d-byte array, "
"which is too short" % len(ba))
length = ndarray(shape=[1], buffer=ba, offset=1, dtype="int32")[0]
if len(ba) != 8*length + 5:
raise TypeError("_deserialize_double_vector called on bytearray "
if len(ba) != 8 * length + 5:
raise TypeError("_deserialize_dense_vector called on bytearray "
"with wrong length")
return _deserialize_numpy_array([length], ba, 5)

def _deserialize_sparse_vector(ba):
"""Deserialize a sparse vector into a MLlib SparseVector object."""
if len(ba) < 9:
raise TypeError("_deserialize_sparse_vector called on a %d-byte array, "
"which is too short" % len(ba))
header = ndarray(shape=[2], buffer=ba, offset=1, dtype="int32")
size = header[0]
nonzeros = header[1]
if len(ba) != 9 + 12 * nonzeros:
raise TypeError("_deserialize_sparse_vector called on bytearray "
"with wrong length")
indices = _deserialize_numpy_array([nonzeros], ba, 9, dtype='int32')
values = _deserialize_numpy_array([nonzeros], ba, 9 + 4 * nonzeros, dtype='float64')
return SparseVector(int(size), indices, values)
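# Round-trip sanity check for the sparse codec (a sketch; the doctest on
# _deserialize_double_vector above exercises the same path through the
# dispatching functions):
#   s = SparseVector(4, [1, 3], [3.0, 5.5])
#   assert _deserialize_sparse_vector(_serialize_sparse_vector(s)) == s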

def _serialize_double_matrix(m):
"""Serialize a double matrix into a mutually understood format."""
if (type(m) == ndarray and m.ndim == 2):
@@ -152,13 +203,15 @@ def _linear_predictor_typecheck(x, coeffs):
This is a temporary hackaround until I actually implement bulk predict."""
if type(x) == ndarray:
if x.ndim == 1:
if x.shape == coeffs.shape:
pass
else:
if x.shape != coeffs.shape:
raise RuntimeError("Got array of %d elements; wanted %d"
% (numpy.shape(x)[0], numpy.shape(coeffs)[0]))
% (numpy.shape(x)[0], coeffs.shape[0]))
else:
raise RuntimeError("Bulk predict not yet supported.")
elif type(x) == SparseVector:
if x.size != coeffs.shape[0]:
raise RuntimeError("Got sparse vector of size %d; wanted %d"
% (x.size, coeffs.shape[0]))
elif (type(x) == RDD):
raise RuntimeError("Bulk predict not yet supported.")
else:
@@ -192,20 +245,23 @@ def predict(self, x):
"""Predict the value of the dependent variable given a vector x"""
"""containing values for the independent variables."""
_linear_predictor_typecheck(x, self._coeff)
return numpy.dot(self._coeff, x) + self._intercept
return x.dot(self._coeff) + self._intercept
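# Both input types work above because ndarray and SparseVector each
# implement dot() against a dense coefficient vector. Sketch (the
# enclosing class name LinearModel is assumed from context):
#   m = LinearModel(numpy.array([2.0, 1.0]), 0.5)
#   m.predict(numpy.array([1.0, 3.0]))      # 2.0*1.0 + 1.0*3.0 + 0.5 = 5.5
#   m.predict(SparseVector(2, [0], [1.0]))  # 2.0*1.0 + 0.5 = 2.5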

# If we weren't given initial weights, take a vector of ones of the
# appropriate length, based on the first data element.
def _get_initial_weights(initial_weights, data):
if initial_weights is None:
initial_weights = data.first()
if type(initial_weights) != ndarray:
if type(initial_weights) == ndarray:
if initial_weights.ndim != 1:
raise TypeError("At least one data element has "
+ str(initial_weights.ndim) + " dimensions, which is not 1")
initial_weights = numpy.ones([initial_weights.shape[0] - 1])
elif type(initial_weights) == SparseVector:
initial_weights = numpy.ones(initial_weights.size - 1)
else:
raise TypeError("At least one data element has type "
+ type(initial_weights).__name__ + " which is not ndarray")
if initial_weights.ndim != 1:
raise TypeError("At least one data element has "
+ initial_weights.ndim + " dimensions, which is not 1")
initial_weights = numpy.ones([initial_weights.shape[0] - 1])
+ type(initial_weights).__name__ + " which is not a vector")
return initial_weights

# train_func should take two parameters, namely data and initial_weights, and
