removed LabeledPoint from Python LDA internals #3

Closed · wants to merge 5 commits
@@ -464,6 +464,36 @@ private[python] class PythonMLLibAPI extends Serializable {
    new MatrixFactorizationModelWrapper(model)
  }

  /**
   * Java stub for Python mllib LDA.run()
   */
  def trainLDAModel(
      data: JavaRDD[Object],
      k: Int,
      maxIterations: Int,
      docConcentration: Double,
      topicConcentration: Double,
      seed: java.lang.Long,
      checkpointInterval: Int,
      optimizer: String): LDAModel = {
    val algo = new LDA()
      .setK(k)
      .setMaxIterations(maxIterations)
      .setDocConcentration(docConcentration)
      .setTopicConcentration(topicConcentration)
      .setCheckpointInterval(checkpointInterval)
      .setOptimizer(optimizer)

    if (seed != null) algo.setSeed(seed)

    val docs: RDD[(Long, Vector)] = data.rdd.map { x: Object =>
      val doc = x.asInstanceOf[java.util.List[Object]]
      (doc.get(0).asInstanceOf[Long], doc.get(1).asInstanceOf[Vector])
    }
    algo.run(docs)
  }


  /**
   * Java stub for Python mllib FPGrowth.train(). This stub returns a handle
   * to the Java object instead of the content of the Java object. Extra care
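For context: the casts in trainLDAModel above assume each RDD element reaches the JVM as a two-element java.util.List holding a java.lang.Long document ID and a Vector of term counts. A minimal Python-side sketch of a conforming corpus (illustrative values; assumes an existing SparkContext named sc):

    from pyspark.mllib.linalg import Vectors

    # Each element is [doc id (long), term-count vector]; after pickling, MLlib's
    # Python-to-Java serde rebuilds it on the JVM as a java.util.List[Object],
    # matching the doc.get(0) / doc.get(1) casts in trainLDAModel above.
    corpus = sc.parallelize([
        [0L, Vectors.dense([1.0, 2.0, 0.0])],
        [1L, Vectors.sparse(3, {2: 4.0})],
    ])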
python/pyspark/mllib/clustering.py (71 additions, 2 deletions)
@@ -25,12 +25,13 @@

 from pyspark import RDD
 from pyspark import SparkContext
-from pyspark.mllib.common import callMLlibFunc, callJavaFunc, _py2java, _java2py
+from pyspark.mllib.common import JavaModelWrapper, callMLlibFunc, callJavaFunc, _py2java, _java2py
 from pyspark.mllib.linalg import SparseVector, _convert_to_vector
 from pyspark.mllib.stat.distribution import MultivariateGaussian
 from pyspark.mllib.util import Saveable, Loader, inherit_doc
 
-__all__ = ['KMeansModel', 'KMeans', 'GaussianMixtureModel', 'GaussianMixture']
+__all__ = ['KMeansModel', 'KMeans', 'GaussianMixtureModel', 'GaussianMixture',
+           'LDA', 'LDAModel']


@inherit_doc
@@ -274,5 +275,73 @@ def _test():
        exit(-1)


class LDAModel(JavaModelWrapper):

    """A clustering model derived from the LDA method.

    Latent Dirichlet Allocation (LDA) is a topic model designed for text documents.
    Terminology:
    - "word" = "term": an element of the vocabulary
    - "token": an instance of a term appearing in a document
    - "topic": a multinomial distribution over words representing some concept
    References:
    - Original LDA paper (journal version):
      Blei, Ng, and Jordan. "Latent Dirichlet Allocation." JMLR, 2003.

    >>> from numpy import array
    >>> from numpy.testing import assert_almost_equal
    >>> data = [
    ...     (1L, [0.0, 1.0]),
    ...     (2L, [1.0, 0.0]),
    ... ]
    >>> rdd = sc.parallelize(data)
    >>> model = LDA.train(rdd, k=2)
    >>> model.vocabSize()
    2
    >>> topics = model.topicsMatrix()
    >>> topics_expect = array([[0.5, 0.5], [0.5, 0.5]])
    >>> assert_almost_equal(topics, topics_expect, 1)
    """

    def topicsMatrix(self):
        """Inferred topics, where each topic is represented by a distribution over terms."""
        return self.call("topicsMatrix").toArray()

    def vocabSize(self):
        """Vocabulary size (number of terms in the vocabulary)."""
        return self.call("vocabSize")

    def describeTopics(self, maxTermsPerTopic=None):
        """Return the topics described by weighted terms.

        TODO: Implement this method. It is non-trivial because the underlying
        Scala method returns an array of (term indices, term weights) tuples,
        which do not yet convert cleanly to Python types.
        """
        raise NotImplementedError("LDAModel.describeTopics() in Python must be implemented.")


class LDA(object):

    @classmethod
    def train(cls, rdd, k=10, maxIterations=20, docConcentration=-1.0,
              topicConcentration=-1.0, seed=None, checkpointInterval=10, optimizer="em"):
        """
        Train an LDA model.

        :param rdd: Dataset of (doc ID, word count vector) pairs.
        :param k: Number of topics to infer, i.e. the number of soft cluster centers.
        :param maxIterations: Maximum number of iterations allowed.
        :param docConcentration: Concentration parameter (commonly named "alpha")
            for the prior placed on documents' distributions over topics;
            -1.0 selects the framework default.
        :param topicConcentration: Concentration parameter (commonly named "beta"
            or "eta") for the prior placed on topics' distributions over terms;
            -1.0 selects the framework default.
        :param seed: Random seed for cluster initialization; a random value is
            used if None.
        :param checkpointInterval: Period (in iterations) between checkpoints.
        :param optimizer: LDAOptimizer used to perform the actual calculation
            ("em" or "online").
        :return: LDAModel inferred from the data.
        """
        vectorizedData = rdd.map(lambda id_doc: [id_doc[0], _convert_to_vector(id_doc[1])])
        model = callMLlibFunc("trainLDAModel", vectorizedData, k, maxIterations,
                              docConcentration, topicConcentration, seed,
                              checkpointInterval, optimizer)
        return LDAModel(model)


if __name__ == "__main__":
_test()
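
A fuller usage sketch of the new Python API added in this PR (hypothetical values; assumes a running SparkContext named sc with this patch applied):

    from pyspark.mllib.clustering import LDA
    from pyspark.mllib.linalg import Vectors

    # Three documents over a three-term vocabulary, as (doc ID, count vector) pairs.
    corpus = sc.parallelize([
        (0L, Vectors.dense([2.0, 1.0, 0.0])),
        (1L, Vectors.dense([0.0, 1.0, 3.0])),
        (2L, Vectors.sparse(3, {0: 1.0, 2: 1.0})),
    ])

    # Explicit Dirichlet priors; passing -1.0 (the default) lets MLlib choose.
    model = LDA.train(corpus, k=2, maxIterations=40,
                      docConcentration=1.1, topicConcentration=1.1,
                      seed=42, optimizer="em")

    print(model.vocabSize())     # 3
    print(model.topicsMatrix())  # vocabSize x k array of term weights per topic

Note that describeTopics() would still raise NotImplementedError at this stage of the PR.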