From 80c6681c6c649b6f05473c4e787756fb7c89e682 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Fri, 30 Jan 2015 11:43:16 -0800 Subject: [PATCH 1/6] update IRModel --- .../mllib/regression/IsotonicRegression.scala | 28 +++++++++---------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala index 4fd7904d026c6..779fa8d4c1453 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala @@ -31,31 +31,29 @@ import org.apache.spark.rdd.RDD * Boundaries must be sorted in increasing order. * @param predictions Array of predictions associated to the boundaries at the same index. * Results of isotonic regression and therefore monotone. + * @param isotonic indicates whether this is isotonic or antitonic. */ class IsotonicRegressionModel ( - boundaries: Array[Double], + val boundaries: Array[Double], val predictions: Array[Double], - isotonic: Boolean) - extends Serializable { + val isotonic: Boolean) extends Serializable { - private def isSorted(xs: Array[Double]): Boolean = { + private val predictionOrd = if (isotonic) Ordering[Double] else Ordering[Double].reverse + + assert(boundaries.length == predictions.length) + assertOrdered(boundaries) + assertOrdered(predictions)(predictionOrd) + + /** Asserts the input array is monotone with the given ordering. */ + private def assertOrdered(xs: Array[Double])(implicit ord: Ordering[Double]): Unit = { var i = 1 while (i < xs.length) { - if (xs(i) < xs(i - 1)) false + assert(ord.compare(xs(i - 1), xs(i)) <= 0, + s"Elements (${xs(i - 1)}, ${xs(i)}) are not ordered.") i += 1 } - true } - if (isotonic) { - assert(isSorted(predictions)) - } else { - assert(isSorted(predictions.map(-_))) - } - - assert(isSorted(boundaries)) - assert(boundaries.length == predictions.length) - /** * Predict labels for provided features. * Using a piecewise linear function. From 05422a8fe9e5911d8e496c2dcc80a819118c893e Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Fri, 30 Jan 2015 11:51:06 -0800 Subject: [PATCH 2/6] add unit test for model construction --- .../mllib/regression/IsotonicRegression.scala | 4 +- .../regression/IsotonicRegressionSuite.scala | 45 +++++++++++++------ 2 files changed, 33 insertions(+), 16 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala index 779fa8d4c1453..d63732366f940 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala @@ -40,7 +40,7 @@ class IsotonicRegressionModel ( private val predictionOrd = if (isotonic) Ordering[Double] else Ordering[Double].reverse - assert(boundaries.length == predictions.length) + require(boundaries.length == predictions.length) assertOrdered(boundaries) assertOrdered(predictions)(predictionOrd) @@ -48,7 +48,7 @@ class IsotonicRegressionModel ( private def assertOrdered(xs: Array[Double])(implicit ord: Ordering[Double]): Unit = { var i = 1 while (i < xs.length) { - assert(ord.compare(xs(i - 1), xs(i)) <= 0, + require(ord.compare(xs(i - 1), xs(i)) <= 0, s"Elements (${xs(i - 1)}, ${xs(i)}) are not ordered.") i += 1 } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala index 962e52536da66..0e8025c1e0e2e 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.mllib.regression import org.scalatest.{Matchers, FunSuite} import org.apache.spark.mllib.util.MLlibTestSparkContext +import org.apache.spark.mllib.util.TestingUtils._ class IsotonicRegressionSuite extends FunSuite with MLlibTestSparkContext with Matchers { @@ -55,80 +56,67 @@ class IsotonicRegressionSuite extends FunSuite with MLlibTestSparkContext with M test("increasing isotonic regression") { val model = runIsotonicRegression(Seq(1, 2, 3, 3, 1, 6, 17, 16, 17, 18), true) - assert(model.predictions === Array(1, 2, 7d/3, 7d/3, 7d/3, 6, 16.5, 16.5, 17, 18)) } test("isotonic regression with size 0") { val model = runIsotonicRegression(Seq(), true) - assert(model.predictions === Array()) } test("isotonic regression with size 1") { val model = runIsotonicRegression(Seq(1), true) - assert(model.predictions === Array(1.0)) } test("isotonic regression strictly increasing sequence") { val model = runIsotonicRegression(Seq(1, 2, 3, 4, 5), true) - assert(model.predictions === Array(1, 2, 3, 4, 5)) } test("isotonic regression strictly decreasing sequence") { val model = runIsotonicRegression(Seq(5, 4, 3, 2, 1), true) - assert(model.predictions === Array(3, 3, 3, 3, 3)) } test("isotonic regression with last element violating monotonicity") { val model = runIsotonicRegression(Seq(1, 2, 3, 4, 2), true) - assert(model.predictions === Array(1, 2, 3, 3, 3)) } test("isotonic regression with first element violating monotonicity") { val model = runIsotonicRegression(Seq(4, 2, 3, 4, 5), true) - assert(model.predictions === Array(3, 3, 3, 4, 5)) } test("isotonic regression with negative labels") { val model = runIsotonicRegression(Seq(-1, -2, 0, 1, -1), true) - assert(model.predictions === Array(-1.5, -1.5, 0, 0, 0)) } test("isotonic regression with unordered input") { val trainRDD = sc.parallelize(generateIsotonicInput(Seq(1, 2, 3, 4, 5)).reverse).cache() val model = new IsotonicRegression().run(trainRDD) - assert(model.predictions === Array(1, 2, 3, 4, 5)) } test("weighted isotonic regression") { val model = runIsotonicRegression(Seq(1, 2, 3, 4, 2), Seq(1, 1, 1, 1, 2), true) - assert(model.predictions === Array(1, 2, 2.75, 2.75,2.75)) } test("weighted isotonic regression with weights lower than 1") { val model = runIsotonicRegression(Seq(1, 2, 3, 2, 1), Seq(1, 1, 1, 0.1, 0.1), true) - assert(model.predictions.map(round) === Array(1, 2, 3.3/1.2, 3.3/1.2, 3.3/1.2)) } test("weighted isotonic regression with negative weights") { val model = runIsotonicRegression(Seq(1, 2, 3, 2, 1), Seq(-1, 1, -3, 1, -5), true) - assert(model.predictions === Array(1.0, 10.0/6, 10.0/6, 10.0/6, 10.0/6)) } test("weighted isotonic regression with zero weights") { val model = runIsotonicRegression(Seq[Double](1, 2, 3, 2, 1), Seq[Double](0, 0, 0, 1, 0), true) - assert(model.predictions === Array(1, 2, 2, 2, 2)) } @@ -186,4 +174,33 @@ class IsotonicRegressionSuite extends FunSuite with MLlibTestSparkContext with M assert(model.predict(3) === 4) assert(model.predict(10) === 1) } -} \ No newline at end of file + + test("model construction") { + val model = new IsotonicRegressionModel(Array(0.0, 1.0), Array(1.0, 2.0), isotonic = true) + assert(model.predict(-0.5) === 1.0) + assert(model.predict(0.0) === 1.0) + assert(model.predict(0.5) ~== 1.5 absTol 1e-14) + assert(model.predict(1.0) === 2.0) + assert(model.predict(1.5) === 2.0) + + intercept[IllegalArgumentException] { + // different array sizes. + new IsotonicRegressionModel(Array(0.0, 1.0), Array(1.0), isotonic = true) + } + + intercept[IllegalArgumentException] { + // unordered boundaries + new IsotonicRegressionModel(Array(1.0, 0.0), Array(1.0, 2.0), isotonic = true) + } + + intercept[IllegalArgumentException] { + // unordered predictions (isotonic) + new IsotonicRegressionModel(Array(0.0, 1.0), Array(2.0, 1.0), isotonic = true) + } + + intercept[IllegalArgumentException] { + // unordered predictions (antitonic) + new IsotonicRegressionModel(Array(0.0, 1.0), Array(1.0, 2.0), isotonic = false) + } + } +} From 077606ba5f5c0441f5e9c837bdbfb01cfd176726 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Fri, 30 Jan 2015 12:00:07 -0800 Subject: [PATCH 3/6] minor --- .../apache/spark/mllib/regression/IsotonicRegression.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala index d63732366f940..72c1dcdbe5ebb 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala @@ -173,10 +173,10 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali input.map(x => (-x._1, x._2, x._3)) } - val isotonicRegression = parallelPoolAdjacentViolators(preprocessedInput) + val pooled = parallelPoolAdjacentViolators(preprocessedInput) - val predictions = if (isotonic) isotonicRegression.map(_._1) else isotonicRegression.map(-_._1) - val boundaries = isotonicRegression.map(_._2) + val predictions = if (isotonic) pooled.map(_._1) else pooled.map(-_._1) + val boundaries = pooled.map(_._2) new IsotonicRegressionModel(boundaries, predictions, isotonic) } From 35d044e2996fe136a4a2ca53e53503d79d23144a Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Fri, 30 Jan 2015 12:06:49 -0800 Subject: [PATCH 4/6] update paraPAVA --- .../spark/mllib/regression/IsotonicRegression.scala | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala index 72c1dcdbe5ebb..1a7965a7c04b4 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala @@ -210,7 +210,7 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali // Pools sub array within given bounds assigning weighted average value to all elements. def pool(input: Array[(Double, Double, Double)], start: Int, end: Int): Unit = { - val poolSubArray = input.slice(start, end + 1) + val poolSubArray = input.view.slice(start, end + 1) val weightedSum = poolSubArray.map(lp => lp._1 * lp._3).sum val weight = poolSubArray.map(_._3).sum @@ -259,11 +259,12 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali */ private def parallelPoolAdjacentViolators( input: RDD[(Double, Double, Double)]): Array[(Double, Double, Double)] = { - val parallelStepResult = input .sortBy(x => (x._2, x._1)) - .mapPartitions(it => poolAdjacentViolators(it.toArray).toIterator) - - poolAdjacentViolators(parallelStepResult.collect()) + .glom() + .flatMap(poolAdjacentViolators) + .collect() + .sortBy(x => (x._2, x._1)) // Sort again because collect() doesn't promise ordering. + poolAdjacentViolators(parallelStepResult) } } From 0b35c15e6a38edbf51ea24c6a4568750a7f76b64 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Fri, 30 Jan 2015 13:19:21 -0800 Subject: [PATCH 5/6] compress pools and update tests --- .../mllib/regression/IsotonicRegression.scala | 38 +++++++- .../regression/IsotonicRegressionSuite.scala | 97 +++++++++++++------ 2 files changed, 102 insertions(+), 33 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala index 1a7965a7c04b4..ca2ac820fab13 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala @@ -21,6 +21,8 @@ import java.io.Serializable import java.lang.{Double => JDouble} import java.util.Arrays.binarySearch +import scala.collection.mutable.ArrayBuffer + import org.apache.spark.api.java.{JavaDoubleRDD, JavaRDD} import org.apache.spark.rdd.RDD @@ -208,9 +210,13 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali private def poolAdjacentViolators( input: Array[(Double, Double, Double)]): Array[(Double, Double, Double)] = { + if (input.isEmpty) { + return Array.empty + } + // Pools sub array within given bounds assigning weighted average value to all elements. def pool(input: Array[(Double, Double, Double)], start: Int, end: Int): Unit = { - val poolSubArray = input.view.slice(start, end + 1) + val poolSubArray = input.slice(start, end + 1) val weightedSum = poolSubArray.map(lp => lp._1 * lp._3).sum val weight = poolSubArray.map(_._3).sum @@ -246,7 +252,35 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali } } - input + // For points having the same prediction, we only keep two boundary points. + val compressed = ArrayBuffer.empty[(Double, Double, Double)] + + var (curLabel, curFeature, curWeight) = input.head + var rightBound = curFeature + def merge(): Unit = { + compressed += ((curLabel, curFeature, curWeight)) + if (rightBound > curFeature) { + compressed += ((curLabel, rightBound, 0.0)) + } + } + i = 1 + while (i < input.length) { + val (label, feature, weight) = input(i) + if (label == curLabel) { + curWeight += weight + rightBound = feature + } else { + merge() + curLabel = label + curFeature = feature + curWeight = weight + rightBound = curFeature + } + i += 1 + } + merge() + + compressed.toArray } /** diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala index 0e8025c1e0e2e..9a516402267a9 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala @@ -29,15 +29,13 @@ class IsotonicRegressionSuite extends FunSuite with MLlibTestSparkContext with M } private def generateIsotonicInput(labels: Seq[Double]): Seq[(Double, Double, Double)] = { - labels.zip(1 to labels.size).map(point => (point._1, point._2.toDouble, 1d)) + Seq.tabulate(labels.size)(i => (labels(i), i.toDouble, 1d)) } private def generateIsotonicInput( labels: Seq[Double], weights: Seq[Double]): Seq[(Double, Double, Double)] = { - labels.zip(1 to labels.size) - .zip(weights) - .map(point => (point._1._1, point._1._2.toDouble, point._2)) + Seq.tabulate(labels.size)(i => (labels(i), i.toDouble, weights(i))) } private def runIsotonicRegression( @@ -55,87 +53,123 @@ class IsotonicRegressionSuite extends FunSuite with MLlibTestSparkContext with M } test("increasing isotonic regression") { - val model = runIsotonicRegression(Seq(1, 2, 3, 3, 1, 6, 17, 16, 17, 18), true) - assert(model.predictions === Array(1, 2, 7d/3, 7d/3, 7d/3, 6, 16.5, 16.5, 17, 18)) + /* + The following result could be re-produced with sklearn. + + > from sklearn.isotonic import IsotonicRegression + > x = range(9) + > y = [1, 2, 3, 1, 6, 17, 16, 17, 18] + > ir = IsotonicRegression(x, y) + > print ir.predict(x) + + array([ 1. , 2. , 2. , 2. , 6. , 16.5, 16.5, 17. , 18. ]) + */ + val model = runIsotonicRegression(Seq(1, 2, 3, 1, 6, 17, 16, 17, 18), true) + + assert(Array.tabulate(9)(x => model.predict(x)) === Array(1, 2, 2, 2, 6, 16.5, 16.5, 17, 18)) + + assert(model.boundaries === Array(0, 1, 3, 4, 5, 6, 7, 8)) + assert(model.predictions === Array(1, 2, 2, 6, 16.5, 16.5, 17.0, 18.0)) + assert(model.isotonic) } test("isotonic regression with size 0") { val model = runIsotonicRegression(Seq(), true) + assert(model.predictions === Array()) } test("isotonic regression with size 1") { val model = runIsotonicRegression(Seq(1), true) + assert(model.predictions === Array(1.0)) } test("isotonic regression strictly increasing sequence") { val model = runIsotonicRegression(Seq(1, 2, 3, 4, 5), true) + assert(model.predictions === Array(1, 2, 3, 4, 5)) } test("isotonic regression strictly decreasing sequence") { val model = runIsotonicRegression(Seq(5, 4, 3, 2, 1), true) - assert(model.predictions === Array(3, 3, 3, 3, 3)) + + assert(model.boundaries === Array(0, 4)) + assert(model.predictions === Array(3, 3)) } test("isotonic regression with last element violating monotonicity") { val model = runIsotonicRegression(Seq(1, 2, 3, 4, 2), true) - assert(model.predictions === Array(1, 2, 3, 3, 3)) + + assert(model.boundaries === Array(0, 1, 2, 4)) + assert(model.predictions === Array(1, 2, 3, 3)) } test("isotonic regression with first element violating monotonicity") { val model = runIsotonicRegression(Seq(4, 2, 3, 4, 5), true) - assert(model.predictions === Array(3, 3, 3, 4, 5)) + + assert(model.boundaries === Array(0, 2, 3, 4)) + assert(model.predictions === Array(3, 3, 4, 5)) } test("isotonic regression with negative labels") { val model = runIsotonicRegression(Seq(-1, -2, 0, 1, -1), true) - assert(model.predictions === Array(-1.5, -1.5, 0, 0, 0)) + + assert(model.boundaries === Array(0, 1, 2, 4)) + assert(model.predictions === Array(-1.5, -1.5, 0, 0)) } test("isotonic regression with unordered input") { - val trainRDD = sc.parallelize(generateIsotonicInput(Seq(1, 2, 3, 4, 5)).reverse).cache() + val trainRDD = sc.parallelize(generateIsotonicInput(Seq(1, 2, 3, 4, 5)).reverse, 2) + val model = new IsotonicRegression().run(trainRDD) assert(model.predictions === Array(1, 2, 3, 4, 5)) } test("weighted isotonic regression") { val model = runIsotonicRegression(Seq(1, 2, 3, 4, 2), Seq(1, 1, 1, 1, 2), true) - assert(model.predictions === Array(1, 2, 2.75, 2.75,2.75)) + + assert(model.boundaries === Array(0, 1, 2, 4)) + assert(model.predictions === Array(1, 2, 2.75, 2.75)) } test("weighted isotonic regression with weights lower than 1") { val model = runIsotonicRegression(Seq(1, 2, 3, 2, 1), Seq(1, 1, 1, 0.1, 0.1), true) - assert(model.predictions.map(round) === Array(1, 2, 3.3/1.2, 3.3/1.2, 3.3/1.2)) + + assert(model.boundaries === Array(0, 1, 2, 4)) + assert(model.predictions.map(round) === Array(1, 2, 3.3/1.2, 3.3/1.2)) } test("weighted isotonic regression with negative weights") { val model = runIsotonicRegression(Seq(1, 2, 3, 2, 1), Seq(-1, 1, -3, 1, -5), true) - assert(model.predictions === Array(1.0, 10.0/6, 10.0/6, 10.0/6, 10.0/6)) + + assert(model.boundaries === Array(0.0, 1.0, 4.0)) + assert(model.predictions === Array(1.0, 10.0/6, 10.0/6)) } test("weighted isotonic regression with zero weights") { val model = runIsotonicRegression(Seq[Double](1, 2, 3, 2, 1), Seq[Double](0, 0, 0, 1, 0), true) - assert(model.predictions === Array(1, 2, 2, 2, 2)) + + assert(model.boundaries === Array(0.0, 1.0, 4.0)) + assert(model.predictions === Array(1, 2, 2)) } test("isotonic regression prediction") { val model = runIsotonicRegression(Seq(1, 2, 7, 1, 2), true) + assert(model.predict(-2) === 1) assert(model.predict(-1) === 1) - assert(model.predict(0) === 1) - assert(model.predict(1.5) === 1.5) - assert(model.predict(1.75) === 1.75) - assert(model.predict(2) === 2) - assert(model.predict(3) === 10d/3) - assert(model.predict(10) === 10d/3) + assert(model.predict(0.5) === 1.5) + assert(model.predict(0.75) === 1.75) + assert(model.predict(1) === 2) + assert(model.predict(2) === 10d/3) + assert(model.predict(9) === 10d/3) } test("isotonic regression prediction with duplicate features") { val trainRDD = sc.parallelize( Seq[(Double, Double, Double)]( - (2, 1, 1), (1, 1, 1), (4, 2, 1), (2, 2, 1), (6, 3, 1), (5, 3, 1))).cache() + (2, 1, 1), (1, 1, 1), (4, 2, 1), (2, 2, 1), (6, 3, 1), (5, 3, 1)), 2) val model = new IsotonicRegression().run(trainRDD) assert(model.predict(0) === 1) @@ -147,7 +181,7 @@ class IsotonicRegressionSuite extends FunSuite with MLlibTestSparkContext with M test("antitonic regression prediction with duplicate features") { val trainRDD = sc.parallelize( Seq[(Double, Double, Double)]( - (5, 1, 1), (6, 1, 1), (2, 2, 1), (4, 2, 1), (1, 3, 1), (2, 3, 1))).cache() + (5, 1, 1), (6, 1, 1), (2, 2, 1), (4, 2, 1), (1, 3, 1), (2, 3, 1)), 2) val model = new IsotonicRegression().setIsotonic(false).run(trainRDD) assert(model.predict(0) === 6) @@ -158,21 +192,22 @@ class IsotonicRegressionSuite extends FunSuite with MLlibTestSparkContext with M test("isotonic regression RDD prediction") { val model = runIsotonicRegression(Seq(1, 2, 7, 1, 2), true) - val testRDD = sc.parallelize(List(-1.0, 0.0, 1.5, 1.75, 2.0, 3.0, 10.0)).cache() - assert(model.predict(testRDD).collect() === Array(1, 1, 1.5, 1.75, 2, 10.0/3, 10.0/3)) + val testRDD = sc.parallelize(List(-2.0, -1.0, 0.5, 0.75, 1.0, 2.0, 9.0), 2) + val predictions = testRDD.map(x => (x, model.predict(x))).collect().sortBy(_._1).map(_._2) + assert(predictions === Array(1, 1, 1.5, 1.75, 2, 10.0/3, 10.0/3)) } test("antitonic regression prediction") { val model = runIsotonicRegression(Seq(7, 5, 3, 5, 1), false) + assert(model.predict(-2) === 7) assert(model.predict(-1) === 7) - assert(model.predict(0) === 7) - assert(model.predict(1.5) === 6) - assert(model.predict(1.75) === 5.5) - assert(model.predict(2) === 5) - assert(model.predict(3) === 4) - assert(model.predict(10) === 1) + assert(model.predict(0.5) === 6) + assert(model.predict(0.75) === 5.5) + assert(model.predict(1) === 5) + assert(model.predict(2) === 4) + assert(model.predict(9) === 1) } test("model construction") { From 4dfe136973693d94ec4d85ef4264d79af2aea647 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Fri, 30 Jan 2015 13:25:30 -0800 Subject: [PATCH 6/6] add cache back --- .../spark/mllib/regression/IsotonicRegressionSuite.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala index 9a516402267a9..7ef45248281e9 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala @@ -120,7 +120,7 @@ class IsotonicRegressionSuite extends FunSuite with MLlibTestSparkContext with M } test("isotonic regression with unordered input") { - val trainRDD = sc.parallelize(generateIsotonicInput(Seq(1, 2, 3, 4, 5)).reverse, 2) + val trainRDD = sc.parallelize(generateIsotonicInput(Seq(1, 2, 3, 4, 5)).reverse, 2).cache() val model = new IsotonicRegression().run(trainRDD) assert(model.predictions === Array(1, 2, 3, 4, 5)) @@ -169,7 +169,7 @@ class IsotonicRegressionSuite extends FunSuite with MLlibTestSparkContext with M test("isotonic regression prediction with duplicate features") { val trainRDD = sc.parallelize( Seq[(Double, Double, Double)]( - (2, 1, 1), (1, 1, 1), (4, 2, 1), (2, 2, 1), (6, 3, 1), (5, 3, 1)), 2) + (2, 1, 1), (1, 1, 1), (4, 2, 1), (2, 2, 1), (6, 3, 1), (5, 3, 1)), 2).cache() val model = new IsotonicRegression().run(trainRDD) assert(model.predict(0) === 1) @@ -181,7 +181,7 @@ class IsotonicRegressionSuite extends FunSuite with MLlibTestSparkContext with M test("antitonic regression prediction with duplicate features") { val trainRDD = sc.parallelize( Seq[(Double, Double, Double)]( - (5, 1, 1), (6, 1, 1), (2, 2, 1), (4, 2, 1), (1, 3, 1), (2, 3, 1)), 2) + (5, 1, 1), (6, 1, 1), (2, 2, 1), (4, 2, 1), (1, 3, 1), (2, 3, 1)), 2).cache() val model = new IsotonicRegression().setIsotonic(false).run(trainRDD) assert(model.predict(0) === 6) @@ -193,7 +193,7 @@ class IsotonicRegressionSuite extends FunSuite with MLlibTestSparkContext with M test("isotonic regression RDD prediction") { val model = runIsotonicRegression(Seq(1, 2, 7, 1, 2), true) - val testRDD = sc.parallelize(List(-2.0, -1.0, 0.5, 0.75, 1.0, 2.0, 9.0), 2) + val testRDD = sc.parallelize(List(-2.0, -1.0, 0.5, 0.75, 1.0, 2.0, 9.0), 2).cache() val predictions = testRDD.map(x => (x, model.predict(x))).collect().sortBy(_._1).map(_._2) assert(predictions === Array(1, 1, 1.5, 1.75, 2, 10.0/3, 10.0/3)) }