From 34705bc81adf0ec5ec21b714a9ea9eaf15190a1a Mon Sep 17 00:00:00 2001 From: DB Tsai Date: Fri, 1 May 2015 15:18:11 -0700 Subject: [PATCH] first commit --- .../classification/LogisticRegression.scala | 404 ++++++++++++++++-- .../org/apache/spark/mllib/util/MLUtils.scala | 2 +- .../LogisticRegressionSuite.scala | 97 ++++- 3 files changed, 475 insertions(+), 28 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala index b73be035e29b5..dbc69c839feab 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala @@ -17,23 +17,32 @@ package org.apache.spark.ml.classification +import scala.collection.mutable + +import breeze.linalg.{norm => brzNorm, DenseVector => BDV} +import breeze.optimize.{LBFGS => BreezeLBFGS, OWLQN => BreezeOWLQN} +import breeze.optimize.{CachedDiffFunction, DiffFunction} + import org.apache.spark.annotation.AlphaComponent import org.apache.spark.ml.param._ import org.apache.spark.ml.param.shared._ -import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS -import org.apache.spark.mllib.linalg.{BLAS, Vector, VectorUDT, Vectors} +import org.apache.spark.mllib.linalg._ +import org.apache.spark.mllib.linalg.BLAS._ +import org.apache.spark.mllib.regression.LabeledPoint +import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer +import org.apache.spark.mllib.util.MLUtils +import org.apache.spark.rdd.RDD import org.apache.spark.sql.DataFrame import org.apache.spark.sql.functions._ import org.apache.spark.storage.StorageLevel +import org.apache.spark.{SparkException, Logging} /** * Params for logistic regression. */ private[classification] trait LogisticRegressionParams extends ProbabilisticClassifierParams - with HasRegParam with HasMaxIter with HasFitIntercept with HasThreshold { - - setDefault(regParam -> 0.1, maxIter -> 100, threshold -> 0.5) -} + with HasRegParam with HasElasticNetParam with HasMaxIter with HasFitIntercept with HasTol + with HasThreshold /** * :: AlphaComponent :: @@ -44,45 +53,151 @@ private[classification] trait LogisticRegressionParams extends ProbabilisticClas @AlphaComponent class LogisticRegression extends ProbabilisticClassifier[Vector, LogisticRegression, LogisticRegressionModel] - with LogisticRegressionParams { + with LogisticRegressionParams with Logging { - /** @group setParam */ + /** + * Set the regularization parameter. + * Default is 0.0. + * @group setParam + */ def setRegParam(value: Double): this.type = set(regParam, value) + setDefault(regParam -> 0.0) - /** @group setParam */ + /** + * Set the ElasticNet mixing parameter. + * For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty. + * For 0 < alpha < 1, the penalty is a combination of L1 and L2. + * Default is 0.0 which is an L2 penalty. + * @group setParam + */ + def setElasticNetParam(value: Double): this.type = set(elasticNetParam, value) + setDefault(elasticNetParam -> 0.0) + + /** + * Set the maximal number of iterations. + * Default is 100. + * @group setParam + */ def setMaxIter(value: Int): this.type = set(maxIter, value) + setDefault(maxIter -> 100) + + /** + * Set the convergence tolerance of iterations. + * Smaller value will lead to higher accuracy with the cost of more iterations. + * Default is 1E-6. + * @group setParam + */ + def setTol(value: Double): this.type = set(tol, value) + setDefault(tol -> 1E-6) /** @group setParam */ def setFitIntercept(value: Boolean): this.type = set(fitIntercept, value) + setDefault(fitIntercept -> true) /** @group setParam */ def setThreshold(value: Double): this.type = set(threshold, value) + setDefault(threshold -> 0.5) override protected def train(dataset: DataFrame): LogisticRegressionModel = { // Extract columns from data. If dataset is persisted, do not persist oldDataset. - val oldDataset = extractLabeledPoints(dataset) + val instances = extractLabeledPoints(dataset).map { + case LabeledPoint(label: Double, features: Vector) => (label, features) + } val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE - if (handlePersistence) { - oldDataset.persist(StorageLevel.MEMORY_AND_DISK) + if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK) + + val (summarizer, labelSummarizer) = instances.treeAggregate( + (new MultivariateOnlineSummarizer, new MultiClassSummarizer))( { + case ((summarizer: MultivariateOnlineSummarizer, labelSummarizer: MultiClassSummarizer), + (label: Double, features: Vector)) => + (summarizer.add(features), labelSummarizer.add(label)) + }, { + case ((summarizer1: MultivariateOnlineSummarizer, labelSummarizer1: MultiClassSummarizer), + (summarizer2: MultivariateOnlineSummarizer, labelSummarizer2: MultiClassSummarizer)) => + (summarizer1.merge(summarizer2), labelSummarizer1.merge(labelSummarizer2)) + }) + + val histogram = labelSummarizer.histogram + val numInvalid = labelSummarizer.countInvalid + val numClasses = histogram.length + val numFeatures = summarizer.mean.size + + if (numInvalid != 0) { + logError("Classification labels should be in {0 to " + (numClasses - 1) + "}. " + + "Found " + numInvalid + " invalid labels.") + throw new SparkException("Input validation failed.") + } + + if (numClasses > 2) { + logError("Currently, LogisticRegression with ElasticNet in ML package only supports " + + "binary classification. Found " + numClasses + " in the input dataset.") + throw new SparkException("Input validation failed.") } - // Train model - val lr = new LogisticRegressionWithLBFGS() - .setIntercept($(fitIntercept)) - lr.optimizer - .setRegParam($(regParam)) - .setNumIterations($(maxIter)) - val oldModel = lr.run(oldDataset) - val lrm = new LogisticRegressionModel(this, oldModel.weights, oldModel.intercept) + val featuresMean = summarizer.mean.toArray + val featuresStd = summarizer.variance.toArray.map(math.sqrt) + + val regParamL1 = $(elasticNetParam) * $(regParam) + val regParamL2 = (1.0 - $(elasticNetParam)) * $(regParam) - if (handlePersistence) { - oldDataset.unpersist() + val costFun = new LogisticCostFun(instances, numClasses, $(fitIntercept), + featuresStd, featuresMean, regParamL2) + + val optimizer = if ($(elasticNetParam) == 0.0 || $(regParam) == 0.0) { + new BreezeLBFGS[BDV[Double]]($(maxIter), 10, $(tol)) + } else { + // Remove the L1 penalization on the intercept + def regParamL1Fun = (index: Int) => { + if (index == numFeatures) 0.0 else regParamL1 + } + new BreezeOWLQN[Int, BDV[Double]]($(maxIter), 10, regParamL1Fun, $(tol)) } - copyValues(lrm) + + val initialWeightsWithIntercept = + Vectors.zeros(if ($(fitIntercept)) numFeatures + 1 else numFeatures) + + // TODO: Compute the initial intercept based on the histogram. + if ($(fitIntercept)) initialWeightsWithIntercept.toArray(numFeatures) = 1.0 + + val states = optimizer.iterations(new CachedDiffFunction(costFun), + initialWeightsWithIntercept.toBreeze.toDenseVector) + + var state = states.next() + val lossHistory = mutable.ArrayBuilder.make[Double] + + while (states.hasNext) { + lossHistory += state.value + state = states.next() + } + lossHistory += state.value + + // The weights are trained in the scaled space; we're converting them back to + // the original space. + val weightsWithIntercept = { + val rawWeights = state.x.toArray.clone() + var i = 0 + // Note that the intercept in scaled space and original space is the same; + // as a result, no scaling is needed. + while (i < numFeatures) { + rawWeights(i) *= { if (featuresStd(i) != 0.0) 1.0 / featuresStd(i) else 0.0 } + i += 1 + } + Vectors.dense(rawWeights) + } + + if (handlePersistence) instances.unpersist() + + val (weights, intercept) = if ($(fitIntercept)) { + (Vectors.dense(weightsWithIntercept.toArray.slice(0, weightsWithIntercept.size - 1)), + weightsWithIntercept(weightsWithIntercept.size - 1)) + } else { + (weightsWithIntercept, 0.0) + } + + new LogisticRegressionModel(this, weights.compressed, intercept) } } - /** * :: AlphaComponent :: * @@ -193,3 +308,244 @@ class LogisticRegressionModel private[ml] ( copyValues(new LogisticRegressionModel(parent, weights, intercept), extra) } } + +/** + * MultiClassSummarizer computes the number of distinct labels and corresponding counts, + * and validates the data to see if the labels used for k class multi-label classification + * are in the range of {0, 1, ..., k - 1} in a online fashion. + * + * Two MultilabelSummarizer can be merged together to have a statistical summary of the + * corresponding joint dataset. + */ +class MultiClassSummarizer private[ml] extends Serializable { + private val distinctMap = new mutable.HashMap[Int, Long] + private var totalInvalidCnt: Long = 0L + + /** + * Add a new label into this MultilabelSummarizer, and update the distinct map. + * @param label The label for this data point. + * @return This MultilabelSummarizer + */ + def add(label: Double): this.type = { + if (label - label.toInt != 0.0 || label < 0) { + totalInvalidCnt += 1 + this + } + else { + val counts: Long = distinctMap.getOrElse(label.toInt, 0L) + distinctMap.put(label.toInt, counts + 1) + this + } + } + + /** + * Merge another MultilabelSummarizer, and update the distinct map. + * (Note that it will merge the smaller distinct map into the larger one using in-place + * merging, so either `this` or `other` object will be modified and returned.) + * + * @param other The other MultilabelSummarizer to be merged. + * @return Merged MultilabelSummarizer object. + */ + def merge(other: MultiClassSummarizer): MultiClassSummarizer = { + val (largeMap, smallMap) = if (this.distinctMap.size > other.distinctMap.size) { + (this, other) + } else { + (other, this) + } + smallMap.distinctMap.foreach { + case (key, value) => + val counts = largeMap.distinctMap.getOrElse(key, 0L) + largeMap.distinctMap.put(key, counts + value) + } + largeMap.totalInvalidCnt += smallMap.totalInvalidCnt + largeMap + } + + def countInvalid: Long = totalInvalidCnt + + def numClasses: Int = distinctMap.keySet.max + 1 + + def histogram: Array[Long] = { + val result = Array.ofDim[Long](numClasses) + var i = 0 + while (i < result.length) { + result(i) = distinctMap.getOrElse(i, 0L) + i += 1 + } + result + } +} + +/** + * :: DeveloperApi :: + * + * @param numClasses the number of possible outcomes for k classes classification problem in + * Multinomial Logistic Regression. By default, it is binary logistic regression + * so numClasses will be set to 2. + */ +private class LogisticAggregator( + weights: Vector, + numClasses: Int, + fitIntercept: Boolean, + featuresStd: Array[Double], + featuresMean: Array[Double]) extends Serializable { + + private var totalCnt: Long = 0L + private var lossSum = 0.0 + + private val weightsArray = weights match { + case dv: DenseVector => dv.values + case _ => + throw new IllegalArgumentException( + s"weights only supports dense vector but got type ${weights.getClass}.") + } + + private val dim = if (fitIntercept) weightsArray.length - 1 else weightsArray.length + + private val gradientSumArray = Array.ofDim[Double](weightsArray.length) + + /** + * Add a new training data to this LogisticAggregator, and update the loss and gradient + * of the objective function. + * + * @param label The label for this data point. + * @param data The features for one data point in dense/sparse vector format to be added + * into this aggregator. + * @return This LogisticAggregator object. + */ + def add(label: Double, data: Vector): this.type = { + require(dim == data.size, s"Dimensions mismatch when adding new sample." + + s" Expecting $dim but got ${data.size}.") + + val dataSize = data.size + + val localWeightsArray = weightsArray + val localGradientSumArray = gradientSumArray + + numClasses match { + case 2 => + /** + * For Binary Logistic Regression. + */ + val margin = - { + var sum = 0.0 + data.foreachActive { (index, value) => + if (featuresStd(index) != 0.0 && value != 0.0) { + sum += localWeightsArray(index) * (value / featuresStd(index)) + } + } + sum + { if (fitIntercept) localWeightsArray(dim) else 0.0 } + } + + val multiplier = (1.0 / (1.0 + math.exp(margin))) - label + + data.foreachActive { (index, value) => + if (featuresStd(index) != 0.0 && value != 0.0) { + localGradientSumArray(index) += multiplier * (value / featuresStd(index)) + } + } + + if (fitIntercept) { + localGradientSumArray(dim) += multiplier + } + + if (label > 0) { + // The following is equivalent to log(1 + exp(margin)) but more numerically stable. + lossSum += MLUtils.log1pExp(margin) + } else { + lossSum += MLUtils.log1pExp(margin) - margin + } + case _ => + new NotImplementedError("LogisticRegression with ElasticNet in ML package only supports " + + "binary classification for now.") + } + totalCnt += 1 + this + } + + /** + * Merge another LogisticAggregator, and update the loss and gradient + * of the objective function. + * (Note that it's in place merging; as a result, `this` object will be modified.) + * + * @param other The other LogisticAggregator to be merged. + * @return This LogisticAggregator object. + */ + def merge(other: LogisticAggregator): this.type = { + require(dim == other.dim, s"Dimensions mismatch when merging with another " + + s"LeastSquaresAggregator. Expecting $dim but got ${other.dim}.") + + if (other.totalCnt != 0) { + totalCnt += other.totalCnt + lossSum += other.lossSum + + var i = 0 + val localThisGradientSumArray = this.gradientSumArray + val localOtherGradientSumArray = other.gradientSumArray + while (i < localThisGradientSumArray.length) { + localThisGradientSumArray(i) += localOtherGradientSumArray(i) + i += 1 + } + } + this + } + + def count: Long = totalCnt + + def loss: Double = lossSum / totalCnt + + def gradient: Vector = { + val result = Vectors.dense(gradientSumArray.clone()) + scal(1.0 / totalCnt, result) + result + } +} + +/** + * LogisticCostFun implements Breeze's DiffFunction[T] for a multinomial logistic loss function, + * as used in multi-class classification (it is also used in binary logistic regression). + * It returns the loss and gradient with L2 regularization at a particular point (weights). + * It's used in Breeze's convex optimization routines. + */ +private class LogisticCostFun( + data: RDD[(Double, Vector)], + numClasses: Int, + fitIntercept: Boolean, + featuresStd: Array[Double], + featuresMean: Array[Double], + regParamL2: Double) extends DiffFunction[BDV[Double]] { + + override def calculate(weights: BDV[Double]): (Double, BDV[Double]) = { + val w = Vectors.fromBreeze(weights) + + val logisticAggregator = data.treeAggregate(new LogisticAggregator(w, numClasses, fitIntercept, + featuresStd, featuresMean))( + seqOp = (c, v) => (c, v) match { + case (aggregator, (label, features)) => aggregator.add(label, features) + }, + combOp = (c1, c2) => (c1, c2) match { + case (aggregator1, aggregator2) => aggregator1.merge(aggregator2) + }) + + // regVal is the sum of weight squares for L2 regularization + val norm = if (fitIntercept) { + brzNorm(Vectors.dense(weights.toArray.slice(0, weights.size -1)).toBreeze, 2.0) + } else { + brzNorm(weights, 2.0) + } + val regVal = 0.5 * regParamL2 * norm * norm + + val loss = logisticAggregator.loss + regVal + val gradient = logisticAggregator.gradient + + if (fitIntercept) { + axpy(regParamL2, w, gradient) + } else { + val wArray = w.toArray.clone() + wArray(wArray.length - 1) = 0.0 + axpy(regParamL2, Vectors.dense(wArray), gradient) + } + + (loss, gradient.toBreeze.asInstanceOf[BDV[Double]]) + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala index 5d6ddd47f67d6..681f4c618d302 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala @@ -331,7 +331,7 @@ object MLUtils { * @param x a floating-point value as input. * @return the result of `math.log(1 + math.exp(x))`. */ - private[mllib] def log1pExp(x: Double): Double = { + private[spark] def log1pExp(x: Double): Double = { if (x > 0) { x + math.log1p(math.exp(-x)) } else { diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala index 6dd1fdf05514e..2d65646184734 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.ml.classification import org.scalatest.FunSuite -import org.apache.spark.mllib.classification.LogisticRegressionSuite.generateLogisticInput +import org.apache.spark.mllib.classification.LogisticRegressionSuite import org.apache.spark.mllib.linalg.Vector import org.apache.spark.mllib.util.MLlibTestSparkContext import org.apache.spark.mllib.util.TestingUtils._ @@ -30,13 +30,28 @@ class LogisticRegressionSuite extends FunSuite with MLlibTestSparkContext { @transient var sqlContext: SQLContext = _ @transient var dataset: DataFrame = _ + @transient var binaryDataset: DataFrame = _ private val eps: Double = 1e-5 override def beforeAll(): Unit = { super.beforeAll() sqlContext = new SQLContext(sc) - dataset = sqlContext.createDataFrame( - sc.parallelize(generateLogisticInput(1.0, 1.0, nPoints = 100, seed = 42), 2)) + + dataset = sqlContext.createDataFrame(sc.parallelize(LogisticRegressionSuite + .generateLogisticInput(1.0, 1.0, nPoints = 100, seed = 42), 4)) + + binaryDataset = { + val nPoints = 10000 + val weights = Array(-0.57997, 0.912083, -0.371077, -0.819866, 2.688191) + val xMean = Array(5.843, 3.057, 3.758, 1.199) + val xVariance = Array(0.6856, 0.1899, 3.116, 0.581) + + val testData = LogisticRegressionSuite.generateMultinomialLogisticInput( + weights, xMean, xVariance, true, nPoints, 42) + + sqlContext.createDataFrame(sc.parallelize(LogisticRegressionSuite + .generateMultinomialLogisticInput(weights, xMean, xVariance, true, nPoints, 42), 4)) + } } test("logistic regression: default params") { @@ -135,4 +150,80 @@ class LogisticRegressionSuite extends FunSuite with MLlibTestSparkContext { assert(pred == predFromProb) } } + + test("MultilabelSummarizer") { + val summarizer1 = (new MultiClassSummarizer) + .add(0.0).add(3.0).add(4.0).add(3.0).add(6.0) + assert(summarizer1.histogram.zip(Array[Long](1, 0, 0, 2, 1, 0, 1)).forall(x => x._1 === x._2)) + assert(summarizer1.countInvalid === 0) + assert(summarizer1.numClasses === 7) + + val summarizer2 = (new MultiClassSummarizer) + .add(1.0).add(5.0).add(3.0).add(0.0).add(4.0).add(1.0) + assert(summarizer2.histogram.zip(Array[Long](1, 2, 0, 1, 1, 1)).forall(x => x._1 === x._2)) + assert(summarizer2.countInvalid === 0) + assert(summarizer2.numClasses === 6) + + val summarizer3 = (new MultiClassSummarizer) + .add(0.0).add(1.3).add(5.2).add(2.5).add(2.0).add(4.0).add(4.0).add(4.0).add(1.0) + assert(summarizer3.histogram.zip(Array[Long](1, 1, 1, 0, 3)).forall(x => x._1 === x._2)) + assert(summarizer3.countInvalid === 3) + assert(summarizer3.numClasses === 5) + + val summarizer4 = (new MultiClassSummarizer) + .add(3.1).add(4.3).add(2.0).add(1.0).add(3.0) + assert(summarizer4.histogram.zip(Array[Long](0, 1, 1, 1)).forall(x => x._1 === x._2)) + assert(summarizer4.countInvalid === 2) + assert(summarizer4.numClasses === 4) + + // small map merges large one + val summarizerA = summarizer1.merge(summarizer2) + assert(summarizerA.hashCode() === summarizer2.hashCode()) + assert(summarizerA.histogram.zip(Array[Long](2, 2, 0, 3, 2, 1, 1)).forall(x => x._1 === x._2)) + assert(summarizerA.countInvalid === 0) + assert(summarizerA.numClasses === 7) + + // large map merges small one + val summarizerB = summarizer3.merge(summarizer4) + assert(summarizerB.hashCode() === summarizer3.hashCode()) + assert(summarizerB.histogram.zip(Array[Long](1, 2, 2, 1, 3)).forall(x => x._1 === x._2)) + assert(summarizerB.countInvalid === 5) + assert(summarizerB.numClasses === 5) + } + + test("binary logistic regression with intercept without regularization") { + val trainer = (new LogisticRegression).setFitIntercept(true).setMaxIter(100).setThreshold(1E-16) + + val model = trainer.fit(binaryDataset) + + val interceptR = 2.83610 + val weightsR = Array(-0.5895, 0.8932, -0.3924, -0.79966) + + assert(model.intercept ~== interceptR relTol 1E-3) + assert(model.weights(0) ~== weightsR(0) relTol 1E-3) + assert(model.weights(1) ~== weightsR(1) relTol 1E-3) + assert(model.weights(2) ~== weightsR(2) relTol 1E-3) + assert(model.weights(3) ~== weightsR(3) relTol 1E-3) + } + + test("binary logistic regression without intercept without regularization") { + } + + test("binary logistic regression with intercept with L1 regularization") { + } + + test("binary logistic regression without intercept with L1 regularization") { + } + + test("binary logistic regression with intercept with L2 regularization") { + } + + test("binary logistic regression without intercept with L2 regularization") { + } + + test("binary logistic regression with intercept with ElasticNet regularization") { + } + + test("binary logistic regression without intercept with ElasticNet regularization") { + } }