Skip to content

Commit

Permalink
[SPARK-5021] Gaussian Mixture now supports Sparse Input
Browse files Browse the repository at this point in the history
  • Loading branch information
MechCoder committed Feb 10, 2015
1 parent 855d12a commit e180f4c
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ package org.apache.spark.mllib.clustering

import scala.collection.mutable.IndexedSeq

import breeze.linalg.{DenseMatrix => BreezeMatrix, DenseVector => BreezeVector, Transpose, diag}
import breeze.linalg.{DenseVector => BDV, Vector => BV, SparseVector => BSV,
DenseMatrix => BreezeMatrix, diag, Transpose}

import org.apache.spark.annotation.Experimental
import org.apache.spark.mllib.linalg.{BLAS, DenseMatrix, DenseVector, Matrices, Vector, Vectors}
import org.apache.spark.mllib.linalg.{Matrices, Vector, Vectors, DenseVector,
SparseVector, DenseMatrix, BLAS}
import org.apache.spark.mllib.stat.distribution.MultivariateGaussian
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD
Expand Down Expand Up @@ -130,7 +132,7 @@ class GaussianMixture private (
val sc = data.sparkContext

// we will operate on the data as breeze data
val breezeData = data.map(u => u.toBreeze.toDenseVector).cache()
val breezeData = data.map(_.toBreeze).cache()

// Get length of the input vectors
val d = breezeData.first().length
Expand All @@ -148,7 +150,7 @@ class GaussianMixture private (
(Array.fill(k)(1.0 / k), Array.tabulate(k) { i =>
val slice = samples.view(i * nSamples, (i + 1) * nSamples)
new MultivariateGaussian(vectorMean(slice), initCovariance(slice))
})
})
}
}

Expand Down Expand Up @@ -185,8 +187,8 @@ class GaussianMixture private (
}

/**
 * Averages the given breeze vectors.
 *
 * Accepts dense or sparse inputs but accumulates into, and returns, a
 * dense vector (the mean of sparse samples is dense in general).
 * Assumes x is non-empty: x(0) is read to size the accumulator.
 */
