[SPARK-6259][MLlib] Python API for LDA #6791

Closed · wants to merge 15 commits
mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -502,6 +502,39 @@ private[python] class PythonMLLibAPI extends Serializable {
new MatrixFactorizationModelWrapper(model)
}

/**
* Java stub for Python mllib LDA.run()
*/
def trainLDAModel(
data: JavaRDD[java.util.List[Any]],
k: Int,
maxIterations: Int,
docConcentration: Double,
topicConcentration: Double,
seed: java.lang.Long,
checkpointInterval: Int,
optimizer: String): LDAModel = {
val algo = new LDA()
.setK(k)
.setMaxIterations(maxIterations)
.setDocConcentration(docConcentration)
.setTopicConcentration(topicConcentration)
.setCheckpointInterval(checkpointInterval)
.setOptimizer(optimizer)

if (seed != null) algo.setSeed(seed)

val documents = data.rdd.map(_.asScala.toArray).map { r =>
r(0) match {
case i: java.lang.Integer => (i.toLong, r(1).asInstanceOf[Vector])
case i: java.lang.Long => (i.toLong, r(1).asInstanceOf[Vector])
case _ => throw new IllegalArgumentException(
"invalid input type: document IDs must be Int or Long.")
}
}
algo.run(documents)
}
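
For context, a hedged Python-side sketch of the input this stub's pattern match accepts: each RDD element is a two-element list of [document ID (Int or Long), term-count Vector]. The corpus contents below are illustrative only, and an active SparkContext sc is assumed.

from pyspark.mllib.linalg import Vectors

# Each element: [doc id (int/long), vector of term counts].
corpus = sc.parallelize([
    [0, Vectors.dense([1.0, 2.0, 0.0])],       # dense counts
    [1, Vectors.sparse(3, {0: 1.0, 2: 3.0})],  # sparse counts
])
# A non-integral ID such as ["a", ...] would hit the
# IllegalArgumentException raised in the match above.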


/**
* Java stub for Python mllib FPGrowth.train(). This stub returns a handle
* to the Java object instead of the content of the Java object. Extra care
* needs to be taken in the Python code to ensure it gets freed on exit; see
* the Py4J documentation.
*/
python/pyspark/mllib/clustering.py (66 changes: 65 additions & 1 deletion)
@@ -31,13 +31,15 @@
from pyspark.rdd import RDD, ignore_unicode_prefix
from pyspark.mllib.common import JavaModelWrapper, callMLlibFunc, callJavaFunc, _py2java, _java2py
from pyspark.mllib.linalg import SparseVector, _convert_to_vector, DenseVector
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.stat.distribution import MultivariateGaussian
from pyspark.mllib.util import Saveable, Loader, inherit_doc, JavaLoader, JavaSaveable
from pyspark.streaming import DStream

__all__ = ['KMeansModel', 'KMeans', 'GaussianMixtureModel', 'GaussianMixture',
           'PowerIterationClusteringModel', 'PowerIterationClustering',
           'StreamingKMeans', 'StreamingKMeansModel',
           'LDA', 'LDAModel']


@inherit_doc
@@ -563,6 +565,68 @@ def predictOnValues(self, dstream):
return dstream.mapValues(lambda x: self._model.predict(x))


class LDAModel(JavaModelWrapper):

""" A clustering model derived from the LDA method.

Latent Dirichlet Allocation (LDA), a topic model designed for text documents.
Terminology
- "word" = "term": an element of the vocabulary
- "token": instance of a term appearing in a document
- "topic": multinomial distribution over words representing some concept
References:
- Original LDA paper (journal version):
Blei, Ng, and Jordan. "Latent Dirichlet Allocation." JMLR, 2003.

>>> from pyspark.mllib.linalg import Vectors
>>> from numpy.testing import assert_almost_equal
>>> data = [
... [1, Vectors.dense([0.0, 1.0])],
... [2, SparseVector(2, {0: 1.0})],
... ]
>>> rdd = sc.parallelize(data)
>>> model = LDA.train(rdd, k=2)
>>> model.vocabSize()
2
>>> topics = model.topicsMatrix()
>>> topics_expect = array([[0.5, 0.5], [0.5, 0.5]])
[Inline review thread on this doctest]

Member: I'd prefer you print out the topics matrix and use ... to allow small errors in the values.

Contributor (author): Can we use ... to compare an array or a list with another one?

Member: Good question; I'm not sure. Feel free to leave it as is, if it does not work easily.

>>> assert_almost_equal(topics, topics_expect, 1)
"""

def topicsMatrix(self):
"""Inferred topics, where each topic is represented by a distribution over terms."""
return self.call("topicsMatrix").toArray()

def vocabSize(self):
"""Vocabulary size (number of terms or terms in the vocabulary)"""
return self.call("vocabSize")
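
Since the docstring above only promises "a distribution over terms" per topic, here is a hedged sketch of how a caller might read the vocabSize x k array returned by topicsMatrix(); the vocab list and the top_terms helper are hypothetical, not part of this PR:

import numpy as np

def top_terms(model, vocab, n=5):
    # topicsMatrix() returns a numpy array of shape (vocabSize, k);
    # column j holds the term weights for topic j.
    topics = model.topicsMatrix()
    for j in range(topics.shape[1]):
        weights = topics[:, j]
        best = np.argsort(weights)[::-1][:n]  # indices of the n largest weights
        print("topic %d: %s" % (
            j, ", ".join("%s (%.2f)" % (vocab[i], weights[i]) for i in best)))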


class LDA(object):

@classmethod
def train(cls, rdd, k=10, maxIterations=20, docConcentration=-1.0,
topicConcentration=-1.0, seed=None, checkpointInterval=10, optimizer="em"):
"""Train a LDA model.

:param rdd: RDD of data points
:param k: Number of clusters you want
:param maxIterations: Number of iterations. Default to 20
:param docConcentration: Concentration parameter (commonly named "alpha")
for the prior placed on documents' distributions over topics ("theta").
:param topicConcentration: Concentration parameter (commonly named "beta" or "eta")
for the prior placed on topics' distributions over terms.
:param seed: Random Seed
:param checkpointInterval: Period (in iterations) between checkpoints.
:param optimizer: LDAOptimizer used to perform the actual calculation.
Currently "em", "online" are supported. Default to "em".
"""
model = callMLlibFunc("trainLDAModel", rdd, k, maxIterations,
docConcentration, topicConcentration, seed,
checkpointInterval, optimizer)
return LDAModel(model)
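
Finally, a hedged end-to-end sketch of the new API; the corpus, parameter values, and the choice of the "online" optimizer are illustrative only:

from pyspark import SparkContext
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.clustering import LDA

sc = SparkContext(appName="lda-sketch")

# Each document: [unique integer id, vector of term counts over a shared vocabulary].
corpus = sc.parallelize([
    [0, Vectors.dense([2.0, 1.0, 0.0, 0.0])],
    [1, Vectors.dense([0.0, 0.0, 1.0, 3.0])],
    [2, Vectors.sparse(4, {1: 1.0, 2: 2.0})],
])

model = LDA.train(corpus, k=2, maxIterations=30, seed=42, optimizer="online")
print(model.vocabSize())      # 4
print(model.topicsMatrix())   # 4 x 2 array of per-topic term weights
sc.stop()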


def _test():
import doctest
import pyspark.mllib.clustering