ROC area under the curve for binary classification #160

Closed · wants to merge 7 commits · Changes from all commits
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.mllib.classification


import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.regression._
import org.apache.spark.rdd.RDD

trait BinaryClassificationModel extends ClassificationModel {
  /**
   * Return the model score and true label for each point in an RDD.
   *
   * @param input RDD of labeled points to use for the evaluation
   * @return RDD[(Double, Double)] of (score, label) pairs, where score is the
   *         raw value the model assigns to the point (higher means more likely
   *         to be labeled 1).
   */
  def scoreForEval(input: RDD[LabeledPoint]): RDD[(Double, Double)] = {
    val predictionAndLabel = input.map { point =>
      val scores = score(point.features)
      (scores, point.label)
    }
    predictionAndLabel
  }

  /**
   * Evaluate the performance of the model using the scores assigned by the model
   * to observations and their true labels.
   * Returns the area under the receiver operating characteristic (ROC) curve.
   * Note that a label is predicted as 0 if the score is less than 0,
   * and as 1 otherwise.
   *
   * @param predictionAndLabel RDD of (score assigned by the model, true label) pairs
   * @return Double area under the ROC curve
   */
  def areaUnderROC(predictionAndLabel: RDD[(Double, Double)]): Double = {
    val nObs = predictionAndLabel.count.toDouble
    val nPos = predictionAndLabel.filter(x => x._2 > 0.5).count.toDouble
    // sort according to the predicted score and add indices
    val sortedPredictionsWithIndex = predictionAndLabel.sortByKey(true).zipWithIndex
Member commented:

Final comment I promise. There's an interesting question of what to do about ties in the keys. If all of my predictions are 0/1 for example, then the ranking of 0/1 true labels within those predictions ends up being pretty arbitrary, and so does the AUC. I don't think this formula deals with that case, and you could argue it's a degenerate case or argue it's actually somewhat common. To make it at least deterministic, maybe a secondary sort by true label? I don't know whether to be optimistic or pessimistic, and sort the positives first or last. Tough one.
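
A plain-Scala sketch (not tied to any particular Spark API) of the secondary sort suggested here; sorting ascending by score and then by a label-derived key makes the ranking of tied scores deterministic:

// Example data with ties in the score; each pair is (score, true label).
val scoreAndLabel = Seq((1.0, 1.0), (1.0, 0.0), (0.0, 0.0), (0.0, 1.0))

// Pessimistic tie-break: within a tied score, positives take the lower ranks,
// which minimizes the resulting rank-sum AUC.
val pessimistic = scoreAndLabel.sortBy { case (score, label) => (score, -label) }

// Optimistic tie-break: positives take the higher ranks within a tie, maximizing the AUC.
val optimistic = scoreAndLabel.sortBy { case (score, label) => (score, label) }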

    // sum of the positive ranks
    val sumPosRanks =
      sortedPredictionsWithIndex.filter(x => (x._1)._2 > 0).map(x => x._2 + 1).sum
Member commented:

Unless I'm misreading, in this line, positive is defined as "> 0" but it's defined as "== 1" above. The former might be the safer definition.

    // if there are no positive or no negative labels, the area under the curve is not defined.
    // Return 0 in that case.
    if ((nPos > 0) && (nObs > nPos)) {
      (sumPosRanks - nPos * (nPos + 1) / 2) / (nPos * (nObs - nPos))
Member commented:

I think int overflow will be an issue here, even though nPos and nObs are Long. For massive RDDs of more than about 2 billion elements this could fail. Better to make the values Double to be safe since they're used that way.

Author replied:

If this is true, then that's a more general problem with count rather than with this implementation, isn't it? Shouldn't the implementation of count for RDD take this into account?

Member replied:

No, I mean that this expression will be incorrect well before the Long values themselves overflow:

scala> val nPos = 4000000000L
nPos: Long = 4000000000
scala> nPos * (nPos + 1) / 2
res1: Long = -1223372034854775808

(Should have said that the problem occurs a little past 3B, not 2B.) Sure, the 64-bit integer overflows eventually, but I'm not worried about quintillions of elements just yet! Having a few billion elements, though, is quite possible.

Author replied:

I see, thanks! Fixed it.
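
For reference, the fix is visible in the diff above: count is converted to Double before the rank-sum arithmetic. A minimal illustration of the difference:

val nPos = 4000000000L
val overflowed = nPos * (nPos + 1) / 2      // Long arithmetic overflows to a negative value
val safe = nPos.toDouble * (nPos + 1) / 2   // Double arithmetic gives 8.000000002E18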

    } else {
      0.0
    }
  }
}
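
A minimal sketch of how the two methods above compose; EvaluationSketch and evaluateAuc are illustrative names that are not part of this PR, and the trained model and labeled test RDD are assumed to come from the surrounding application:

import org.apache.spark.mllib.classification.BinaryClassificationModel
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

// Hypothetical helper: score a labeled test set with any BinaryClassificationModel
// and reduce the (score, label) pairs to a single AUC value.
object EvaluationSketch {
  def evaluateAuc(model: BinaryClassificationModel, testData: RDD[LabeledPoint]): Double = {
    // (score, label) pairs as produced by scoreForEval above.
    val scoreAndLabel: RDD[(Double, Double)] = model.scoreForEval(testData)
    model.areaUnderROC(scoreAndLabel)
  }
}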
@@ -35,4 +35,20 @@ trait ClassificationModel extends Serializable {
   * @return Int prediction from the trained model
   */
  def predict(testData: Array[Double]): Double

  /**
   * Score values for the given data set using the model trained.
   *
   * @param testData RDD representing data points to be scored
   * @return RDD[Double] where each entry contains the corresponding score
   */
  def score(testData: RDD[Array[Double]]): RDD[Double]

  /**
   * Score value for a single data point using the model trained.
   *
   * @param testData array representing a single data point
   * @return Double score from the trained model
   */
  def score(testData: Array[Double]): Double
}
@@ -20,6 +20,8 @@ package org.apache.spark.mllib.classification
import scala.math.round

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.optimization._
import org.apache.spark.mllib.regression._
@@ -38,12 +40,60 @@ class LogisticRegressionModel(
    override val weights: Array[Double],
    override val intercept: Double)
  extends GeneralizedLinearModel(weights, intercept)
-  with ClassificationModel with Serializable {
+  with BinaryClassificationModel with Serializable {

  /**
   * Predict the label (0.0 or 1.0) for a single data point using the model trained.
   *
   * @param dataMatrix row vector representing a single data point
   * @return Double predicted label from the trained model
   */
  override def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix,
      intercept: Double) = {
-    val margin = dataMatrix.mmul(weightMatrix).get(0) + intercept
-    round(1.0/ (1.0 + math.exp(margin * -1)))
+    if (predictScore(dataMatrix, weightMatrix, intercept) < 0) 0.0 else 1.0
  }

  /**
   * Compute the raw score (margin) of the linear model for a single data point.
   *
   * @param dataMatrix row vector representing a single data point
   * @return Double score of the linear model. One can obtain probabilities by
   *         applying the logistic (or sigmoid) function.
   */
  def predictScore(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix,
      intercept: Double) = {
    // margin
    dataMatrix.mmul(weightMatrix).get(0) + intercept
  }
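
As the return note above says, the raw score is the margin of the linear model; a probability for label 1 can be recovered by applying the logistic function, which is exactly what the replaced predictPoint implementation computed before rounding. A minimal sketch:

// Map a raw margin to P(label = 1) with the logistic (sigmoid) function.
def sigmoid(margin: Double): Double = 1.0 / (1.0 + math.exp(-margin))

// sigmoid(0.0) == 0.5, so the "score < 0 => predict 0" rule in predictPoint
// is equivalent to thresholding the probability at 0.5.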

  /**
   * Score values for the given data set using the model trained.
   *
   * @param testData RDD representing data points to be scored
   * @return RDD[Double] where each entry contains the corresponding score
   */
  def score(testData: RDD[Array[Double]]): RDD[Double] = {
    // A small optimization to avoid serializing the entire model. Only the weightsMatrix
    // and intercept are needed.
    val localWeights = weightsMatrix
    val localIntercept = intercept

    testData.map { x =>
      val dataMatrix = new DoubleMatrix(1, x.length, x:_*)
      predictScore(dataMatrix, localWeights, localIntercept)
    }
  }

  /**
   * Score value for a single data point using the model trained.
   *
   * @param testData array representing a single data point
   * @return Double score from the trained model
   */
  def score(testData: Array[Double]): Double = {
    val dataMat = new DoubleMatrix(1, testData.length, testData:_*)
    predictScore(dataMat, weightsMatrix, intercept)
  }
}

@@ -46,6 +46,11 @@ class NaiveBayesModel(val pi: Array[Double], val theta: Array[Array[Double]])
    val result = _pi.add(_theta.mmul(dataMatrix))
    result.argmax()
  }

  // Same as predict in this case: the score of a NaiveBayesModel is the predicted class.
  def score(testData: RDD[Array[Double]]): RDD[Double] = testData.map(score)

  def score(testData: Array[Double]): Double = predict(testData)
}

/**
@@ -36,13 +36,48 @@ class SVMModel(
    override val weights: Array[Double],
    override val intercept: Double)
  extends GeneralizedLinearModel(weights, intercept)
-  with ClassificationModel with Serializable {
+  with BinaryClassificationModel with Serializable {

  override def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix,
      intercept: Double) = {
-    val margin = dataMatrix.dot(weightMatrix) + intercept
+    val margin = predictScore(dataMatrix, weightMatrix, intercept)
    if (margin < 0) 0.0 else 1.0
  }

  /**
   * Compute the raw score (margin) of the linear SVM for a single data point.
   */
  def predictScore(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix,
      intercept: Double) = {
    dataMatrix.dot(weightMatrix) + intercept
  }

  /**
   * Score values for the given data set using the model trained.
   *
   * @param testData RDD representing data points to be scored
   * @return RDD[Double] where each entry contains the corresponding score
   */
  def score(testData: RDD[Array[Double]]): RDD[Double] = {
    // A small optimization to avoid serializing the entire model. Only the weightsMatrix
    // and intercept are needed.
    val localWeights = weightsMatrix
    val localIntercept = intercept

    testData.map { x =>
      val dataMatrix = new DoubleMatrix(1, x.length, x:_*)
      predictScore(dataMatrix, localWeights, localIntercept)
    }
  }

  /**
   * Score value for a single data point using the model trained.
   *
   * @param testData array representing a single data point
   * @return Double score from the trained model
   */
  def score(testData: Array[Double]): Double = {
    val dataMat = new DoubleMatrix(1, testData.length, testData:_*)
    predictScore(dataMat, weightsMatrix, intercept)
  }
}

/**
@@ -35,7 +35,7 @@ abstract class GeneralizedLinearModel(val weights: Array[Double], val intercept:
  extends Serializable {

  // Create a column vector that can be used for predictions
-  private val weightsMatrix = new DoubleMatrix(weights.length, 1, weights:_*)
+  protected val weightsMatrix = new DoubleMatrix(weights.length, 1, weights:_*)

/**
* Predict the result given a data point and the weights learned.
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.mllib.classification

import scala.math.abs
import scala.util.Random
import scala.collection.JavaConversions._

import org.scalatest.BeforeAndAfterAll
import org.scalatest.FunSuite
import org.scalatest.matchers.ShouldMatchers

import org.apache.spark.SparkContext
import org.apache.spark.mllib.regression._
import org.apache.spark.mllib.util.LocalSparkContext

class BinaryClassificationEvaluationSuite extends FunSuite with LocalSparkContext
  with ShouldMatchers {

  def validateResult(estVal: Double, trueVal: Double, tol: Double) {
    abs(estVal - trueVal) should be < tol
  }

  // Test ROC area under the curve using synthetic output of a model
  test("ROC area under curve, synthetic, LR") {
    val predictionAndLabelC = sc.parallelize(Array((3.0, 1.0), (-2.0, 0.0), (2.0, 1.0), (-1.0, 0.0),
      (1.0, 1.0)))
    val modelC = new LogisticRegressionModel(Array(0.0), 0.0)
    val aucRocC = modelC.areaUnderROC(predictionAndLabelC)
    validateResult(aucRocC, 1.0, 0.01)

    val predictionAndLabelR = sc.parallelize(Array((0.45, 1.0), (-0.23, 0.0), (-0.34, 1.0),
      (-0.42, 0.0), (0.62, 1.0)))
    val modelR = new LogisticRegressionModel(Array(0.0), 0.0)
    val aucRocR = modelR.areaUnderROC(predictionAndLabelR)
    validateResult(aucRocR, 0.8333, 0.01)
  }

  // Test ROC area under the curve using a small data set and SVM
  test("ROC area under curve, synthetic, SVM") {
    val predictionAndLabelC = sc.parallelize(Array((3.0, 1.0), (-2.0, 0.0), (2.0, 1.0), (-1.0, 0.0),
      (1.0, 1.0)))
    val modelC = new SVMModel(Array(0.0), 0.0)
    val aucRocC = modelC.areaUnderROC(predictionAndLabelC)
    validateResult(aucRocC, 1.0, 0.01)

    val predictionAndLabelR = sc.parallelize(Array((0.45, 1.0), (-0.23, 0.0), (-0.34, 1.0),
      (-0.42, 0.0), (0.62, 1.0)))
    val modelR = new SVMModel(Array(0.0), 0.0)
    val aucRocR = modelR.areaUnderROC(predictionAndLabelR)
    validateResult(aucRocR, 0.8333, 0.01)
  }
}
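
The expected value 0.8333 in the second case follows directly from the rank-sum form of the AUC used by areaUnderROC. A small, self-contained check in plain Scala (no Spark) over the same five points:

// Sort ascending by score, take the 1-based ranks of the positive labels,
// and apply AUC = (sumPosRanks - nPos * (nPos + 1) / 2) / (nPos * nNeg).
val data = Seq((0.45, 1.0), (-0.23, 0.0), (-0.34, 1.0), (-0.42, 0.0), (0.62, 1.0))

val nObs = data.size.toDouble
val nPos = data.count(_._2 > 0.5).toDouble
val sumPosRanks = data
  .sortBy(_._1)                                                // ascending by score
  .zipWithIndex                                                // 0-based position
  .collect { case ((_, label), i) if label > 0.5 => i + 1.0 }  // 1-based ranks: 2, 4, 5
  .sum                                                         // 11.0

val auc = (sumPosRanks - nPos * (nPos + 1) / 2) / (nPos * (nObs - nPos))
// (11 - 6) / 6 = 0.8333..., matching the tolerance check above.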