private def vectorMean(x: IndexedSeq[BV[Double]]): BDV[Double] = {
  val v = BDV.zeros[Double](x(0).length)
  x.foreach(xi => v += xi)
  v / x.length.toDouble
}
Expand All @@ -195,10 +197,10 @@ class GaussianMixture private (
* Construct matrix where diagonal entries are element-wise
* variance of input vectors (computes biased variance)
*/
private def initCovariance(x: IndexedSeq[BV[Double]]): BreezeMatrix[Double] = {
  val mu = vectorMean(x)
  // Accumulate the sum of squared deviations in place into a dense vector.
  val ss = BDV.zeros[Double](x(0).length)
  x.foreach(xi => ss += (xi - mu) :^ 2.0)
  // Biased (divide-by-n) variance on the diagonal; off-diagonal entries are zero.
  diag(ss / x.length.toDouble)
}
}
Expand All @@ -207,28 +209,37 @@ class GaussianMixture private (
private object ExpectationSum {
/**
 * Creates an all-zero aggregation buffer for k clusters over d-dimensional
 * data: zero log-likelihood, zero cluster weights, zero mean vectors, and
 * zero covariance matrices.
 */
def zero(k: Int, d: Int): ExpectationSum = {
  new ExpectationSum(0.0, Array.fill(k)(0.0),
    Array.fill(k)(BDV.zeros(d)), Array.fill(k)(BreezeMatrix.zeros(d, d)))
}

// compute cluster contributions for each input point
// (U, T) => U for aggregation
def add(
weights: Array[Double],
dists: Array[MultivariateGaussian])
(sums: ExpectationSum, x: BreezeVector[Double]): ExpectationSum = {
(sums: ExpectationSum, x: BV[Double]): ExpectationSum = {
val p = weights.zip(dists).map {
case (weight, dist) => MLUtils.EPSILON + weight * dist.pdf(x)
}
val pSum = p.sum
sums.logLikelihood += math.log(pSum)
val xxt = x * new Transpose(x)
var i = 0
val isSparse = x match {
case _: BSV[Double] => true
case _: BDV[Double] => false
}
while (i < sums.k) {
p(i) /= pSum
sums.weights(i) += p(i)
sums.means(i) += x * p(i)
BLAS.syr(p(i), Vectors.fromBreeze(x).asInstanceOf[DenseVector],
Matrices.fromBreeze(sums.sigmas(i)).asInstanceOf[DenseMatrix])
if (isSparse){
BLAS.syr(p(i), Vectors.fromBreeze(x).asInstanceOf[SparseVector],
Matrices.fromBreeze(sums.sigmas(i)).asInstanceOf[DenseMatrix])
}
else {
BLAS.syr(p(i), Vectors.fromBreeze(x).asInstanceOf[DenseVector],
Matrices.fromBreeze(sums.sigmas(i)).asInstanceOf[DenseMatrix])
}
i = i + 1
}
sums
Expand All @@ -239,7 +250,7 @@ private object ExpectationSum {
private class ExpectationSum(
var logLikelihood: Double,
val weights: Array[Double],
val means: Array[BreezeVector[Double]],
val means: Array[BDV[Double]],
val sigmas: Array[BreezeMatrix[Double]]) extends Serializable {

val k = weights.length
Expand Down
14 changes: 14 additions & 0 deletions mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,20 @@ private[spark] object BLAS extends Serializable with Logging {
}
}

/**
 * A := alpha * x * x^T + A, specialized for a sparse x.
 *
 * Iterates only over the non-zero entries of x, so each call touches
 * nnz(x)^2 cells of A's flat values array. Both triangles of A (and the
 * diagonal, exactly once) are updated, keeping A symmetric.
 *
 * @param alpha the scalar multiplier applied to the outer product x * x^T.
 * @param x the sparse vector whose scaled outer product is added to A.
 * @param A the symmetric dense matrix updated in place.
 */
def syr(alpha: Double, x: SparseVector, A: DenseMatrix) {
  val mA = A.numRows
  val nA = A.numCols
  require(mA == nA, s"A is not a symmetric matrix. A: $mA x $nA")
  require(mA == x.size, s"The size of x doesn't match the rank of A. A: $mA x $nA, x: ${x.size}")

  val indices = x.indices
  val values = x.values
  val nnz = indices.length
  // Plain index loops avoid the intermediate collections that a nested
  // `map` over the (index, value) pairs would allocate on every call.
  var i = 0
  while (i < nnz) {
    val scaledVal = alpha * values(i)
    val offset = indices(i) * nA
    var j = 0
    while (j < nnz) {
      A.values(offset + indices(j)) += scaledVal * values(j)
      j += 1
    }
    i += 1
  }
}

/**
* C := alpha * A * B + beta * C
* @param alpha a scalar to scale the multiplication A * B.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.mllib.stat.distribution

import breeze.linalg.{DenseVector => DBV, DenseMatrix => DBM, diag, max, eigSym}
import breeze.linalg.{DenseVector => DBV, DenseMatrix => DBM, diag, max, eigSym, Vector => BV}

import org.apache.spark.annotation.DeveloperApi;
import org.apache.spark.mllib.linalg.{Vectors, Vector, Matrices, Matrix}
Expand Down Expand Up @@ -62,21 +62,21 @@ class MultivariateGaussian (

/** Returns density of this multivariate Gaussian at given point, x */
def pdf(x: Vector): Double = {
  // Delegate to the breeze-typed overload; dense and sparse vectors are
  // both handled since it accepts any breeze Vector.
  pdf(x.toBreeze)
}

/** Returns the log-density of this multivariate Gaussian at given point, x */
def logpdf(x: Vector): Double = {
  // Delegate to the breeze-typed overload; dense and sparse vectors are
  // both handled since it accepts any breeze Vector.
  logpdf(x.toBreeze)
}

/** Returns density of this multivariate Gaussian at given breeze vector, x */
private[mllib] def pdf(x: BV[Double]): Double = {
  // Density is the exponential of the log-density; computing in log space
  // first is the numerically safer path.
  math.exp(logpdf(x))
}

/** Returns the log-density of this multivariate Gaussian at given point, x */
private[mllib] def logpdf(x: DBV[Double]): Double = {
private[mllib] def logpdf(x: BV[Double]): Double = {
val delta = x - breezeMu
val v = rootSigmaInv * delta
u + v.t * v * -0.5
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ class GaussianMixtureSuite extends FunSuite with MLlibTestSparkContext {
Vectors.dense(5.0, 10.0),
Vectors.dense(4.0, 11.0)
))


val sparseData = data.map(point => Vectors.sparse(2, Array(0, 1), point.toArray))
// expectations
val Ew = 1.0
val Emu = Vectors.dense(5.0, 10.0)
Expand All @@ -40,10 +41,15 @@ class GaussianMixtureSuite extends FunSuite with MLlibTestSparkContext {
val seeds = Array(314589, 29032897, 50181, 494821, 4660)
seeds.foreach { seed =>
val gmm = new GaussianMixture().setK(1).setSeed(seed).run(data)
val sparseGMM = new GaussianMixture().setK(1).setSeed(seed).run(sparseData)
assert(gmm.weights(0) ~== Ew absTol 1E-5)
assert(gmm.gaussians(0).mu ~== Emu absTol 1E-5)
assert(gmm.gaussians(0).sigma ~== Esigma absTol 1E-5)
assert(sparseGMM.weights(0) ~== Ew absTol 1E-5)
assert(sparseGMM.gaussians(0).mu ~== Emu absTol 1E-5)
assert(sparseGMM.gaussians(0).sigma ~== Esigma absTol 1E-5)
}

}

test("two clusters") {
Expand All @@ -54,7 +60,8 @@ class GaussianMixtureSuite extends FunSuite with MLlibTestSparkContext {
Vectors.dense( 5.7048), Vectors.dense( 4.6567), Vectors.dense( 5.5026),
Vectors.dense( 4.5605), Vectors.dense( 5.2043), Vectors.dense( 6.2734)
))


val sparseData = data.map(point => Vectors.sparse(1, Array(0), point.toArray))
// we set an initial gaussian to induce expected results
val initialGmm = new GaussianMixtureModel(
Array(0.5, 0.5),
Expand All @@ -72,12 +79,23 @@ class GaussianMixtureSuite extends FunSuite with MLlibTestSparkContext {
.setK(2)
.setInitialModel(initialGmm)
.run(data)


val sparseGMM = new GaussianMixture()
  .setK(2)
  .setInitialModel(initialGmm)
  // Must run on sparseData: running on `data` would retest the dense path
  // and leave the sparse-input assertions below vacuous.
  .run(sparseData)

assert(gmm.weights(0) ~== Ew(0) absTol 1E-3)
assert(gmm.weights(1) ~== Ew(1) absTol 1E-3)
assert(gmm.gaussians(0).mu ~== Emu(0) absTol 1E-3)
assert(gmm.gaussians(1).mu ~== Emu(1) absTol 1E-3)
assert(gmm.gaussians(0).sigma ~== Esigma(0) absTol 1E-3)
assert(gmm.gaussians(1).sigma ~== Esigma(1) absTol 1E-3)
assert(sparseGMM.weights(0) ~== Ew(0) absTol 1E-3)
assert(sparseGMM.weights(1) ~== Ew(1) absTol 1E-3)
assert(sparseGMM.gaussians(0).mu ~== Emu(0) absTol 1E-3)
assert(sparseGMM.gaussians(1).mu ~== Emu(1) absTol 1E-3)
assert(sparseGMM.gaussians(0).sigma ~== Esigma(0) absTol 1E-3)
assert(sparseGMM.gaussians(1).sigma ~== Esigma(1) absTol 1E-3)
}
}

0 comments on commit e180f4c

Please sign in to comment.