Skip to content

Commit

Permalink
[SPARK-6258] [MLLIB] GaussianMixture Python API parity check
Browse files Browse the repository at this point in the history
Implement Python API for major disparities of GaussianMixture cluster algorithm between Scala & Python
```scala
GaussianMixture
    setInitialModel
GaussianMixtureModel
    k
```

Author: Yanbo Liang <[email protected]>

Closes #6087 from yanboliang/spark-6258 and squashes the following commits:

b3af21c [Yanbo Liang] fix typo
2b645c1 [Yanbo Liang] fix doc
638b4b7 [Yanbo Liang] address comments
b5bcade [Yanbo Liang] GaussianMixture Python API parity check
  • Loading branch information
yanboliang authored and jkbradley committed May 15, 2015
1 parent cf842d4 commit 9476148
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -345,28 +345,40 @@ private[python] class PythonMLLibAPI extends Serializable {
* Returns a list containing weights, mean and covariance of each mixture component.
*/
def trainGaussianMixture(
data: JavaRDD[Vector],
k: Int,
convergenceTol: Double,
data: JavaRDD[Vector],
k: Int,
convergenceTol: Double,
maxIterations: Int,
seed: java.lang.Long): JList[Object] = {
seed: java.lang.Long,
initialModelWeights: java.util.ArrayList[Double],
initialModelMu: java.util.ArrayList[Vector],
initialModelSigma: java.util.ArrayList[Matrix]): JList[Object] = {
val gmmAlg = new GaussianMixture()
.setK(k)
.setConvergenceTol(convergenceTol)
.setMaxIterations(maxIterations)

if (initialModelWeights != null && initialModelMu != null && initialModelSigma != null) {
val gaussians = initialModelMu.asScala.toSeq.zip(initialModelSigma.asScala.toSeq).map {
case (x, y) => new MultivariateGaussian(x.asInstanceOf[Vector], y.asInstanceOf[Matrix])
}
val initialModel = new GaussianMixtureModel(
initialModelWeights.asScala.toArray, gaussians.toArray)
gmmAlg.setInitialModel(initialModel)
}

if (seed != null) gmmAlg.setSeed(seed)

try {
val model = gmmAlg.run(data.rdd.persist(StorageLevel.MEMORY_AND_DISK))
var wt = ArrayBuffer.empty[Double]
var mu = ArrayBuffer.empty[Vector]
var mu = ArrayBuffer.empty[Vector]
var sigma = ArrayBuffer.empty[Matrix]
for (i <- 0 until model.k) {
wt += model.weights(i)
mu += model.gaussians(i).mu
sigma += model.gaussians(i).sigma
}
}
List(Vectors.dense(wt.toArray), mu.toArray, sigma.toArray).map(_.asInstanceOf[Object]).asJava
} finally {
data.rdd.unpersist(blocking = false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,10 @@ import org.apache.spark.sql.{SQLContext, Row}
* are drawn from each Gaussian i=1..k with probability w(i); mu(i) and sigma(i) are
* the respective mean and covariance for each Gaussian distribution i=1..k.
*
* @param weight Weights for each Gaussian distribution in the mixture, where weight(i) is
* the weight for Gaussian i, and weight.sum == 1
* @param mu Means for each Gaussian in the mixture, where mu(i) is the mean for Gaussian i
* @param sigma Covariance maxtrix for each Gaussian in the mixture, where sigma(i) is the
* covariance matrix for Gaussian i
* @param weights Weights for each Gaussian distribution in the mixture, where weights(i) is
* the weight for Gaussian i, and weights.sum == 1
* @param gaussians Array of MultivariateGaussian where gaussians(i) represents
* the Multivariate Gaussian (Normal) Distribution for Gaussian i
*/
@Experimental
class GaussianMixtureModel(
Expand Down
67 changes: 53 additions & 14 deletions python/pyspark/mllib/clustering.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ class GaussianMixtureModel(object):

"""A clustering model derived from the Gaussian Mixture Model method.
>>> from pyspark.mllib.linalg import Vectors, DenseMatrix
>>> clusterdata_1 = sc.parallelize(array([-0.1,-0.05,-0.01,-0.1,
... 0.9,0.8,0.75,0.935,
... -0.83,-0.68,-0.91,-0.76 ]).reshape(6, 2))
Expand All @@ -154,24 +155,51 @@ class GaussianMixtureModel(object):
True
>>> labels[4]==labels[5]
True
>>> clusterdata_2 = sc.parallelize(array([-5.1971, -2.5359, -3.8220,
... -5.2211, -5.0602, 4.7118,
... 6.8989, 3.4592, 4.6322,
... 5.7048, 4.6567, 5.5026,
... 4.5605, 5.2043, 6.2734]).reshape(5, 3))
>>> data = array([-5.1971, -2.5359, -3.8220,
... -5.2211, -5.0602, 4.7118,
... 6.8989, 3.4592, 4.6322,
... 5.7048, 4.6567, 5.5026,
... 4.5605, 5.2043, 6.2734])
>>> clusterdata_2 = sc.parallelize(data.reshape(5,3))
>>> model = GaussianMixture.train(clusterdata_2, 2, convergenceTol=0.0001,
... maxIterations=150, seed=10)
>>> labels = model.predict(clusterdata_2).collect()
>>> labels[0]==labels[1]==labels[2]
True
>>> labels[3]==labels[4]
True
>>> clusterdata_3 = sc.parallelize(data.reshape(15, 1))
>>> im = GaussianMixtureModel([0.5, 0.5],
... [MultivariateGaussian(Vectors.dense([-1.0]), DenseMatrix(1, 1, [1.0])),
... MultivariateGaussian(Vectors.dense([1.0]), DenseMatrix(1, 1, [1.0]))])
>>> model = GaussianMixture.train(clusterdata_3, 2, initialModel=im)
"""

def __init__(self, weights, gaussians):
self.weights = weights
self.gaussians = gaussians
self.k = len(self.weights)
self._weights = weights
self._gaussians = gaussians
self._k = len(self._weights)

@property
def weights(self):
"""
Weights for each Gaussian distribution in the mixture, where weights[i] is
the weight for Gaussian i, and weights.sum == 1.
"""
return self._weights

@property
def gaussians(self):
"""
Array of MultivariateGaussian where gaussians[i] represents
the Multivariate Gaussian (Normal) Distribution for Gaussian i.
"""
return self._gaussians

@property
def k(self):
"""Number of gaussians in mixture."""
return self._k

def predict(self, x):
"""
Expand All @@ -193,9 +221,9 @@ def predictSoft(self, x):
:return: membership_matrix. RDD of array of double values.
"""
if isinstance(x, RDD):
means, sigmas = zip(*[(g.mu, g.sigma) for g in self.gaussians])
means, sigmas = zip(*[(g.mu, g.sigma) for g in self._gaussians])
membership_matrix = callMLlibFunc("predictSoftGMM", x.map(_convert_to_vector),
_convert_to_vector(self.weights), means, sigmas)
_convert_to_vector(self._weights), means, sigmas)
return membership_matrix.map(lambda x: pyarray.array('d', x))


Expand All @@ -208,13 +236,24 @@ class GaussianMixture(object):
:param convergenceTol: Threshold value to check the convergence criteria. Defaults to 1e-3
:param maxIterations: Number of iterations. Default to 100
:param seed: Random Seed
:param initialModel: GaussianMixtureModel for initializing learning
"""
@classmethod
def train(cls, rdd, k, convergenceTol=1e-3, maxIterations=100, seed=None):
def train(cls, rdd, k, convergenceTol=1e-3, maxIterations=100, seed=None, initialModel=None):
"""Train a Gaussian Mixture clustering model."""
weight, mu, sigma = callMLlibFunc("trainGaussianMixture",
rdd.map(_convert_to_vector), k,
convergenceTol, maxIterations, seed)
initialModelWeights = None
initialModelMu = None
initialModelSigma = None
if initialModel is not None:
if initialModel.k != k:
raise Exception("Mismatched cluster count, initialModel.k = %s, however k = %s"
% (initialModel.k, k))
initialModelWeights = initialModel.weights
initialModelMu = [initialModel.gaussians[i].mu for i in range(initialModel.k)]
initialModelSigma = [initialModel.gaussians[i].sigma for i in range(initialModel.k)]
weight, mu, sigma = callMLlibFunc("trainGaussianMixture", rdd.map(_convert_to_vector), k,
convergenceTol, maxIterations, seed, initialModelWeights,
initialModelMu, initialModelSigma)
mvg_obj = [MultivariateGaussian(mu[i], sigma[i]) for i in range(k)]
return GaussianMixtureModel(weight, mvg_obj)

Expand Down

0 comments on commit 9476148

Please sign in to comment.