diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala
index 92e343fbcba3e..70120b9d0192c 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala
@@ -95,7 +95,7 @@ class LDA private (
    *     - The 50/k is common in LDA libraries.
    *     - The +1 follows Asuncion et al. (2009), who recommend a +1 adjustment for EM.
    *  - For Online: default = (1.0 / k).
-   *     - follows the implementation from: https://github.com/Blei-Lab/onlineldavb.
+   *     - follows the implementation from: [[https://github.com/Blei-Lab/onlineldavb]].
    *
    * Note: For EM optimizer, This value should be > 1.0.
    */
@@ -117,7 +117,8 @@ class LDA private (
    * This is the parameter to a symmetric Dirichlet distribution.
    *
    * Note: The topics' distributions over terms are called "beta" in the original LDA paper
-   * by Blei et al., but are called "phi" in many later papers such as Asuncion et al., 2009.
+   * by Blei et al., but are called "phi" in many later papers
+   * such as Asuncion et al., 2009.
    */
 
   def getTopicConcentration: Double = this.topicConcentration
@@ -138,7 +139,7 @@ class LDA private (
    *     - The 0.1 gives a small amount of smoothing.
    *     - The +1 follows Asuncion et al. (2009), who recommend a +1 adjustment for EM.
    *  - For Online: default = (1.0 / k).
-   *     - follows the implementation from: https://github.com/Blei-Lab/onlineldavb.
+   *     - follows the implementation from: [[https://github.com/Blei-Lab/onlineldavb]].
    *
    * Note: For EM optimizer, This value should be > 1.0.
    */
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
index 5b6d226cea2b3..6d2d93a525a9c 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
@@ -94,36 +94,21 @@ class EMLDAOptimizer extends LDAOptimizer {
   /**
    * Compute bipartite term/doc graph.
    */
-  private[clustering] override def initialize(docs: RDD[(Long, Vector)], lda: LDA):
-  LDAOptimizer = {
+  private[clustering] override def initialize(docs: RDD[(Long, Vector)], lda: LDA): LDAOptimizer={
     val docConcentration = lda.getDocConcentration
     val topicConcentration = lda.getTopicConcentration
     val k = lda.getK
 
-    /**
-     * Note: The restriction > 1.0 may be relaxed in the future (allowing sparse solutions),
-     * but values in (0,1) are not yet supported.
-     */
+    // Note: The restriction > 1.0 may be relaxed in the future (allowing sparse solutions),
+    // but values in (0,1) are not yet supported.
     require(docConcentration > 1.0 || docConcentration == -1.0, s"LDA docConcentration must be" +
       s" > 1.0 (or -1 for auto) for EM Optimizer, but was set to $docConcentration")
     require(topicConcentration > 1.0 || topicConcentration == -1.0, s"LDA topicConcentration " +
       s"must be > 1.0 (or -1 for auto) for EM Optimizer, but was set to $topicConcentration")
 
-    /**
-     * - For EM: default = (50 / k) + 1.
-     *   - The 50/k is common in LDA libraries.
-     *   - The +1 follows Asuncion et al. (2009), who recommend a +1 adjustment for EM.
-     */
     this.docConcentration = if (docConcentration == -1) (50.0 / k) + 1.0 else docConcentration
-
-    /**
-     * - For EM: default = 0.1 + 1.
-     *   - The 0.1 gives a small amount of smoothing.
-     *   - The +1 follows Asuncion et al. (2009), who recommend a +1 adjustment for EM.
-     */
     this.topicConcentration = if (topicConcentration == -1) 1.1 else topicConcentration
-
     val randomSeed = lda.getSeed
 
     // For each document, create an edge (Document -> Term) for each unique term in the document.
@@ -230,8 +215,8 @@ class EMLDAOptimizer extends LDAOptimizer {
 /**
  * :: Experimental ::
  *
- * An online optimizer for LDA. The Optimizer implements the Online LDA algorithm, which
- * processes a subset of the corpus by each call to next, and update the term-topic
+ * An online optimizer for LDA. The Optimizer implements the Online variational Bayes LDA
+ * algorithm, which processes a subset of the corpus on each iteration, and updates the term-topic
  * distribution adaptively.
  *
  * References:
@@ -242,16 +227,16 @@ class OnlineLDAOptimizer extends LDAOptimizer {
 
   // LDA common parameters
   private var k: Int = 0
-  private var D: Int = 0
+  private var corpusSize: Long = 0
   private var vocabSize: Int = 0
   private var alpha: Double = 0
   private var eta: Double = 0
-  private var randomSeed: Long = 0
+  private var randomGenerator: java.util.Random = null
 
   // Online LDA specific parameters
-  private var tau_0: Double = -1
-  private var kappa: Double = -1
-  private var batchSize: Int = -1
+  private var tau_0: Double = 1024
+  private var kappa: Double = 0.51
+  private var minibatchFraction: Double = 0.01
 
   // internal data structure
   private var docs: RDD[(Long, Vector)] = null
@@ -259,22 +244,18 @@ class OnlineLDAOptimizer extends LDAOptimizer {
   private var Elogbeta: BDM[Double]= null
   private var expElogbeta: BDM[Double] = null
 
-  // count of invocation to next, used to help deciding the weight for each iteration
+  // count of invocations of next, which helps decide the weight for each iteration
   private var iteration = 0
 
   /**
-   * A (positive) learning parameter that downweights early iterations
+   * A (positive) learning parameter that downweights early iterations. Larger values make early
+   * iterations count less.
    */
-  def getTau_0: Double = {
-    if (this.tau_0 == -1) {
-      1024
-    } else {
-      this.tau_0
-    }
-  }
+  def getTau_0: Double = this.tau_0
 
   /**
-   * A (positive) learning parameter that downweights early iterations
+   * A (positive) learning parameter that downweights early iterations. Larger values make early
+   * iterations count less.
    * Automatic setting of parameter:
    *  - default = 1024, which follows the recommendation from OnlineLDA paper.
    */
@@ -287,18 +268,12 @@ class OnlineLDAOptimizer extends LDAOptimizer {
   /**
    * Learning rate: exponential decay rate
    */
-  def getKappa: Double = {
-    if (this.kappa == -1) {
-      0.5
-    } else {
-      this.kappa
-    }
-  }
+  def getKappa: Double = this.kappa
 
   /**
    * Learning rate: exponential decay rate---should be between
    * (0.5, 1.0] to guarantee asymptotic convergence.
-   *  - default = 0.5, which follows the recommendation from OnlineLDA paper.
+   *  - default = 0.51, which follows the recommendation from OnlineLDA paper.
    */
   def setKappa(kappa: Double): this.type = {
     require(kappa >= 0 || kappa == -1.0,
@@ -310,52 +285,44 @@ class OnlineLDAOptimizer extends LDAOptimizer {
   /**
    * Mini-batch size, which controls how many documents are used in each iteration
    */
-  def getBatchSize: Int = {
-    if (this.batchSize == -1) {
-      D / 100
-    } else {
-      this.batchSize
-    }
-  }
+  def getMiniBatchFraction: Double = this.minibatchFraction
 
   /**
    * Mini-batch size, which controls how many documents are used in each iteration
    * default = 1% from total documents.
    */
-  def setBatchSize(batchSize: Int): this.type = {
-    this.batchSize = batchSize
+  def setMiniBatchFraction(miniBatchFraction: Double): this.type = {
+    this.minibatchFraction = miniBatchFraction
     this
   }
 
-  private[clustering] override def initialize(docs: RDD[(Long, Vector)], lda: LDA): LDAOptimizer = {
+  private[clustering] override def initialize(docs: RDD[(Long, Vector)], lda: LDA): LDAOptimizer={
     this.k = lda.getK
-    this.D = docs.count().toInt
+    this.corpusSize = docs.count()
     this.vocabSize = docs.first()._2.size
     this.alpha = if (lda.getDocConcentration == -1) 1.0 / k else lda.getDocConcentration
     this.eta = if (lda.getTopicConcentration == -1) 1.0 / k else lda.getTopicConcentration
-    this.randomSeed = randomSeed
+    this.randomGenerator = new Random(lda.getSeed)
 
     this.docs = docs
 
     // Initialize the variational distribution q(beta|lambda)
     this.lambda = getGammaMatrix(k, vocabSize)
-    this.Elogbeta = dirichlet_expectation(lambda)
+    this.Elogbeta = dirichletExpectation(lambda)
     this.expElogbeta = exp(Elogbeta)
     this.iteration = 0
     this
   }
 
   /**
-   * Submit a a subset (like 1%, decide by the batchSize) of the corpus to the Online LDA model,
-   * and it will update the topic distribution adaptively for the terms appearing in the subset.
-   *
-   * @return Inferred LDA model
+   * Submit a subset (like 1%, decided by the miniBatchFraction) of the corpus to the Online LDA
+   * model, and it will update the topic distribution adaptively for the terms appearing in the
+   * subset.
    */
   private[clustering] override def next(): OnlineLDAOptimizer = {
     iteration += 1
-    val batchSize = getBatchSize
-    val batch = docs.sample(true, batchSize.toDouble / D, randomSeed).cache()
+    val batch = docs.sample(true, minibatchFraction, randomGenerator.nextLong())
     if(batch.isEmpty()) return this
 
     val k = this.k
@@ -406,8 +373,7 @@ class OnlineLDAOptimizer extends LDAOptimizer {
       })
 
     val batchResult = stats.reduce(_ += _)
-    update(batchResult, iteration, batchSize)
-    batch.unpersist()
+    update(batchResult, iteration, (minibatchFraction * corpusSize).toInt)
     this
   }
 
@@ -415,6 +381,9 @@ class OnlineLDAOptimizer extends LDAOptimizer {
     new LocalLDAModel(Matrices.fromBreeze(lambda).transpose)
   }
 
+  /**
+   * Update lambda based on the batch submitted. batchSize can be different for each iteration.
+   */
   private def update(raw: BDM[Double], iter:Int, batchSize: Int): Unit ={
 
     val tau_0 = this.getTau_0
@@ -427,18 +396,26 @@ class OnlineLDAOptimizer extends LDAOptimizer {
     val stat = raw :* expElogbeta
 
     // Update lambda based on documents.
-    lambda = lambda * (1 - weight) + (stat * D.toDouble / batchSize.toDouble + eta) * weight
-    Elogbeta = dirichlet_expectation(lambda)
+    lambda = lambda * (1 - weight) +
+      (stat * (corpusSize.toDouble / batchSize.toDouble) + eta) * weight
+    Elogbeta = dirichletExpectation(lambda)
     expElogbeta = exp(Elogbeta)
   }
 
+  /**
+   * Get a random matrix to initialize lambda
+   */
   private def getGammaMatrix(row:Int, col:Int): BDM[Double] ={
     val gammaRandomGenerator = new Gamma(100, 1.0 / 100.0)
     val temp = gammaRandomGenerator.sample(row * col).toArray
     (new BDM[Double](col, row, temp)).t
   }
 
-  private def dirichlet_expectation(alpha : BDM[Double]): BDM[Double] = {
+  /**
+   * For theta ~ Dir(alpha), computes E[log(theta)] given alpha. Currently the implementation
+   * uses digamma which is accurate but expensive.
+   */
+  private def dirichletExpectation(alpha: BDM[Double]): BDM[Double] = {
     val rowSum = sum(alpha(breeze.linalg.*, ::))
     val digAlpha = digamma(alpha)
     val digRowSum = digamma(rowSum)
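
Notes on the change, for readers skimming the hunks above.

How the reworked knobs fit together: a minimal configuration sketch, assuming the setters keep the shapes shown in the diff. `setTau_0`'s exact signature is not visible in these hunks, and the wiring that hands an optimizer to `LDA` is outside this change, so both are assumptions:

```scala
// Hypothetical configuration of the reworked optimizer, using the defaults
// this diff documents: tau_0 = 1024, kappa = 0.51, 1% mini-batches.
val optimizer = new OnlineLDAOptimizer()
  .setKappa(0.51)              // decay rate; (0.5, 1.0] guarantees asymptotic convergence
  .setMiniBatchFraction(0.01)  // each call to next() samples ~1% of the corpus
// A setTau_0(1024) setter presumably sits alongside getTau_0, but its
// signature does not appear in these hunks.
```

Each call to `next()` now samples `minibatchFraction` of the corpus with a fresh seed drawn from `randomGenerator`, rather than reusing one fixed seed as the old `batchSize`-based code did.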
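The heart of the change is the decayed step in `update`: each mini-batch estimate is blended into `lambda` with weight (tau_0 + iter)^(-kappa), and the batch statistics are rescaled from the mini-batch to the full corpus before the `eta` smoothing is added. This follows the online LDA paper (Hoffman et al., 2010). A standalone sketch of that step with the weight made explicit; the function wrapper and parameter list are mine, not the PR's:

```scala
import breeze.linalg.{DenseMatrix => BDM}

// Sketch of OnlineLDAOptimizer.update's core step. `stat` is assumed to hold
// the k x vocabSize expected sufficient statistics from one mini-batch.
def updateLambda(
    lambda: BDM[Double],
    stat: BDM[Double],
    iter: Int,
    tau_0: Double,
    kappa: Double,
    eta: Double,
    corpusSize: Long,
    batchSize: Int): BDM[Double] = {
  // rho_t = (tau_0 + t)^(-kappa): a step size that decays with the iteration
  // count, so early, noisy mini-batches influence lambda less as tau_0 grows.
  val weight = math.pow(tau_0 + iter, -kappa)
  // Blend: keep (1 - rho) of the old lambda and add rho of the batch estimate,
  // scaling the statistics up from |batch| documents to the whole corpus.
  lambda * (1.0 - weight) +
    (stat * (corpusSize.toDouble / batchSize.toDouble) + eta) * weight
}
```

With tau_0 = 1024 and kappa = 0.51, the first iteration gets weight = 1025^(-0.51), roughly 0.029, and the weight shrinks slowly from there.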
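Finally, the renamed `dirichletExpectation` computes, for each row of `alpha` parameterizing a Dirichlet, E[log theta_j] = digamma(alpha_j) - digamma(sum_j alpha_j). The diff is truncated before the final subtraction, so here is a self-contained sketch of the whole computation; the last broadcasting line is a reconstruction of the missing tail, not necessarily the PR's exact code:

```scala
import breeze.linalg.{sum, DenseMatrix => BDM, DenseVector => BDV}
import breeze.numerics.digamma

// For theta ~ Dir(alpha(i, ::)), E[log theta_ij] = digamma(alpha_ij) -
// digamma(sum_j alpha_ij), computed for every row of the matrix at once.
def dirichletExpectation(alpha: BDM[Double]): BDM[Double] = {
  val rowSum: BDV[Double] = sum(alpha(breeze.linalg.*, ::))  // per-row totals
  val digAlpha = digamma(alpha)                              // element-wise digamma
  val digRowSum = digamma(rowSum)                            // digamma of row totals
  digAlpha(::, breeze.linalg.*) - digRowSum                  // subtract each row's total
}
```

As the new doc comment notes, digamma is accurate but comparatively expensive, and it is paid on every refresh of `Elogbeta` after `update`.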