diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/BinaryClassificationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/BinaryClassificationModel.scala
new file mode 100644
index 0000000000000..c87b83b606632
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/BinaryClassificationModel.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.classification
+
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.regression._
+import org.apache.spark.rdd.RDD
+
+trait BinaryClassificationModel extends ClassificationModel {
+  /**
+   * Return prediction scores and true labels in an RDD.
+   *
+   * @param input RDD of labeled points to use for the evaluation
+   * @return RDD[(Double, Double)] of (score, label) pairs, where the score is the raw
+   *         score the model assigns to the observation; higher scores indicate a
+   *         higher likelihood of label 1
+   */
+  def scoreForEval(input: RDD[LabeledPoint]): RDD[(Double, Double)] = {
+    input.map { point =>
+      (score(point.features), point.label)
+    }
+  }
+
+  /**
+   * Evaluate the performance of the model using the scores assigned by the model
+   * to observations and the true labels.
+   * Returns the area under the receiver operating characteristic (ROC) curve,
+   * computed from the rank sum (Mann-Whitney U statistic) of the positive examples.
+   * Labels are assumed to be 1.0 for positive examples and 0.0 for negative ones;
+   * ties between scores are broken arbitrarily.
+   *
+   * @param predictionAndLabel RDD of (score assigned by the model, true label) pairs
+   * @return Double area under the ROC curve
+   */
+  def areaUnderROC(predictionAndLabel: RDD[(Double, Double)]): Double = {
+    val nObs = predictionAndLabel.count.toDouble
+    val nPos = predictionAndLabel.filter(x => x._2 > 0.5).count.toDouble
+    // sort according to the predicted score and add indices
+    val sortedPredictionsWithIndex = predictionAndLabel.sortByKey(true).zipWithIndex
+    // sum of the (1-based) ranks of the positive examples
+    val sumPosRanks =
+      sortedPredictionsWithIndex.filter(x => x._1._2 > 0.5).map(x => x._2 + 1).sum
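+    // The AUC follows from the rank sum (Mann-Whitney U statistic): out of the
+    // nPos * (nObs - nPos) positive-negative pairs, U = sumPosRanks - nPos * (nPos + 1) / 2
+    // of them are ordered correctly. For example, scores (0.1, 0.4, 0.8) with labels
+    // (0.0, 1.0, 1.0) give positive ranks 2 and 3, so U = 5 - 3 = 2 and
+    // AUC = 2 / (2 * 1) = 1.0.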
+    // if there are no positive or no negative labels, the area under the curve is
+    // not defined; return 0.0 in that case
+    if ((nPos > 0) && (nObs > nPos)) {
+      (sumPosRanks - nPos * (nPos + 1) / 2) / (nPos * (nObs - nPos))
+    } else {
+      0.0
+    }
+  }
+}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/ClassificationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/ClassificationModel.scala
index 391f5b9b7a7de..8a43101ca0c46 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/classification/ClassificationModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/ClassificationModel.scala
@@ -35,4 +35,20 @@ trait ClassificationModel extends Serializable {
    * @return Int prediction from the trained model
    */
   def predict(testData: Array[Double]): Double
+
+  /**
+   * Score values for the given data set using the model trained.
+   *
+   * @param testData RDD representing data points to be scored
+   * @return RDD[Double] where each entry contains the corresponding score
+   */
+  def score(testData: RDD[Array[Double]]): RDD[Double]
+
+  /**
+   * Score values for a single data point using the model trained.
+   *
+   * @param testData array representing a single data point
+   * @return Double score from the trained model
+   */
+  def score(testData: Array[Double]): Double
 }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala
index a481f522761e2..ad75198851e61 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala
@@ -20,6 +20,8 @@ package org.apache.spark.mllib.classification
 import scala.math.round
 
 import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+
 import org.apache.spark.rdd.RDD
 import org.apache.spark.mllib.optimization._
 import org.apache.spark.mllib.regression._
@@ -38,12 +40,60 @@ class LogisticRegressionModel(
     override val weights: Array[Double],
     override val intercept: Double)
   extends GeneralizedLinearModel(weights, intercept)
-  with ClassificationModel with Serializable {
+  with BinaryClassificationModel with Serializable {
+
+  /**
+   * Predict the label of a single data point using the model trained.
+   *
+   * @return Double predicted label, 1.0 if the score is non-negative and 0.0 otherwise
+   */
   override def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix,
       intercept: Double) = {
-    val margin = dataMatrix.mmul(weightMatrix).get(0) + intercept
-    round(1.0/ (1.0 + math.exp(margin * -1)))
+    if (predictScore(dataMatrix, weightMatrix, intercept) < 0) 0.0 else 1.0
+  }
+
+  /**
+   * Compute the score (margin) of the linear model for a single data point.
+   * One can obtain probabilities by applying the logistic (or sigmoid) function
+   * to the score.
+   *
+   * @return Double score of the linear model
+   */
+  def predictScore(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix,
+      intercept: Double) = {
+    // margin
+    dataMatrix.mmul(weightMatrix).get(0) + intercept
+  }
+
+  /**
+   * Score values for the given data set using the model trained.
+   *
+   * @param testData RDD representing data points to be scored
+   * @return RDD[Double] where each entry contains the corresponding score
+   */
+  def score(testData: RDD[Array[Double]]): RDD[Double] = {
+    // A small optimization to avoid serializing the entire model. Only the weightsMatrix
+    // and intercept are needed.
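+    // Referencing the model's fields directly inside the closure below would capture
+    // `this` and ship the whole model with every task; the local copies keep the
+    // serialized closure small.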
+    val localWeights = weightsMatrix
+    val localIntercept = intercept
+
+    testData.map { x =>
+      val dataMatrix = new DoubleMatrix(1, x.length, x: _*)
+      predictScore(dataMatrix, localWeights, localIntercept)
+    }
+  }
+
+  /**
+   * Score values for a single data point using the model trained.
+   *
+   * @param testData array representing a single data point
+   * @return Double score from the trained model
+   */
+  def score(testData: Array[Double]): Double = {
+    val dataMat = new DoubleMatrix(1, testData.length, testData: _*)
+    predictScore(dataMat, weightsMatrix, intercept)
+  }
 }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
index 6539b2f339465..90bf1dbe10c58 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
@@ -46,6 +46,11 @@ class NaiveBayesModel(val pi: Array[Double], val theta: Array[Array[Double]])
     val result = _pi.add(_theta.mmul(dataMatrix))
     result.argmax()
   }
+
+  // For Naive Bayes the score is the same as the prediction: the most likely class label.
+  def score(testData: RDD[Array[Double]]): RDD[Double] = testData.map(score)
+
+  def score(testData: Array[Double]): Double = predict(testData)
 }
 
 /**
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala
index 6dff29dfb45cc..4f36137420904 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala
@@ -36,13 +36,48 @@ class SVMModel(
     override val weights: Array[Double],
     override val intercept: Double)
   extends GeneralizedLinearModel(weights, intercept)
-  with ClassificationModel with Serializable {
+  with BinaryClassificationModel with Serializable {
 
   override def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix,
       intercept: Double) = {
-    val margin = dataMatrix.dot(weightMatrix) + intercept
+    val margin = predictScore(dataMatrix, weightMatrix, intercept)
     if (margin < 0) 0.0 else 1.0
   }
+
+  /**
+   * Compute the score (margin) of the linear model for a single data point.
+   */
+  def predictScore(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix,
+      intercept: Double) = {
+    dataMatrix.dot(weightMatrix) + intercept
+  }
+
+  /**
+   * Score values for the given data set using the model trained.
+   *
+   * @param testData RDD representing data points to be scored
+   * @return RDD[Double] where each entry contains the corresponding score
+   */
+  def score(testData: RDD[Array[Double]]): RDD[Double] = {
+    // A small optimization to avoid serializing the entire model. Only the weightsMatrix
+    // and intercept are needed.
+    val localWeights = weightsMatrix
+    val localIntercept = intercept
+
+    testData.map { x =>
+      val dataMatrix = new DoubleMatrix(1, x.length, x: _*)
+      predictScore(dataMatrix, localWeights, localIntercept)
+    }
+  }
+
+  /**
+   * Score values for a single data point using the model trained.
+   *
+   * @param testData array representing a single data point
+   * @return Double score from the trained model
+   */
+  def score(testData: Array[Double]): Double = {
+    val dataMat = new DoubleMatrix(1, testData.length, testData: _*)
+    predictScore(dataMat, weightsMatrix, intercept)
+  }
 }
 
 /**
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala
index f98b0b536deaa..2a73ff3e49f57 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala
@@ -35,7 +35,7 @@ abstract class GeneralizedLinearModel(val weights: Array[Double], val intercept: Double)
   extends Serializable {
 
   // Create a column vector that can be used for predictions
-  private val weightsMatrix = new DoubleMatrix(weights.length, 1, weights:_*)
+  protected val weightsMatrix = new DoubleMatrix(weights.length, 1, weights:_*)
 
   /**
    * Predict the result given a data point and the weights learned.
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/BinaryClassificationEvaluationSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/BinaryClassificationEvaluationSuite.scala
new file mode 100644
index 0000000000000..bd5ce9dd3f05f
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/BinaryClassificationEvaluationSuite.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.classification
+
+import scala.math.abs
+
+import org.scalatest.FunSuite
+import org.scalatest.matchers.ShouldMatchers
+
+import org.apache.spark.mllib.util.LocalSparkContext
+
+class BinaryClassificationEvaluationSuite extends FunSuite with LocalSparkContext
+  with ShouldMatchers {
+
+  def validateResult(estVal: Double, trueVal: Double, tol: Double) {
+    abs(estVal - trueVal) should be < tol
+  }
+
+  // Test the ROC area under the curve on synthetic model output for logistic regression.
+  // The first data set is perfectly separated by the scores (AUC 1.0); in the second,
+  // one positive example is ranked below one of the negatives, so 5 of the 6
+  // positive-negative pairs are ordered correctly (AUC 5/6 = 0.8333).
+  test("ROC area under curve, synthetic, LR") {
+    val predictionAndLabelC = sc.parallelize(Array((3.0, 1.0), (-2.0, 0.0), (2.0, 1.0),
+      (-1.0, 0.0), (1.0, 1.0)))
+    val modelC = new LogisticRegressionModel(Array(0.0), 0.0)
+    val aucRocC = modelC.areaUnderROC(predictionAndLabelC)
+    validateResult(aucRocC, 1.0, 0.01)
+
+    val predictionAndLabelR = sc.parallelize(Array((0.45, 1.0), (-0.23, 0.0), (-0.34, 1.0),
+      (-0.42, 0.0), (0.62, 1.0)))
+    val modelR = new LogisticRegressionModel(Array(0.0), 0.0)
+    val aucRocR = modelR.areaUnderROC(predictionAndLabelR)
+    validateResult(aucRocR, 0.8333, 0.01)
+  }
+
+  // Test the ROC area under the curve using the same synthetic data and an SVM model
+  test("ROC area under curve, synthetic, SVM") {
+    val predictionAndLabelC = sc.parallelize(Array((3.0, 1.0), (-2.0, 0.0), (2.0, 1.0),
+      (-1.0, 0.0), (1.0, 1.0)))
+    val modelC = new SVMModel(Array(0.0), 0.0)
+    val aucRocC = modelC.areaUnderROC(predictionAndLabelC)
+    validateResult(aucRocC, 1.0, 0.01)
+
+    val predictionAndLabelR = sc.parallelize(Array((0.45, 1.0), (-0.23, 0.0), (-0.34, 1.0),
+      (-0.42, 0.0), (0.62, 1.0)))
+    val modelR = new SVMModel(Array(0.0), 0.0)
+    val aucRocR = modelR.areaUnderROC(predictionAndLabelR)
+    validateResult(aucRocR, 0.8333, 0.01)
+  }
+}
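For reference, a minimal end-to-end sketch of how the new evaluation API composes with the existing training helpers. The training call and toy data below are illustrative and not part of this diff; only scoreForEval and areaUnderROC come from the changes above.

import org.apache.spark.SparkContext
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
import org.apache.spark.mllib.regression.LabeledPoint

val sc = new SparkContext("local", "AUCExample")

// toy training set: positive examples have a positive feature value
val points = sc.parallelize(Seq(
  LabeledPoint(1.0, Array(2.0)),
  LabeledPoint(1.0, Array(1.0)),
  LabeledPoint(0.0, Array(-0.5)),
  LabeledPoint(0.0, Array(-1.5))))

val model = LogisticRegressionWithSGD.train(points, 100)

// (score, label) pairs for the evaluation set, then the area under the ROC curve
val scoreAndLabel = model.scoreForEval(points)
println("AUC = " + model.areaUnderROC(scoreAndLabel))

sc.stop()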