From 9ae9d53e788c8abab105976cb34215c6dc0e499d Mon Sep 17 00:00:00 2001 From: martinzapletal Date: Fri, 23 Jan 2015 01:15:18 +0000 Subject: [PATCH] SPARK-3278 changes after PR feedback https://github.com/apache/spark/pull/3519. Binary search used for isotonic regression model predictions --- .../mllib/regression/IsotonicRegression.scala | 41 ++++++++++++++----- .../JavaIsotonicRegressionSuite.java | 2 +- .../regression/IsotonicRegressionSuite.scala | 37 ++++++++--------- 3 files changed, 47 insertions(+), 33 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala index b2f2d0c3ce39a..1fe7af2d5058e 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala @@ -18,6 +18,7 @@ package org.apache.spark.mllib.regression import java.io.Serializable +import java.util.Arrays.binarySearch import org.apache.spark.api.java.{JavaDoubleRDD, JavaRDD} import org.apache.spark.rdd.RDD @@ -25,16 +26,17 @@ import org.apache.spark.rdd.RDD /** * Regression model for Isotonic regression * - * @param predictions Weights computed for every feature. - * @param isotonic isotonic (increasing) or antitonic (decreasing) sequence + * @param features Array of features. + * @param labels Array of labels associated to the features at the same index. */ class IsotonicRegressionModel ( - val predictions: Seq[(Double, Double, Double)], - val isotonic: Boolean) + features: Array[Double], + val labels: Array[Double]) extends Serializable { /** * Predict labels for provided features + * Using a piecewise constant function * * @param testData features to be labeled * @return predicted labels @@ -44,6 +46,7 @@ class IsotonicRegressionModel ( /** * Predict labels for provided features + * Using a piecewise constant function * * @param testData features to be labeled * @return predicted labels @@ -53,13 +56,25 @@ class IsotonicRegressionModel ( /** * Predict a single label + * Using a piecewise constant function * * @param testData feature to be labeled * @return predicted label */ - def predict(testData: Double): Double = - // Take the highest of data points smaller than our feature or data point with lowest feature - (predictions.head +: predictions.filter(y => y._2 <= testData)).last._1 + def predict(testData: Double): Double = { + val result = binarySearch(features, testData) + + val index = + if (result == -1) { + 0 + } else if (result < 0) { + -result - 2 + } else { + result + } + + labels(index) + } } /** @@ -93,9 +108,13 @@ class IsotonicRegression * @return isotonic regression model */ protected def createModel( - predictions: Seq[(Double, Double, Double)], + predictions: Array[(Double, Double, Double)], isotonic: Boolean): IsotonicRegressionModel = { - new IsotonicRegressionModel(predictions, isotonic) + + val labels = predictions.map(_._1) + val features = predictions.map(_._2) + + new IsotonicRegressionModel(features, labels) } /** @@ -167,7 +186,7 @@ class IsotonicRegression */ private def parallelPoolAdjacentViolators( testData: RDD[(Double, Double, Double)], - isotonic: Boolean): Seq[(Double, Double, Double)] = { + isotonic: Boolean): Array[(Double, Double, Double)] = { val parallelStepResult = testData .sortBy(_._2) @@ -213,7 +232,7 @@ object IsotonicRegression { isotonic: Boolean): IsotonicRegressionModel = { new IsotonicRegression() .run( - input.rdd.map(x => (x._1.doubleValue(), x._2.doubleValue(), x._3.doubleValue())), + input.rdd.asInstanceOf[RDD[(Double, Double, Double)]], isotonic) } } diff --git a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaIsotonicRegressionSuite.java b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaIsotonicRegressionSuite.java index de91a0fa3b3f5..b064e1aeec203 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaIsotonicRegressionSuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaIsotonicRegressionSuite.java @@ -49,7 +49,7 @@ public void tearDown() { double difference(List> expected, IsotonicRegressionModel model) { double diff = 0; - for(int i = 0; i < model.predictions().length(); i++) { + for(int i = 0; i < model.labels().length; i++) { Tuple3 exp = expected.get(i); diff += Math.abs(model.predict(exp._2()) - exp._1()); } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala index 510528a57309e..06f18fb17ec55 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala @@ -38,9 +38,8 @@ class IsotonicRegressionSuite val alg = new IsotonicRegression val model = alg.run(trainRDD, true) - model.predictions should be( - generateIsotonicInput( - 1, 2, 7d/3, 7d/3, 7d/3, 6, 7, 8, 10, 10, 10, 12, 14, 15, 16.5, 16.5, 17, 18, 19, 20)) + model.labels should be( + Array(1, 2, 7d/3, 7d/3, 7d/3, 6, 7, 8, 10, 10, 10, 12, 14, 15, 16.5, 16.5, 17, 18, 19, 20)) } test("increasing isotonic regression using api") { @@ -50,9 +49,8 @@ class IsotonicRegressionSuite val model = IsotonicRegression.train(trainRDD, true) - model.predictions should be( - generateIsotonicInput( - 1, 2, 7d/3, 7d/3, 7d/3, 6, 7, 8, 10, 10, 10, 12, 14, 15, 16.5, 16.5, 17, 18, 19, 20)) + model.labels should be( + Array(1, 2, 7d/3, 7d/3, 7d/3, 6, 7, 8, 10, 10, 10, 12, 14, 15, 16.5, 16.5, 17, 18, 19, 20)) } test("isotonic regression with size 0") { @@ -61,7 +59,7 @@ class IsotonicRegressionSuite val alg = new IsotonicRegression val model = alg.run(trainRDD, true) - model.predictions should be(List()) + model.labels should be(Array()) } test("isotonic regression with size 1") { @@ -70,7 +68,7 @@ class IsotonicRegressionSuite val alg = new IsotonicRegression val model = alg.run(trainRDD, true) - model.predictions should be(generateIsotonicInput(1)) + model.labels should be(Array(1.0)) } test("isotonic regression strictly increasing sequence") { @@ -79,7 +77,7 @@ class IsotonicRegressionSuite val alg = new IsotonicRegression val model = alg.run(trainRDD, true) - model.predictions should be(generateIsotonicInput(1, 2, 3, 4, 5)) + model.labels should be(Array(1, 2, 3, 4, 5)) } test("isotonic regression strictly decreasing sequence") { @@ -88,7 +86,7 @@ class IsotonicRegressionSuite val alg = new IsotonicRegression val model = alg.run(trainRDD, true) - model.predictions should be(generateIsotonicInput(3, 3, 3, 3, 3)) + model.labels should be(Array(3, 3, 3, 3, 3)) } test("isotonic regression with last element violating monotonicity") { @@ -97,7 +95,7 @@ class IsotonicRegressionSuite val alg = new IsotonicRegression val model = alg.run(trainRDD, true) - model.predictions should be(generateIsotonicInput(1, 2, 3, 3, 3)) + model.labels should be(Array(1, 2, 3, 3, 3)) } test("isotonic regression with first element violating monotonicity") { @@ -106,7 +104,7 @@ class IsotonicRegressionSuite val alg = new IsotonicRegression val model = alg.run(trainRDD, true) - model.predictions should be(generateIsotonicInput(3, 3, 3, 4, 5)) + model.labels should be(Array(3, 3, 3, 4, 5)) } test("isotonic regression with negative labels") { @@ -115,7 +113,7 @@ class IsotonicRegressionSuite val alg = new IsotonicRegression val model = alg.run(trainRDD, true) - model.predictions should be(generateIsotonicInput(-1.5, -1.5, 0, 0, 0)) + model.labels should be(Array(-1.5, -1.5, 0, 0, 0)) } test("isotonic regression with unordered input") { @@ -124,7 +122,7 @@ class IsotonicRegressionSuite val alg = new IsotonicRegression val model = alg.run(trainRDD, true) - model.predictions should be(generateIsotonicInput(1, 2, 3, 4, 5)) + model.labels should be(Array(1, 2, 3, 4, 5)) } test("weighted isotonic regression") { @@ -134,8 +132,7 @@ class IsotonicRegressionSuite val alg = new IsotonicRegression val model = alg.run(trainRDD, true) - model.predictions should be( - generateWeightedIsotonicInput(Seq(1, 2, 2.75, 2.75,2.75), Seq(1, 1, 1, 1, 2))) + model.labels should be(Array(1, 2, 2.75, 2.75,2.75)) } test("weighted isotonic regression with weights lower than 1") { @@ -145,8 +142,7 @@ class IsotonicRegressionSuite val alg = new IsotonicRegression val model = alg.run(trainRDD, true) - model.predictions.map(p => p.copy(_1 = round(p._1))) should be( - generateWeightedIsotonicInput(Seq(1, 2, 3.3/1.2, 3.3/1.2, 3.3/1.2), Seq(1, 1, 1, 0.1, 0.1))) + model.labels.map(round) should be(Array(1, 2, 3.3/1.2, 3.3/1.2, 3.3/1.2)) } test("weighted isotonic regression with negative weights") { @@ -155,8 +151,7 @@ class IsotonicRegressionSuite val alg = new IsotonicRegression val model = alg.run(trainRDD, true) - model.predictions should be( - generateWeightedIsotonicInput(Seq(1.0, 10.0/6, 10.0/6, 10.0/6, 10.0/6), Seq(-1, 1, -3, 1, -5))) + model.labels should be(Array(1.0, 10.0/6, 10.0/6, 10.0/6, 10.0/6)) } test("weighted isotonic regression with zero weights") { @@ -165,7 +160,7 @@ class IsotonicRegressionSuite val alg = new IsotonicRegression val model = alg.run(trainRDD, true) - model.predictions should be(generateWeightedIsotonicInput(Seq(1, 2, 2, 2, 2), Seq(0, 0, 0, 1, 0))) + model.labels should be(Array(1, 2, 2, 2, 2)) } test("isotonic regression prediction") {