diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
index ba33762fba754..dda666c4c2c1b 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
@@ -17,67 +17,47 @@

 package org.apache.spark.mllib.regression

-import org.apache.spark.mllib.linalg.Vector
-import org.apache.spark.mllib.regression.MonotonicityConstraint.MonotonicityConstraint._
+import org.apache.spark.api.java.{JavaRDD, JavaPairRDD}
 import org.apache.spark.rdd.RDD

-/**
- * Monotonicity constrains for monotone regression
- * Isotonic (increasing)
- * Antitonic (decreasing)
- */
-object MonotonicityConstraint {
-
-  object MonotonicityConstraint {
-
-    sealed trait MonotonicityConstraint {
-      private[regression] def holds(
-          current: WeightedLabeledPoint,
-          next: WeightedLabeledPoint): Boolean
-    }
-
-    /**
-     * Isotonic monotonicity constraint. Increasing sequence
-     */
-    case object Isotonic extends MonotonicityConstraint {
-      override def holds(current: WeightedLabeledPoint, next: WeightedLabeledPoint): Boolean = {
-        current.label <= next.label
-      }
-    }
-
-    /**
-     * Antitonic monotonicity constrain. Decreasing sequence
-     */
-    case object Antitonic extends MonotonicityConstraint {
-      override def holds(current: WeightedLabeledPoint, next: WeightedLabeledPoint): Boolean = {
-        current.label >= next.label
-      }
-    }
-  }
-
-  val Isotonic = MonotonicityConstraint.Isotonic
-  val Antitonic = MonotonicityConstraint.Antitonic
-}
-
 /**
  * Regression model for Isotonic regression
  *
  * @param predictions Weights computed for every feature.
- * @param monotonicityConstraint specifies if the sequence is increasing or decreasing
+ * @param isotonic isotonic (increasing) or antitonic (decreasing) sequence
  */
-class IsotonicRegressionModel(
-    val predictions: Seq[WeightedLabeledPoint],
-    val monotonicityConstraint: MonotonicityConstraint)
-  extends RegressionModel {
+class IsotonicRegressionModel(
+    val predictions: Seq[(Double, Double, Double)],
+    val isotonic: Boolean)
+  extends Serializable {

-  override def predict(testData: RDD[Vector]): RDD[Double] =
+  /**
+   * Predict labels for the provided features.
+   *
+   * @param testData features to be labeled
+   * @return predicted labels
+   */
+  def predict(testData: RDD[Double]): RDD[Double] =
     testData.map(predict)

-  override def predict(testData: Vector): Double = {
+  /**
+   * Predict labels for the provided features (Java API).
+   *
+   * @param testData features to be labeled
+   * @return predicted labels
+   */
+  def predict(testData: JavaRDD[java.lang.Double]): RDD[java.lang.Double] =
+    testData.rdd.map(x => x.doubleValue()).map(predict)
+
+  /**
+   * Predict a single label.
+   *
+   * @param testData feature to be labeled
+   * @return predicted label
+   */
+  def predict(testData: Double): Double =
     // Take the highest of data points smaller than our feature or data point with lowest feature
-    (predictions.head +:
-      predictions.filter(y => y.features.toArray.head <= testData.toArray.head)).last.label
-  }
+    (predictions.head +: predictions.filter(y => y._2 <= testData)).last._1
 }

 /**
@@ -91,23 +71,23 @@ trait IsotonicRegressionAlgorithm
    *
    * @param predictions labels estimated using isotonic regression algorithm.
    *                    Used for predictions on new data points.
-   * @param monotonicityConstraint isotonic or antitonic
+   * @param isotonic isotonic (increasing) or antitonic (decreasing) sequence
    * @return isotonic regression model
    */
   protected def createModel(
-      predictions: Seq[WeightedLabeledPoint],
-      monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel
+      predictions: Seq[(Double, Double, Double)],
+      isotonic: Boolean): IsotonicRegressionModel

   /**
    * Run algorithm to obtain isotonic regression model
    *
-   * @param input data
-   * @param monotonicityConstraint ascending or descenting
+   * @param input RDD of (label, feature, weight) triples
+   * @param isotonic isotonic (increasing) or antitonic (decreasing) sequence
    * @return isotonic regression model
    */
   def run(
-      input: RDD[WeightedLabeledPoint],
-      monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel
+      input: RDD[(Double, Double, Double)],
+      isotonic: Boolean): IsotonicRegressionModel
 }

 /**
@@ -117,17 +97,17 @@ class PoolAdjacentViolators private [mllib]
   extends IsotonicRegressionAlgorithm {

   override def run(
-      input: RDD[WeightedLabeledPoint],
-      monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel = {
+      input: RDD[(Double, Double, Double)],
+      isotonic: Boolean): IsotonicRegressionModel = {
     createModel(
-      parallelPoolAdjacentViolators(input, monotonicityConstraint),
-      monotonicityConstraint)
+      parallelPoolAdjacentViolators(input, isotonic),
+      isotonic)
   }

   override protected def createModel(
-      predictions: Seq[WeightedLabeledPoint],
-      monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel = {
-    new IsotonicRegressionModel(predictions, monotonicityConstraint)
+      predictions: Seq[(Double, Double, Double)],
+      isotonic: Boolean): IsotonicRegressionModel = {
+    new IsotonicRegressionModel(predictions, isotonic)
   }

   /**
@@ -138,32 +118,40 @@ class PoolAdjacentViolators private [mllib]
    * Method in situ mutates input array
    *
    * @param in input data
-   * @param monotonicityConstraint asc or desc
+   * @param isotonic isotonic (increasing) or antitonic (decreasing) sequence
    * @return result
    */
   private def poolAdjacentViolators(
-      in: Array[WeightedLabeledPoint],
-      monotonicityConstraint: MonotonicityConstraint): Array[WeightedLabeledPoint] = {
+      in: Array[(Double, Double, Double)],
+      isotonic: Boolean): Array[(Double, Double, Double)] = {

     // Pools sub array within given bounds assigning weighted average value to all elements
-    def pool(in: Array[WeightedLabeledPoint], start: Int, end: Int): Unit = {
+    def pool(in: Array[(Double, Double, Double)], start: Int, end: Int): Unit = {
       val poolSubArray = in.slice(start, end + 1)

-      val weightedSum = poolSubArray.map(lp => lp.label * lp.weight).sum
-      val weight = poolSubArray.map(_.weight).sum
+      val weightedSum = poolSubArray.map(lp => lp._1 * lp._3).sum
+      val weight = poolSubArray.map(_._3).sum

       for(i <- start to end) {
-        in(i) = WeightedLabeledPoint(weightedSum / weight, in(i).features, in(i).weight)
+        in(i) = (weightedSum / weight, in(i)._2, in(i)._3)
       }
     }

+    val isotonicConstraint: (Double, Double) => Boolean = (x, y) => x <= y
+    val antitonicConstraint: (Double, Double) => Boolean = (x, y) => x >= y
+
+    def monotonicityConstraint(isotonic: Boolean) =
+      if (isotonic) isotonicConstraint else antitonicConstraint
+
+    val monotonicityConstraintHolds = monotonicityConstraint(isotonic)
+
     var i = 0
     while(i < in.length) {
       var j = i

       // Find monotonicity violating sequence, if any
-      while(j < in.length - 1 && !monotonicityConstraint.holds(in(j), in(j + 1))) {
+      while(j < in.length - 1 && !monotonicityConstraintHolds(in(j)._1, in(j + 1)._1)) {
         j = j + 1
       }

@@ -173,7 +161,7 @@ class PoolAdjacentViolators private [mllib]
       } else {
         // Otherwise pool the violating sequence
         // And check if pooling caused monotonicity violation in previously processed points
-        while (i >= 0 && !monotonicityConstraint.holds(in(i), in(i + 1))) {
+        while (i >= 0 && !monotonicityConstraintHolds(in(i)._1, in(i + 1)._1)) {
           pool(in, i, j)
           i = i - 1
         }

@@ -190,19 +178,19 @@ class PoolAdjacentViolators private [mllib]
    * Calls Pool adjacent violators on each partition and then again on the result
    *
    * @param testData input
-   * @param monotonicityConstraint asc or desc
+   * @param isotonic isotonic (increasing) or antitonic (decreasing) sequence
    * @return result
    */
   private def parallelPoolAdjacentViolators(
-      testData: RDD[WeightedLabeledPoint],
-      monotonicityConstraint: MonotonicityConstraint): Seq[WeightedLabeledPoint] = {
+      testData: RDD[(Double, Double, Double)],
+      isotonic: Boolean): Seq[(Double, Double, Double)] = {
     poolAdjacentViolators(
       testData
-        .sortBy(_.features.toArray.head)
+        .sortBy(_._2)
         .cache()
-        .mapPartitions(it => poolAdjacentViolators(it.toArray, monotonicityConstraint).toIterator)
-        .collect(), monotonicityConstraint)
+        .mapPartitions(it => poolAdjacentViolators(it.toArray, isotonic).toIterator)
+        .collect(), isotonic)
   }
 }

@@ -212,20 +200,35 @@ class PoolAdjacentViolators private [mllib]
 object IsotonicRegression {

   /**
-   * Train a monotone regression model given an RDD of (label, features, weight).
-   * Currently only one dimensional algorithm is supported (features.length is one)
+   * Train a monotone regression model given an RDD of (label, feature, weight).
    * Label is the dependent y value
    * Weight of the data point is the number of measurements. Default is 1
    *
-   * @param input RDD of (label, array of features, weight).
+   * @param input RDD of (label, feature, weight).
    *              Each point describes a row of the data
    *              matrix A as well as the corresponding right hand side label y
    *              and weight as number of measurements
-   * @param monotonicityConstraint Isotonic (increasing) or Antitonic (decreasing) sequence
+   * @param isotonic isotonic (increasing) or antitonic (decreasing) sequence
+   */
+  def train(
+      input: RDD[(Double, Double, Double)],
+      isotonic: Boolean = true): IsotonicRegressionModel = {
+    new PoolAdjacentViolators().run(input, isotonic)
+  }
+
+  /**
+   * Train a monotone regression model given an RDD of (label, feature).
+   * Label is the dependent y value
+   * Weight defaults to 1
+   *
+   * @param input RDD of (label, feature).
+   * @param isotonic isotonic (increasing) or antitonic (decreasing) sequence
+   * @return isotonic regression model
    */
   def train(
-      input: RDD[WeightedLabeledPoint],
-      monotonicityConstraint: MonotonicityConstraint = Isotonic): IsotonicRegressionModel = {
-    new PoolAdjacentViolators().run(input, monotonicityConstraint)
+      input: JavaPairRDD[java.lang.Double, java.lang.Double],
+      isotonic: Boolean): IsotonicRegressionModel = {
+    new PoolAdjacentViolators()
+      .run(input.rdd.map(x => (x._1.doubleValue(), x._2.doubleValue(), 1d)), isotonic)
   }
 }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/WeightedLabeledPoint.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/WeightedLabeledPoint.scala
deleted file mode 100644
index 1bb78c65aa647..0000000000000
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/WeightedLabeledPoint.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.mllib.regression
-
-import org.apache.spark.mllib.linalg.Vector
-import org.apache.spark.rdd.RDD
-
-import scala.beans.BeanInfo
-
-object WeightedLabeledPointConversions {
-  implicit def labeledPointToWeightedLabeledPoint(
-      labeledPoint: LabeledPoint): WeightedLabeledPoint = {
-    WeightedLabeledPoint(labeledPoint.label, labeledPoint.features)
-  }
-
-  implicit def labeledPointRDDToWeightedLabeledPointRDD(
-      rdd: RDD[LabeledPoint]): RDD[WeightedLabeledPoint] = {
-    rdd.map(lp => WeightedLabeledPoint(lp.label, lp.features))
-  }
-}
-
-/**
- * Class that represents the features and labels of a data point with associated weight
- *
- * @param label Label for this data point.
- * @param features List of features for this data point.
- * @param weight Weight of the data point. Defaults to 1.
- */
-@BeanInfo
-case class WeightedLabeledPoint(label: Double, features: Vector, weight: Double = 1)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/IsotonicDataGenerator.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/IsotonicDataGenerator.scala
index 8a8112eee3db7..e95e511f3199b 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/IsotonicDataGenerator.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/IsotonicDataGenerator.scala
@@ -17,11 +17,8 @@

 package org.apache.spark.mllib.util

-import org.apache.spark.mllib.linalg.Vectors
-import org.apache.spark.mllib.regression.WeightedLabeledPointConversions._
-import org.apache.spark.mllib.regression.{LabeledPoint, WeightedLabeledPoint}
-
 import scala.collection.JavaConversions._
+import java.lang.{Double => JDouble}

 object IsotonicDataGenerator {

@@ -30,8 +27,9 @@ object IsotonicDataGenerator {
    * @param labels list of labels for the data points
    * @return Java List of input.
    */
-  def generateIsotonicInputAsList(labels: Array[Double]): java.util.List[WeightedLabeledPoint] = {
-    seqAsJavaList(generateIsotonicInput(wrapDoubleArray(labels):_*))
+  def generateIsotonicInputAsList(labels: Array[Double]): java.util.List[(JDouble, JDouble)] = {
+    seqAsJavaList(generateIsotonicInput(wrapDoubleArray(labels):_*)
+      .map(x => (new JDouble(x._1), new JDouble(x._2))))
   }

   /**
@@ -39,10 +37,9 @@ object IsotonicDataGenerator {
    * @param labels list of labels for the data points
    * @return sequence of data points
    */
-  def generateIsotonicInput(labels: Double*): Seq[WeightedLabeledPoint] = {
+  def generateIsotonicInput(labels: Double*): Seq[(Double, Double, Double)] = {
     labels.zip(1 to labels.size)
-      .map(point => labeledPointToWeightedLabeledPoint(
-        LabeledPoint(point._1, Vectors.dense(point._2))))
+      .map(point => (point._1, point._2.toDouble, 1d))
   }

   /**
@@ -53,8 +50,8 @@ object IsotonicDataGenerator {
    */
   def generateWeightedIsotonicInput(
       labels: Seq[Double],
-      weights: Seq[Double]): Seq[WeightedLabeledPoint] = {
+      weights: Seq[Double]): Seq[(Double, Double, Double)] = {
     labels.zip(1 to labels.size).zip(weights)
-      .map(point => WeightedLabeledPoint(point._1._1, Vectors.dense(point._1._2), point._2))
+      .map(point => (point._1._1, point._1._2.toDouble, point._2))
   }
 }
diff --git a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaIsotonicRegressionSuite.java b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaIsotonicRegressionSuite.java
index b3285271fcbd1..ee072ef601e1e 100644
--- a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaIsotonicRegressionSuite.java
+++ b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaIsotonicRegressionSuite.java
@@ -17,15 +17,16 @@

 package org.apache.spark.mllib.regression;

+import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.Function;
-import org.apache.spark.mllib.linalg.Vector;
 import org.apache.spark.mllib.util.IsotonicDataGenerator;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import scala.Tuple2;

 import java.io.Serializable;
 import java.util.List;
@@ -44,42 +45,26 @@ public void tearDown() {
     sc = null;
   }

-  double difference(List<WeightedLabeledPoint> expected, IsotonicRegressionModel model) {
+  double difference(List<Tuple2<Double, Double>> expected, IsotonicRegressionModel model) {
     double diff = 0;

     for(int i = 0; i < model.predictions().length(); i++) {
-      WeightedLabeledPoint exp = expected.get(i);
-      diff += Math.abs(model.predict(exp.features()) - exp.label());
+      Tuple2<Double, Double> exp = expected.get(i);
+      diff += Math.abs(model.predict(exp._2()) - exp._1());
     }

     return diff;
   }

-  @Test
-  public void runIsotonicRegressionUsingConstructor() {
-    JavaRDD<WeightedLabeledPoint> testRDD = sc.parallelize(IsotonicDataGenerator
-      .generateIsotonicInputAsList(
-        new double[] {1, 2, 3, 3, 1, 6, 7, 8, 11, 9, 10, 12})).cache();
-
-    IsotonicRegressionAlgorithm isotonicRegressionAlgorithm = new PoolAdjacentViolators();
-    IsotonicRegressionModel model = isotonicRegressionAlgorithm.run(testRDD.rdd(), MonotonicityConstraint.Isotonic());
-
-    List<WeightedLabeledPoint> expected = IsotonicDataGenerator
-      .generateIsotonicInputAsList(
-        new double[] {1, 2, 7d/3, 7d/3, 7d/3, 6, 7, 8, 10, 10, 10, 12});
-
-    Assert.assertTrue(difference(expected, model) == 0);
-  }
-
   @Test
   public void runIsotonicRegressionUsingStaticMethod() {
-    JavaRDD<WeightedLabeledPoint> testRDD = sc.parallelize(IsotonicDataGenerator
-      .generateIsotonicInputAsList(
-        new double[] {1, 2, 3, 3, 1, 6, 7, 8, 11, 9, 10, 12})).cache();
+    JavaPairRDD<Double, Double> trainRDD = sc.parallelizePairs(
+      IsotonicDataGenerator.generateIsotonicInputAsList(
+        new double[]{1, 2, 3, 3, 1, 6, 7, 8, 11, 9, 10, 12})).cache();

-    IsotonicRegressionModel model = IsotonicRegression.train(testRDD.rdd(), MonotonicityConstraint.Isotonic());
+    IsotonicRegressionModel model = IsotonicRegression.train(trainRDD, true);

-    List<WeightedLabeledPoint> expected = IsotonicDataGenerator
+    List<Tuple2<Double, Double>> expected = IsotonicDataGenerator
       .generateIsotonicInputAsList(
         new double[] {1, 2, 7d/3, 7d/3, 7d/3, 6, 7, 8, 10, 10, 10, 12});

@@ -88,22 +73,22 @@ public void runIsotonicRegressionUsingStaticMethod() {

   @Test
   public void testPredictJavaRDD() {
-    JavaRDD<WeightedLabeledPoint> testRDD = sc.parallelize(IsotonicDataGenerator
-      .generateIsotonicInputAsList(
-        new double[] {1, 2, 3, 3, 1, 6, 7, 8, 11, 9, 10, 12})).cache();
+    JavaPairRDD<Double, Double> trainRDD = sc.parallelizePairs(
+      IsotonicDataGenerator.generateIsotonicInputAsList(
+        new double[]{1, 2, 3, 3, 1, 6, 7, 8, 11, 9, 10, 12})).cache();

-    IsotonicRegressionModel model = IsotonicRegression.train(testRDD.rdd(), MonotonicityConstraint.Isotonic());
+    IsotonicRegressionModel model = IsotonicRegression.train(trainRDD, true);

-    JavaRDD<Vector> vectors = testRDD.map(new Function<WeightedLabeledPoint, Vector>() {
+    JavaRDD<Double> testRDD = trainRDD.map(new Function<Tuple2<Double, Double>, Double>() {
       @Override
-      public Vector call(WeightedLabeledPoint v) throws Exception {
-        return v.features();
+      public Double call(Tuple2<Double, Double> v) throws Exception {
+        return v._2();
       }
     });

-    List<Double> predictions = model.predict(vectors).collect();
+    Double[] predictions = model.predict(testRDD).collect();

-    Assert.assertTrue(predictions.get(0) == 1d);
-    Assert.assertTrue(predictions.get(11) == 12d);
+    Assert.assertTrue(predictions[0] == 1d);
+    Assert.assertTrue(predictions[11] == 12d);
   }
 }
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala
index c5ba42e912e28..389c9add10e15 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala
@@ -18,10 +18,8 @@
 package org.apache.spark.mllib.regression

 import org.apache.spark.mllib.linalg.Vectors
-import org.apache.spark.mllib.regression.MonotonicityConstraint.MonotonicityConstraint.{Antitonic, Isotonic}
 import org.apache.spark.mllib.util.{LocalClusterSparkContext, MLlibTestSparkContext}
 import org.scalatest.{Matchers, FunSuite}
-import WeightedLabeledPointConversions._

 import scala.util.Random

 import org.apache.spark.mllib.util.IsotonicDataGenerator._
@@ -34,174 +32,181 @@ class IsotonicRegressionSuite
     Math.round(d * 100).toDouble / 100

   test("increasing isotonic regression") {
-    val testRDD = sc.parallelize(generateIsotonicInput(1, 2, 3, 3, 1, 6, 7, 8, 11, 9, 10, 12, 14, 15, 17, 16, 17, 18, 19, 20)).cache()
+    val trainRDD = sc.parallelize(generateIsotonicInput(1, 2, 3, 3, 1, 6, 7, 8, 11, 9, 10, 12, 14, 15, 17, 16, 17, 18, 19, 20)).cache()

     val alg = new PoolAdjacentViolators
-    val model = alg.run(testRDD, Isotonic)
+    val model = alg.run(trainRDD, true)
+
+    model.predictions should be(generateIsotonicInput(1, 2, 7d/3, 7d/3, 7d/3, 6, 7, 8, 10, 10, 10, 12, 14, 15, 16.5, 16.5, 17, 18, 19, 20))
+  }
+
+  test("increasing isotonic regression using api") {
+    val trainRDD = sc.parallelize(generateIsotonicInput(1, 2, 3, 3, 1, 6, 7, 8, 11, 9, 10, 12, 14, 15, 17, 16, 17, 18, 19, 20)).cache()
+
+    val model = IsotonicRegression.train(trainRDD, true)

     model.predictions should be(generateIsotonicInput(1, 2, 7d/3, 7d/3, 7d/3, 6, 7, 8, 10, 10, 10, 12, 14, 15, 16.5, 16.5, 17, 18, 19, 20))
   }

   test("isotonic regression with size 0") {
-    val testRDD = sc.parallelize(List[WeightedLabeledPoint]()).cache()
+    val trainRDD = sc.parallelize(List[(Double, Double, Double)]()).cache()

     val alg = new PoolAdjacentViolators
-    val model = alg.run(testRDD, Isotonic)
+    val model = alg.run(trainRDD, true)

     model.predictions should be(List())
   }

   test("isotonic regression with size 1") {
-    val testRDD = sc.parallelize(generateIsotonicInput(1)).cache()
+    val trainRDD = sc.parallelize(generateIsotonicInput(1)).cache()

     val alg = new PoolAdjacentViolators
-    val model = alg.run(testRDD, Isotonic)
+    val model = alg.run(trainRDD, true)

     model.predictions should be(generateIsotonicInput(1))
   }

   test("isotonic regression strictly increasing sequence") {
-    val testRDD = sc.parallelize(generateIsotonicInput(1, 2, 3, 4, 5)).cache()
+    val trainRDD = sc.parallelize(generateIsotonicInput(1, 2, 3, 4, 5)).cache()

     val alg = new PoolAdjacentViolators
-    val model = alg.run(testRDD, Isotonic)
+    val model = alg.run(trainRDD, true)

     model.predictions should be(generateIsotonicInput(1, 2, 3, 4, 5))
   }

   test("isotonic regression strictly decreasing sequence") {
-    val testRDD = sc.parallelize(generateIsotonicInput(5, 4, 3, 2, 1)).cache()
+    val trainRDD = sc.parallelize(generateIsotonicInput(5, 4, 3, 2, 1)).cache()

     val alg = new PoolAdjacentViolators
-    val model = alg.run(testRDD, Isotonic)
+    val model = alg.run(trainRDD, true)

     model.predictions should be(generateIsotonicInput(3, 3, 3, 3, 3))
   }

   test("isotonic regression with last element violating monotonicity") {
-    val testRDD = sc.parallelize(generateIsotonicInput(1, 2, 3, 4, 2)).cache()
+    val trainRDD = sc.parallelize(generateIsotonicInput(1, 2, 3, 4, 2)).cache()

     val alg = new PoolAdjacentViolators
-    val model = alg.run(testRDD, Isotonic)
+    val model = alg.run(trainRDD, true)

     model.predictions should be(generateIsotonicInput(1, 2, 3, 3, 3))
   }

   test("isotonic regression with first element violating monotonicity") {
-    val testRDD = sc.parallelize(generateIsotonicInput(4, 2, 3, 4, 5)).cache()
+    val trainRDD = sc.parallelize(generateIsotonicInput(4, 2, 3, 4, 5)).cache()

     val alg = new PoolAdjacentViolators
-    val model = alg.run(testRDD, Isotonic)
+    val model = alg.run(trainRDD, true)

     model.predictions should be(generateIsotonicInput(3, 3, 3, 4, 5))
   }

   test("isotonic regression with negative labels") {
-    val testRDD = sc.parallelize(generateIsotonicInput(-1, -2, 0, 1, -1)).cache()
+    val trainRDD = sc.parallelize(generateIsotonicInput(-1, -2, 0, 1, -1)).cache()

     val alg = new PoolAdjacentViolators
-    val model = alg.run(testRDD, Isotonic)
+    val model = alg.run(trainRDD, true)

     model.predictions should be(generateIsotonicInput(-1.5, -1.5, 0, 0, 0))
   }

   test("isotonic regression with unordered input") {
-    val testRDD = sc.parallelize(generateIsotonicInput(1, 2, 3, 4, 5).reverse).cache()
+    val trainRDD = sc.parallelize(generateIsotonicInput(1, 2, 3, 4, 5).reverse).cache()

     val alg = new PoolAdjacentViolators
-    val model = alg.run(testRDD, Isotonic)
+    val model = alg.run(trainRDD, true)

     model.predictions should be(generateIsotonicInput(1, 2, 3, 4, 5))
   }

   test("weighted isotonic regression") {
-    val testRDD = sc.parallelize(generateWeightedIsotonicInput(Seq(1, 2, 3, 4, 2), Seq(1, 1, 1, 1, 2))).cache()
+    val trainRDD = sc.parallelize(generateWeightedIsotonicInput(Seq(1, 2, 3, 4, 2), Seq(1, 1, 1, 1, 2))).cache()

     val alg = new PoolAdjacentViolators
-    val model = alg.run(testRDD, Isotonic)
+    val model = alg.run(trainRDD, true)

     model.predictions should be(generateWeightedIsotonicInput(Seq(1, 2, 2.75, 2.75,2.75), Seq(1, 1, 1, 1, 2)))
   }

   test("weighted isotonic regression with weights lower than 1") {
-    val testRDD = sc.parallelize(generateWeightedIsotonicInput(Seq(1, 2, 3, 2, 1), Seq(1, 1, 1, 0.1, 0.1))).cache()
+    val trainRDD = sc.parallelize(generateWeightedIsotonicInput(Seq(1, 2, 3, 2, 1), Seq(1, 1, 1, 0.1, 0.1))).cache()

     val alg = new PoolAdjacentViolators
-    val model = alg.run(testRDD, Isotonic)
+    val model = alg.run(trainRDD, true)

-    model.predictions.map(p => p.copy(label = round(p.label))) should be
+    model.predictions.map(p => p.copy(_1 = round(p._1))) should be
     (generateWeightedIsotonicInput(Seq(1, 2, 3.3/1.2, 3.3/1.2, 3.3/1.2), Seq(1, 1, 1, 0.1, 0.1)))
   }

   test("weighted isotonic regression with negative weights") {
-    val testRDD = sc.parallelize(generateWeightedIsotonicInput(Seq(1, 2, 3, 2, 1), Seq(-1, 1, -3, 1, -5))).cache()
+    val trainRDD = sc.parallelize(generateWeightedIsotonicInput(Seq(1, 2, 3, 2, 1), Seq(-1, 1, -3, 1, -5))).cache()

     val alg = new PoolAdjacentViolators
-    val model = alg.run(testRDD, Isotonic)
+    val model = alg.run(trainRDD, true)

-    model.predictions.map(p => p.copy(label = round(p.label))) should be
+    model.predictions.map(p => p.copy(_1 = round(p._1))) should be
     (generateWeightedIsotonicInput(Seq(1, 10/6, 10/6, 10/6, 10/6), Seq(-1, 1, -3, 1, -5)))
   }

   test("weighted isotonic regression with zero weights") {
-    val testRDD = sc.parallelize(generateWeightedIsotonicInput(Seq(1, 2, 3, 2, 1), Seq(0, 0, 0, 1, 0))).cache()
+    val trainRDD = sc.parallelize(generateWeightedIsotonicInput(Seq(1, 2, 3, 2, 1), Seq(0, 0, 0, 1, 0))).cache()

     val alg = new PoolAdjacentViolators
-    val model = alg.run(testRDD, Isotonic)
+    val model = alg.run(trainRDD, true)

     model.predictions should be(generateWeightedIsotonicInput(Seq(1, 2, 2, 2, 2), Seq(0, 0, 0, 1, 0)))
   }

   test("isotonic regression prediction") {
-    val testRDD = sc.parallelize(generateIsotonicInput(1, 2, 7, 1, 2)).cache()
+    val trainRDD = sc.parallelize(generateIsotonicInput(1, 2, 7, 1, 2)).cache()

     val alg = new PoolAdjacentViolators
-    val model = alg.run(testRDD, Isotonic)
+    val model = alg.run(trainRDD, true)

-    model.predict(Vectors.dense(0)) should be(1)
-    model.predict(Vectors.dense(2)) should be(2)
-    model.predict(Vectors.dense(3)) should be(10d/3)
-    model.predict(Vectors.dense(10)) should be(10d/3)
+    model.predict(0) should be(1)
+    model.predict(2) should be(2)
+    model.predict(3) should be(10d/3)
+    model.predict(10) should be(10d/3)
   }

-  test("antitonic regression prediction") {
-    val testRDD = sc.parallelize(generateIsotonicInput(7, 5, 3, 5, 1)).cache()
+  test("isotonic regression RDD prediction") {
+    val trainRDD = sc.parallelize(generateIsotonicInput(1, 2, 7, 1, 2)).cache()
+    val testRDD = sc.parallelize(List(0d, 2d, 3d, 10d)).cache()

     val alg = new PoolAdjacentViolators
-    val model = alg.run(testRDD, Antitonic)
+    val model = alg.run(trainRDD, true)

-    model.predict(Vectors.dense(0)) should be(7)
-    model.predict(Vectors.dense(2)) should be(5)
-    model.predict(Vectors.dense(3)) should be(4)
-    model.predict(Vectors.dense(10)) should be(1)
+    model.predict(testRDD).collect() should be(Array(1, 2, 10d/3, 10d/3))
   }

-  test("isotonic regression labeled point to weighted labeled point conversion") {
-    val testRDD = sc.parallelize(
-      List(
-        LabeledPoint(2, Vectors.dense(1)),
-        LabeledPoint(1, Vectors.dense(2)))).cache()
+  test("antitonic regression prediction") {
+    val trainRDD = sc.parallelize(generateIsotonicInput(7, 5, 3, 5, 1)).cache()

     val alg = new PoolAdjacentViolators
-    val model = alg.run(testRDD, Isotonic)
+    val model = alg.run(trainRDD, false)

-    model.predictions should be(generateIsotonicInput(1.5, 1.5))
+    model.predict(0) should be(7)
+    model.predict(2) should be(5)
+    model.predict(3) should be(4)
+    model.predict(10) should be(1)
   }
 }

-class IsotonicRegressionClusterSuite extends FunSuite with LocalClusterSparkContext {
+class IsotonicRegressionClusterSuite
+  extends FunSuite
+  with LocalClusterSparkContext {

+  // TODO: fix
   test("task size should be small in both training and prediction") {
-    val m = 4
-    val n = 200000
-    val points = sc.parallelize(0 until m, 2).mapPartitionsWithIndex { (idx, iter) =>
-      val random = new Random(idx)
-      iter.map(i => LabeledPoint(1.0, Vectors.dense(Array.fill(n)(random.nextDouble()))))
-    }.cache()
+    val n = 135000
+
+    val trainData = (0 to n).map(i => (i.toDouble, i.toDouble, 1d))
+    val points = sc.parallelize(trainData, 1)

     // If we serialize data directly in the task closure, the size of the serialized task would be
     // greater than 1MB and hence Spark would throw an error.
-    val model = IsotonicRegression.train(points, Isotonic)
-    val predictions = model.predict(points.map(_.features))
+    val model = IsotonicRegression.train(points, true)
+    val predictions = model.predict(points.map(_._2))
   }
 }
\ No newline at end of file
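
For reference, a minimal usage sketch of the refactored Scala API this diff introduces. This is not part of the patch; it assumes an existing `SparkContext` named `sc`, and the values are illustrative:

```scala
import org.apache.spark.mllib.regression.IsotonicRegression
import org.apache.spark.rdd.RDD

// Training input is now a plain RDD of (label, feature, weight) tuples
// instead of RDD[WeightedLabeledPoint].
val input: RDD[(Double, Double, Double)] =
  sc.parallelize(Seq((1.0, 1.0, 1.0), (3.0, 2.0, 1.0), (2.0, 3.0, 1.0)))

// isotonic = true fits an increasing sequence; false fits a decreasing one
// (replacing the old MonotonicityConstraint.Isotonic / Antitonic objects).
val model = IsotonicRegression.train(input, isotonic = true)

// Prediction now takes raw Double features, one at a time or as an RDD.
val one: Double = model.predict(2.5)
val many: RDD[Double] = model.predict(sc.parallelize(Seq(0.5, 2.5)))
```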
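The expected sequences asserted in the tests follow from the pooling step: a subsequence that violates the monotonicity constraint is replaced by its weighted average. A small worked check of that arithmetic (plain Scala, no Spark needed):

```scala
// For labels 1, 2, 3, 3, 1 with unit weights, the tail (3, 3, 1) violates the
// increasing constraint, so pool adjacent violators averages it to
// (3 + 3 + 1) / 3 = 7/3, yielding 1, 2, 7/3, 7/3, 7/3 -- exactly the
// sequence the assertions above expect.
val violators = Seq((3.0, 1.0), (3.0, 1.0), (1.0, 1.0)) // (label, weight)
val pooled = violators.map { case (l, w) => l * w }.sum / violators.map(_._2).sum
assert(math.abs(pooled - 7d / 3) < 1e-9)
```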