diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
index 4fd7904d026c6..ca2ac820fab13 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
@@ -21,6 +21,8 @@ import java.io.Serializable
 import java.lang.{Double => JDouble}
 import java.util.Arrays.binarySearch
 
+import scala.collection.mutable.ArrayBuffer
+
 import org.apache.spark.api.java.{JavaDoubleRDD, JavaRDD}
 import org.apache.spark.rdd.RDD
 
@@ -31,31 +33,29 @@ import org.apache.spark.rdd.RDD
  *                   Boundaries must be sorted in increasing order.
  * @param predictions Array of predictions associated to the boundaries at the same index.
  *                    Results of isotonic regression and therefore monotone.
+ * @param isotonic indicates whether this is isotonic or antitonic.
  */
 class IsotonicRegressionModel (
-    boundaries: Array[Double],
+    val boundaries: Array[Double],
     val predictions: Array[Double],
-    isotonic: Boolean)
-  extends Serializable {
+    val isotonic: Boolean) extends Serializable {
+
+  private val predictionOrd = if (isotonic) Ordering[Double] else Ordering[Double].reverse
+
+  require(boundaries.length == predictions.length)
+  assertOrdered(boundaries)
+  assertOrdered(predictions)(predictionOrd)
 
-  private def isSorted(xs: Array[Double]): Boolean = {
+  /** Asserts the input array is monotone with the given ordering. */
+  private def assertOrdered(xs: Array[Double])(implicit ord: Ordering[Double]): Unit = {
     var i = 1
     while (i < xs.length) {
-      if (xs(i) < xs(i - 1)) false
+      require(ord.compare(xs(i - 1), xs(i)) <= 0,
+        s"Elements (${xs(i - 1)}, ${xs(i)}) are not ordered.")
       i += 1
     }
-    true
-  }
-
-  if (isotonic) {
-    assert(isSorted(predictions))
-  } else {
-    assert(isSorted(predictions.map(-_)))
   }
 
-  assert(isSorted(boundaries))
-  assert(boundaries.length == predictions.length)
-
   /**
    * Predict labels for provided features.
    * Using a piecewise linear function.
@@ -175,10 +175,10 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Serializable {
       input.map(x => (-x._1, x._2, x._3))
     }
 
-    val isotonicRegression = parallelPoolAdjacentViolators(preprocessedInput)
+    val pooled = parallelPoolAdjacentViolators(preprocessedInput)
 
-    val predictions = if (isotonic) isotonicRegression.map(_._1) else isotonicRegression.map(-_._1)
-    val boundaries = isotonicRegression.map(_._2)
+    val predictions = if (isotonic) pooled.map(_._1) else pooled.map(-_._1)
+    val boundaries = pooled.map(_._2)
 
     new IsotonicRegressionModel(boundaries, predictions, isotonic)
   }
@@ -210,6 +210,10 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Serializable {
   private def poolAdjacentViolators(
       input: Array[(Double, Double, Double)]): Array[(Double, Double, Double)] = {
 
+    if (input.isEmpty) {
+      return Array.empty
+    }
+
     // Pools sub array within given bounds assigning weighted average value to all elements.
     def pool(input: Array[(Double, Double, Double)], start: Int, end: Int): Unit = {
       val poolSubArray = input.slice(start, end + 1)
@@ -248,7 +252,35 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Serializable {
       }
     }
 
-    input
+    // For points having the same prediction, we only keep two boundary points.
+    val compressed = ArrayBuffer.empty[(Double, Double, Double)]
+
+    var (curLabel, curFeature, curWeight) = input.head
+    var rightBound = curFeature
+    def merge(): Unit = {
+      compressed += ((curLabel, curFeature, curWeight))
+      if (rightBound > curFeature) {
+        compressed += ((curLabel, rightBound, 0.0))
+      }
+    }
+    i = 1
+    while (i < input.length) {
+      val (label, feature, weight) = input(i)
+      if (label == curLabel) {
+        curWeight += weight
+        rightBound = feature
+      } else {
+        merge()
+        curLabel = label
+        curFeature = feature
+        curWeight = weight
+        rightBound = curFeature
+      }
+      i += 1
+    }
+    merge()
+
+    compressed.toArray
   }
 
   /**
@@ -261,11 +293,12 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Serializable {
    */
   private def parallelPoolAdjacentViolators(
       input: RDD[(Double, Double, Double)]): Array[(Double, Double, Double)] = {
-    val parallelStepResult = input
+    val parallelStepResult = input
       .sortBy(x => (x._2, x._1))
-      .mapPartitions(it => poolAdjacentViolators(it.toArray).toIterator)
-
-    poolAdjacentViolators(parallelStepResult.collect())
+      .glom()
+      .flatMap(poolAdjacentViolators)
+      .collect()
+      .sortBy(x => (x._2, x._1)) // Sort again because collect() doesn't promise ordering.
+    poolAdjacentViolators(parallelStepResult)
   }
 }
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala
index 962e52536da66..7ef45248281e9 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.mllib.regression
 import org.scalatest.{Matchers, FunSuite}
 
 import org.apache.spark.mllib.util.MLlibTestSparkContext
+import org.apache.spark.mllib.util.TestingUtils._
 
 class IsotonicRegressionSuite extends FunSuite with MLlibTestSparkContext with Matchers {
 
@@ -28,15 +29,13 @@ class IsotonicRegressionSuite extends FunSuite with MLlibTestSparkContext with Matchers {
   }
 
   private def generateIsotonicInput(labels: Seq[Double]): Seq[(Double, Double, Double)] = {
-    labels.zip(1 to labels.size).map(point => (point._1, point._2.toDouble, 1d))
+    Seq.tabulate(labels.size)(i => (labels(i), i.toDouble, 1d))
   }
 
   private def generateIsotonicInput(
       labels: Seq[Double],
       weights: Seq[Double]): Seq[(Double, Double, Double)] = {
-    labels.zip(1 to labels.size)
-      .zip(weights)
-      .map(point => (point._1._1, point._1._2.toDouble, point._2))
+    Seq.tabulate(labels.size)(i => (labels(i), i.toDouble, weights(i)))
   }
 
   private def runIsotonicRegression(
@@ -54,9 +53,24 @@ class IsotonicRegressionSuite extends FunSuite with MLlibTestSparkContext with Matchers {
   }
 
   test("increasing isotonic regression") {
-    val model = runIsotonicRegression(Seq(1, 2, 3, 3, 1, 6, 17, 16, 17, 18), true)
+    /*
+      The following result could be reproduced with sklearn.
 
-    assert(model.predictions === Array(1, 2, 7d/3, 7d/3, 7d/3, 6, 16.5, 16.5, 17, 18))
+      > from sklearn.isotonic import IsotonicRegression
+      > x = range(9)
+      > y = [1, 2, 3, 1, 6, 17, 16, 17, 18]
+      > ir = IsotonicRegression().fit(x, y)
+      > print ir.predict(x)
+
+      array([ 1. ,  2. ,  2. ,  2. ,  6. , 16.5, 16.5, 17. , 18. ])
+     */
+    val model = runIsotonicRegression(Seq(1, 2, 3, 1, 6, 17, 16, 17, 18), true)
+
+    assert(Array.tabulate(9)(x => model.predict(x)) === Array(1, 2, 2, 2, 6, 16.5, 16.5, 17, 18))
+
+    assert(model.boundaries === Array(0, 1, 3, 4, 5, 6, 7, 8))
+    assert(model.predictions === Array(1, 2, 2, 6, 16.5, 16.5, 17.0, 18.0))
+    assert(model.isotonic)
   }
 
   test("isotonic regression with size 0") {
@@ -80,74 +94,82 @@ class IsotonicRegressionSuite extends FunSuite with MLlibTestSparkContext with Matchers {
   test("isotonic regression strictly decreasing sequence") {
     val model = runIsotonicRegression(Seq(5, 4, 3, 2, 1), true)
 
-    assert(model.predictions === Array(3, 3, 3, 3, 3))
+    assert(model.boundaries === Array(0, 4))
+    assert(model.predictions === Array(3, 3))
   }
 
   test("isotonic regression with last element violating monotonicity") {
     val model = runIsotonicRegression(Seq(1, 2, 3, 4, 2), true)
 
-    assert(model.predictions === Array(1, 2, 3, 3, 3))
+    assert(model.boundaries === Array(0, 1, 2, 4))
+    assert(model.predictions === Array(1, 2, 3, 3))
   }
 
   test("isotonic regression with first element violating monotonicity") {
     val model = runIsotonicRegression(Seq(4, 2, 3, 4, 5), true)
 
-    assert(model.predictions === Array(3, 3, 3, 4, 5))
+    assert(model.boundaries === Array(0, 2, 3, 4))
+    assert(model.predictions === Array(3, 3, 4, 5))
   }
 
   test("isotonic regression with negative labels") {
     val model = runIsotonicRegression(Seq(-1, -2, 0, 1, -1), true)
 
-    assert(model.predictions === Array(-1.5, -1.5, 0, 0, 0))
+    assert(model.boundaries === Array(0, 1, 2, 4))
+    assert(model.predictions === Array(-1.5, -1.5, 0, 0))
   }
 
   test("isotonic regression with unordered input") {
-    val trainRDD = sc.parallelize(generateIsotonicInput(Seq(1, 2, 3, 4, 5)).reverse).cache()
-    val model = new IsotonicRegression().run(trainRDD)
+    val trainRDD = sc.parallelize(generateIsotonicInput(Seq(1, 2, 3, 4, 5)).reverse, 2).cache()
+    val model = new IsotonicRegression().run(trainRDD)
 
     assert(model.predictions === Array(1, 2, 3, 4, 5))
   }
 
   test("weighted isotonic regression") {
     val model = runIsotonicRegression(Seq(1, 2, 3, 4, 2), Seq(1, 1, 1, 1, 2), true)
 
-    assert(model.predictions === Array(1, 2, 2.75, 2.75,2.75))
+    assert(model.boundaries === Array(0, 1, 2, 4))
+    assert(model.predictions === Array(1, 2, 2.75, 2.75))
   }
 
   test("weighted isotonic regression with weights lower than 1") {
     val model = runIsotonicRegression(Seq(1, 2, 3, 2, 1), Seq(1, 1, 1, 0.1, 0.1), true)
 
-    assert(model.predictions.map(round) === Array(1, 2, 3.3/1.2, 3.3/1.2, 3.3/1.2))
+    assert(model.boundaries === Array(0, 1, 2, 4))
+    assert(model.predictions.map(round) === Array(1, 2, 3.3/1.2, 3.3/1.2))
  }
 
   test("weighted isotonic regression with negative weights") {
     val model = runIsotonicRegression(Seq(1, 2, 3, 2, 1), Seq(-1, 1, -3, 1, -5), true)
 
-    assert(model.predictions === Array(1.0, 10.0/6, 10.0/6, 10.0/6, 10.0/6))
+    assert(model.boundaries === Array(0.0, 1.0, 4.0))
+    assert(model.predictions === Array(1.0, 10.0/6, 10.0/6))
   }
 
   test("weighted isotonic regression with zero weights") {
     val model = runIsotonicRegression(Seq[Double](1, 2, 3, 2, 1), Seq[Double](0, 0, 0, 1, 0), true)
 
-    assert(model.predictions === Array(1, 2, 2, 2, 2))
+    assert(model.boundaries === Array(0.0, 1.0, 4.0))
+    assert(model.predictions === Array(1, 2, 2))
   }
 
   test("isotonic regression prediction") {
     val model = runIsotonicRegression(Seq(1, 2, 7, 1, 2), true)
 
+    assert(model.predict(-2) === 1)
     assert(model.predict(-1) === 1)
-    assert(model.predict(0) === 1)
-    assert(model.predict(1.5) === 1.5)
-    assert(model.predict(1.75) === 1.75)
-    assert(model.predict(2) === 2)
-    assert(model.predict(3) === 10d/3)
-    assert(model.predict(10) === 10d/3)
+    assert(model.predict(0.5) === 1.5)
+    assert(model.predict(0.75) === 1.75)
+    assert(model.predict(1) === 2)
+    assert(model.predict(2) === 10d/3)
+    assert(model.predict(9) === 10d/3)
   }
 
   test("isotonic regression prediction with duplicate features") {
     val trainRDD = sc.parallelize(
       Seq[(Double, Double, Double)](
-        (2, 1, 1), (1, 1, 1), (4, 2, 1), (2, 2, 1), (6, 3, 1), (5, 3, 1))).cache()
+        (2, 1, 1), (1, 1, 1), (4, 2, 1), (2, 2, 1), (6, 3, 1), (5, 3, 1)), 2).cache()
     val model = new IsotonicRegression().run(trainRDD)
 
     assert(model.predict(0) === 1)
@@ -159,7 +181,7 @@ class IsotonicRegressionSuite extends FunSuite with MLlibTestSparkContext with Matchers {
   test("antitonic regression prediction with duplicate features") {
     val trainRDD = sc.parallelize(
       Seq[(Double, Double, Double)](
-        (5, 1, 1), (6, 1, 1), (2, 2, 1), (4, 2, 1), (1, 3, 1), (2, 3, 1))).cache()
+        (5, 1, 1), (6, 1, 1), (2, 2, 1), (4, 2, 1), (1, 3, 1), (2, 3, 1)), 2).cache()
     val model = new IsotonicRegression().setIsotonic(false).run(trainRDD)
 
     assert(model.predict(0) === 6)
@@ -170,20 +192,50 @@ class IsotonicRegressionSuite extends FunSuite with MLlibTestSparkContext with Matchers {
   test("isotonic regression RDD prediction") {
     val model = runIsotonicRegression(Seq(1, 2, 7, 1, 2), true)
 
-    val testRDD = sc.parallelize(List(-1.0, 0.0, 1.5, 1.75, 2.0, 3.0, 10.0)).cache()
-    assert(model.predict(testRDD).collect() === Array(1, 1, 1.5, 1.75, 2, 10.0/3, 10.0/3))
+    val testRDD = sc.parallelize(List(-2.0, -1.0, 0.5, 0.75, 1.0, 2.0, 9.0), 2).cache()
+    val predictions = testRDD.map(x => (x, model.predict(x))).collect().sortBy(_._1).map(_._2)
+    assert(predictions === Array(1, 1, 1.5, 1.75, 2, 10.0/3, 10.0/3))
   }
 
   test("antitonic regression prediction") {
     val model = runIsotonicRegression(Seq(7, 5, 3, 5, 1), false)
 
+    assert(model.predict(-2) === 7)
     assert(model.predict(-1) === 7)
-    assert(model.predict(0) === 7)
-    assert(model.predict(1.5) === 6)
-    assert(model.predict(1.75) === 5.5)
-    assert(model.predict(2) === 5)
-    assert(model.predict(3) === 4)
-    assert(model.predict(10) === 1)
-  }
-}
\ No newline at end of file
+    assert(model.predict(0.5) === 6)
+    assert(model.predict(0.75) === 5.5)
+    assert(model.predict(1) === 5)
+    assert(model.predict(2) === 4)
+    assert(model.predict(9) === 1)
+  }
+
+  test("model construction") {
+    val model = new IsotonicRegressionModel(Array(0.0, 1.0), Array(1.0, 2.0), isotonic = true)
+    assert(model.predict(-0.5) === 1.0)
+    assert(model.predict(0.0) === 1.0)
+    assert(model.predict(0.5) ~== 1.5 absTol 1e-14)
+    assert(model.predict(1.0) === 2.0)
+    assert(model.predict(1.5) === 2.0)
+
+    intercept[IllegalArgumentException] {
+      // different array sizes.
+      new IsotonicRegressionModel(Array(0.0, 1.0), Array(1.0), isotonic = true)
+    }
+
+    intercept[IllegalArgumentException] {
+      // unordered boundaries
+      new IsotonicRegressionModel(Array(1.0, 0.0), Array(1.0, 2.0), isotonic = true)
+    }
+
+    intercept[IllegalArgumentException] {
+      // unordered predictions (isotonic)
+      new IsotonicRegressionModel(Array(0.0, 1.0), Array(2.0, 1.0), isotonic = true)
+    }
+
+    intercept[IllegalArgumentException] {
+      // unordered predictions (antitonic)
+      new IsotonicRegressionModel(Array(0.0, 1.0), Array(1.0, 2.0), isotonic = false)
+    }
+  }
+}
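
The idea behind the glom()/flatMap(poolAdjacentViolators) change in parallelPoolAdjacentViolators is that pool-adjacent-violators (PAVA) can run independently on each sorted partition and then once more over the concatenated partial results, because pooling within a partition only ever merges adjacent violators and is therefore a valid prefix of the global run. The snippet below is a minimal, self-contained sketch of that property in plain Scala; it is not the MLlib implementation, the object and helper names (PavaSketch, pava, Block) are made up for illustration, and it ignores the zero/negative-weight cases the tests above cover.

import scala.collection.mutable.ArrayBuffer

// Minimal weighted pool-adjacent-violators (PAVA) sketch: returns the
// isotonic (non-decreasing) least-squares fit of y with weights w.
object PavaSketch {

  def pava(y: Array[Double], w: Array[Double]): Array[Double] = {
    require(y.length == w.length)
    // Each block keeps (weighted sum, total weight, number of original points).
    case class Block(var sum: Double, var weight: Double, var count: Int) {
      def mean: Double = sum / weight
    }
    val blocks = ArrayBuffer.empty[Block]
    for (i <- y.indices) {
      blocks += Block(y(i) * w(i), w(i), 1)
      // Merge backwards while an adjacent pair of blocks violates monotonicity.
      while (blocks.length > 1 && blocks(blocks.length - 2).mean > blocks.last.mean) {
        val last = blocks.remove(blocks.length - 1)
        val prev = blocks.last
        prev.sum += last.sum
        prev.weight += last.weight
        prev.count += last.count
      }
    }
    blocks.iterator.flatMap(b => Iterator.fill(b.count)(b.mean)).toArray
  }

  def main(args: Array[String]): Unit = {
    // Labels from the "increasing isotonic regression" test above, unit weights.
    val y = Array(1.0, 2.0, 3.0, 1.0, 6.0, 17.0, 16.0, 17.0, 18.0)
    val w = Array.fill(y.length)(1.0)

    val global = pava(y, w)

    // Two-pass variant in the spirit of parallelPoolAdjacentViolators:
    // fit each contiguous chunk separately, then fit the concatenation again.
    val (y1, y2) = y.splitAt(6)
    val (w1, w2) = w.splitAt(6)
    val twoPass = pava(pava(y1, w1) ++ pava(y2, w2), w)

    println(global.mkString(", "))  // 1.0, 2.0, 2.0, 2.0, 6.0, 16.5, 16.5, 17.0, 18.0
    println(twoPass.mkString(", ")) // identical to the single global pass
  }
}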
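
The "model construction" test above exercises the piecewise-linear prediction rule of IsotonicRegressionModel: exact values at the stored boundaries, linear interpolation between adjacent boundaries, and clamping to the outermost prediction outside the boundary range. The following standalone sketch only approximates that rule for illustration; it is not the MLlib code, and PiecewiseLinearSketch and its predict helper are hypothetical names.

// Rough sketch of the piecewise-linear rule asserted by the tests above.
object PiecewiseLinearSketch {

  def predict(boundaries: Array[Double], predictions: Array[Double], x: Double): Double = {
    require(boundaries.length == predictions.length && boundaries.nonEmpty)
    if (x <= boundaries.head) {
      predictions.head
    } else if (x >= boundaries.last) {
      predictions.last
    } else {
      // First boundary strictly greater than x; interpolate against its left neighbour.
      val hi = boundaries.indexWhere(_ > x)
      val lo = hi - 1
      val fraction = (x - boundaries(lo)) / (boundaries(hi) - boundaries(lo))
      predictions(lo) + fraction * (predictions(hi) - predictions(lo))
    }
  }

  def main(args: Array[String]): Unit = {
    // Boundaries/predictions asserted by the "increasing isotonic regression" test above.
    val boundaries = Array(0.0, 1.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0)
    val predictions = Array(1.0, 2.0, 2.0, 6.0, 16.5, 16.5, 17.0, 18.0)

    println(predict(boundaries, predictions, 2.0))  // 2.0 (flat segment between features 1 and 3)
    println(predict(boundaries, predictions, 3.5))  // 4.0 (halfway between predictions 2.0 and 6.0)
    println(predict(boundaries, predictions, 42.0)) // 18.0 (clamped to the last prediction)
  }
}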