From 629a1ce1c5ca6915c7df81fbdd3064b4469ad070 Mon Sep 17 00:00:00 2001
From: martinzapletal
Date: Sat, 29 Nov 2014 17:38:22 -0500
Subject: [PATCH] SPARK-3278 added isotonic regression for weighted data.
 Added tests for the Java API

---
 .../mllib/regression/IsotonicRegression.scala |  99 +++++++++-------
 .../regression/WeightedLabeledPoint.scala     |  41 +++++++
 .../mllib/util/IsotonicDataGenerator.scala    |  57 +++++++++
 .../JavaIsotonicRegressionSuite.java          | 110 ++++++++++++++++++
 .../regression/IsotonicRegressionSuite.scala  | 101 ++++++++++++----
 5 files changed, 345 insertions(+), 63 deletions(-)
 create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/regression/WeightedLabeledPoint.scala
 create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/util/IsotonicDataGenerator.scala
 create mode 100644 mllib/src/test/java/org/apache/spark/mllib/regression/JavaIsotonicRegressionSuite.java

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
index cfe792c651b9c..7a1a84aa2eef6 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
@@ -17,22 +17,33 @@
 package org.apache.spark.mllib.regression
 
-import org.apache.spark.mllib.linalg.Vector
+import org.apache.spark.mllib.linalg.{Vectors, Vector}
+import org.apache.spark.mllib.regression.MonotonicityConstraint.Enum.MonotonicityConstraint
 import org.apache.spark.rdd.RDD
 
-sealed trait MonotonicityConstraint {
-  def holds(current: LabeledPoint, next: LabeledPoint): Boolean
-}
+object MonotonicityConstraint {
 
-case object Isotonic extends MonotonicityConstraint {
-  override def holds(current: LabeledPoint, next: LabeledPoint): Boolean = {
-    current.label <= next.label
-  }
-}
-
-case object Antitonic extends MonotonicityConstraint {
-  override def holds(current: LabeledPoint, next: LabeledPoint): Boolean = {
-    current.label >= next.label
+  object Enum {
+
+    sealed trait MonotonicityConstraint {
+      private[regression] def holds(current: WeightedLabeledPoint, next: WeightedLabeledPoint): Boolean
+    }
+
+    case object Isotonic extends MonotonicityConstraint {
+      override def holds(current: WeightedLabeledPoint, next: WeightedLabeledPoint): Boolean = {
+        current.label <= next.label
+      }
+    }
+
+    case object Antitonic extends MonotonicityConstraint {
+      override def holds(current: WeightedLabeledPoint, next: WeightedLabeledPoint): Boolean = {
+        current.label >= next.label
+      }
+    }
   }
+
+  val Isotonic = Enum.Isotonic
+  val Antitonic = Enum.Antitonic
 }
 
 /**
@@ -41,9 +52,10 @@ case object Antitonic extends MonotonicityConstraint {
  * @param predictions Weights computed for every feature.
  */
 class IsotonicRegressionModel(
-    val predictions: Seq[LabeledPoint],
+    val predictions: Seq[WeightedLabeledPoint],
     val monotonicityConstraint: MonotonicityConstraint)
   extends RegressionModel {
+
   override def predict(testData: RDD[Vector]): RDD[Double] =
     testData.map(predict)
 
@@ -60,7 +72,7 @@ trait IsotonicRegressionAlgorithm
   extends Serializable {
 
   protected def createModel(
-      weights: Seq[LabeledPoint],
+      weights: Seq[WeightedLabeledPoint],
       monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel
 
   /**
@@ -70,47 +82,47 @@ trait IsotonicRegressionAlgorithm
    * @return model
    */
   def run(
-      input: RDD[LabeledPoint],
+      input: RDD[WeightedLabeledPoint],
       monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel
 
   /**
    * Run algorithm to obtain isotonic regression model
    * @param input data
-   * @param initialWeights weights
    * @param monotonicityConstraint asc or desc
+   * @param weights initial weights
    * @return
    */
   def run(
-      input: RDD[LabeledPoint],
-      initialWeights: Vector,
-      monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel
+      input: RDD[WeightedLabeledPoint],
+      monotonicityConstraint: MonotonicityConstraint,
+      weights: Vector): IsotonicRegressionModel
 }
 
 class PoolAdjacentViolators extends IsotonicRegressionAlgorithm {
 
   override def run(
-      input: RDD[LabeledPoint],
+      input: RDD[WeightedLabeledPoint],
       monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel = {
     createModel(
-      parallelPoolAdjacentViolators(input, monotonicityConstraint),
+      parallelPoolAdjacentViolators(input, monotonicityConstraint, Vectors.dense(Array(0d))),
       monotonicityConstraint)
   }
 
   override def run(
-      input: RDD[LabeledPoint],
-      initialWeights: Vector,
-      monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel = {
-    ???
+      input: RDD[WeightedLabeledPoint],
+      monotonicityConstraint: MonotonicityConstraint,
+      weights: Vector): IsotonicRegressionModel = {
+    createModel(
+      parallelPoolAdjacentViolators(input, monotonicityConstraint, weights),
+      monotonicityConstraint)
   }
 
   override protected def createModel(
-      weights: Seq[LabeledPoint],
+      predictions: Seq[WeightedLabeledPoint],
       monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel = {
-    new IsotonicRegressionModel(weights, monotonicityConstraint)
+    new IsotonicRegressionModel(predictions, monotonicityConstraint)
   }
-
-
   /**
    * Performs a pool adjacent violators algorithm (PAVA)
    * Uses approach with single processing of data where violators in previously processed
@@ -123,18 +135,18 @@ class PoolAdjacentViolators extends IsotonicRegressionAlgorithm {
    * @return result
    */
   private def poolAdjacentViolators(
-      in: Array[LabeledPoint],
-      monotonicityConstraint: MonotonicityConstraint): Array[LabeledPoint] = {
+      in: Array[WeightedLabeledPoint],
+      monotonicityConstraint: MonotonicityConstraint): Array[WeightedLabeledPoint] = {
 
     //Pools sub array within given bounds assigning weighted average value to all elements
-    def pool(in: Array[LabeledPoint], start: Int, end: Int): Unit = {
+    def pool(in: Array[WeightedLabeledPoint], start: Int, end: Int): Unit = {
       val poolSubArray = in.slice(start, end + 1)
 
-      val weightedSum = poolSubArray.map(_.label).sum
-      val weight = poolSubArray.length
+      val weightedSum = poolSubArray.map(lp => lp.label * lp.weight).sum
+      val weight = poolSubArray.map(_.weight).sum
 
       for(i <- start to end) {
-        in(i) = LabeledPoint(weightedSum / weight, in(i).features)
+        in(i) = WeightedLabeledPoint(weightedSum / weight, in(i).features, in(i).weight)
       }
     }
 
@@ -175,8 +187,9 @@ class PoolAdjacentViolators extends IsotonicRegressionAlgorithm {
    * @return result
    */
   private def parallelPoolAdjacentViolators(
-      testData: RDD[LabeledPoint],
-      monotonicityConstraint: MonotonicityConstraint): Seq[LabeledPoint] = {
+      testData: RDD[WeightedLabeledPoint],
+      monotonicityConstraint: MonotonicityConstraint,
+      weights: Vector): Seq[WeightedLabeledPoint] = {
 
     poolAdjacentViolators(
       testData
@@ -200,14 +213,14 @@ object IsotonicRegression {
    *
    * @param input RDD of (label, array of features) pairs. Each pair describes a row of the data
    *              matrix A as well as the corresponding right hand side label y
-   * @param initialWeights Initial set of weights to be used. Array should be equal in size to
+   * @param weights Initial set of weights to be used. Array should be equal in size to
    *                       the number of features in the data.
   */
  def train(
-      input: RDD[LabeledPoint],
-      initialWeights: Vector,
-      monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel = {
-    new PoolAdjacentViolators().run(input, initialWeights, monotonicityConstraint)
+      input: RDD[WeightedLabeledPoint],
+      monotonicityConstraint: MonotonicityConstraint,
+      weights: Vector): IsotonicRegressionModel = {
+    new PoolAdjacentViolators().run(input, monotonicityConstraint, weights)
   }
 
   /**
@@ -219,7 +232,7 @@ object IsotonicRegression {
    * matrix A as well as the corresponding right hand side label y
    */
   def train(
-      input: RDD[LabeledPoint],
+      input: RDD[WeightedLabeledPoint],
       monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel = {
     new PoolAdjacentViolators().run(input, monotonicityConstraint)
   }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/WeightedLabeledPoint.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/WeightedLabeledPoint.scala
new file mode 100644
index 0000000000000..90f7e49a3d905
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/WeightedLabeledPoint.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.regression
+
+import org.apache.spark.mllib.linalg.Vector
+import org.apache.spark.rdd.RDD
+
+import scala.beans.BeanInfo
+
+object WeightedLabeledPointConversions {
+  implicit def labeledPointToWeightedLabeledPoint(
+      labeledPoint: LabeledPoint): WeightedLabeledPoint = {
+    WeightedLabeledPoint(labeledPoint.label, labeledPoint.features, 1)
+  }
+
+  implicit def labeledPointRDDToWeightedLabeledPointRDD(
+      rdd: RDD[LabeledPoint]): RDD[WeightedLabeledPoint] = {
+    rdd.map(lp => WeightedLabeledPoint(lp.label, lp.features, 1))
+  }
+}
+
+/**
+ * Labeled point with weight
+ */
+@BeanInfo
+case class WeightedLabeledPoint(label: Double, features: Vector, weight: Double)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/IsotonicDataGenerator.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/IsotonicDataGenerator.scala
new file mode 100644
index 0000000000000..0bcb28b465c40
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/IsotonicDataGenerator.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.util
+
+import org.apache.spark.mllib.linalg.Vectors
+import org.apache.spark.mllib.regression.WeightedLabeledPointConversions._
+import org.apache.spark.mllib.regression.{LabeledPoint, WeightedLabeledPoint}
+
+import scala.collection.JavaConversions._
+
+object IsotonicDataGenerator {
+
+  /**
+   * Return a Java List of ordered labeled points
+   * @param labels list of labels for the data points
+   * @return Java List of data points
+   */
+  def generateIsotonicInputAsList(labels: Array[Double]): java.util.List[WeightedLabeledPoint] = {
+    seqAsJavaList(generateIsotonicInput(wrapDoubleArray(labels):_*))
+  }
+
+  /**
+   * Return an ordered sequence of labeled data points with default weights
+   * @param labels list of labels for the data points
+   * @return sequence of data points
+   */
+  def generateIsotonicInput(labels: Double*): Seq[WeightedLabeledPoint] = {
+    labels.zip(1 to labels.size)
+      .map(point => labeledPointToWeightedLabeledPoint(LabeledPoint(point._1, Vectors.dense(point._2))))
+  }
+
+  /**
+   * Return an ordered sequence of labeled weighted data points
+   * @param labels list of labels for the data points
+   * @param weights list of weights for the data points
+   * @return sequence of data points
+   */
+  def generateWeightedIsotonicInput(labels: Seq[Double], weights: Seq[Double]): Seq[WeightedLabeledPoint] = {
+    labels.zip(1 to labels.size).zip(weights)
+      .map(point => WeightedLabeledPoint(point._1._1, Vectors.dense(point._1._2), point._2))
+  }
+}
diff --git a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaIsotonicRegressionSuite.java b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaIsotonicRegressionSuite.java
new file mode 100644
index 0000000000000..0df1cd8e1edda
--- /dev/null
+++ b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaIsotonicRegressionSuite.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.regression;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.mllib.linalg.Vector;
+import org.apache.spark.mllib.util.IsotonicDataGenerator;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+
+public class JavaIsotonicRegressionSuite implements Serializable {
+  private transient JavaSparkContext sc;
+
+  @Before
+  public void setUp() {
+    sc = new JavaSparkContext("local", "JavaIsotonicRegressionSuite");
+  }
+
+  @After
+  public void tearDown() {
+    sc.stop();
+    sc = null;
+  }
+
+  double difference(List<WeightedLabeledPoint> expected, IsotonicRegressionModel model) {
+    double diff = 0;
+
+    for(int i = 0; i < model.predictions().length(); i++) {
+      WeightedLabeledPoint exp = expected.get(i);
+      diff += Math.abs(model.predict(exp.features()) - exp.label());
+    }
+
+    return diff;
+  }
+
+  @Test
+  public void runIsotonicRegressionUsingConstructor() {
+    JavaRDD<WeightedLabeledPoint> testRDD = sc.parallelize(IsotonicDataGenerator
+      .generateIsotonicInputAsList(
+        new double[] {1, 2, 3, 3, 1, 6, 7, 8, 11, 9, 10, 12})).cache();
+
+    IsotonicRegressionAlgorithm isotonicRegressionAlgorithm = new PoolAdjacentViolators();
+    IsotonicRegressionModel model = isotonicRegressionAlgorithm.run(testRDD.rdd(), MonotonicityConstraint.Isotonic());
+
+    List<WeightedLabeledPoint> expected = IsotonicDataGenerator
+      .generateIsotonicInputAsList(
+        new double[] {1, 2, 7d/3, 7d/3, 7d/3, 6, 7, 8, 10, 10, 10, 12});
+
+    Assert.assertTrue(difference(expected, model) == 0);
+  }
+
+  @Test
+  public void runIsotonicRegressionUsingStaticMethod() {
+    JavaRDD<WeightedLabeledPoint> testRDD = sc.parallelize(IsotonicDataGenerator
+      .generateIsotonicInputAsList(
+        new double[] {1, 2, 3, 3, 1, 6, 7, 8, 11, 9, 10, 12})).cache();
+
+    IsotonicRegressionModel model = IsotonicRegression.train(testRDD.rdd(), MonotonicityConstraint.Isotonic());
+
+    List<WeightedLabeledPoint> expected = IsotonicDataGenerator
+      .generateIsotonicInputAsList(
+        new double[] {1, 2, 7d/3, 7d/3, 7d/3, 6, 7, 8, 10, 10, 10, 12});
+
+    Assert.assertTrue(difference(expected, model) == 0);
+  }
+
+  @Test
+  public void testPredictJavaRDD() {
+    JavaRDD<WeightedLabeledPoint> testRDD = sc.parallelize(IsotonicDataGenerator
+      .generateIsotonicInputAsList(
+        new double[] {1, 2, 3, 3, 1, 6, 7, 8, 11, 9, 10, 12})).cache();
+
+    IsotonicRegressionModel model = IsotonicRegression.train(testRDD.rdd(), MonotonicityConstraint.Isotonic());
+
+    JavaRDD<Vector> vectors = testRDD.map(new Function<WeightedLabeledPoint, Vector>() {
+      @Override
+      public Vector call(WeightedLabeledPoint v) throws Exception {
+        return v.features();
+      }
+    });
+
+    List<Double> predictions = model.predict(vectors).collect();
+
+    Assert.assertTrue(predictions.get(0) == 1d);
+    Assert.assertTrue(predictions.get(11) == 12d);
+  }
+}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala
index 39d93bfb16a0e..0f1153b68129c 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala
@@ -18,30 +18,32 @@
 package org.apache.spark.mllib.regression
 
 import org.apache.spark.mllib.linalg.Vectors
+import org.apache.spark.mllib.regression.MonotonicityConstraint.Enum.{Antitonic, Isotonic}
 import org.apache.spark.mllib.util.{LocalClusterSparkContext, MLlibTestSparkContext}
 import org.scalatest.{Matchers, FunSuite}
-
+import WeightedLabeledPointConversions._
 import scala.util.Random
+import org.apache.spark.mllib.util.IsotonicDataGenerator._
 
 class IsotonicRegressionSuite
   extends FunSuite
   with MLlibTestSparkContext
   with Matchers {
 
-  def generateDataPoints(labels: Double*): Seq[LabeledPoint] =
-    labels.zip((1 to labels.size)).map(point => LabeledPoint(point._1, Vectors.dense(point._2)))
+  private def round(d: Double): Double =
+    Math.round(d * 100).toDouble / 100
 
   test("increasing isotonic regression") {
-    val testRDD = sc.parallelize(generateDataPoints(1, 2, 3, 3, 1, 6, 7, 8, 11, 9, 10, 12, 14, 15, 17, 16, 17, 18, 19, 20)).cache()
+    val testRDD = sc.parallelize(generateIsotonicInput(1, 2, 3, 3, 1, 6, 7, 8, 11, 9, 10, 12, 14, 15, 17, 16, 17, 18, 19, 20)).cache()
 
     val alg = new PoolAdjacentViolators
     val model = alg.run(testRDD, Isotonic)
 
-    model.predictions should be(generateDataPoints(1, 2, 7d/3, 7d/3, 7d/3, 6, 7, 8, 10, 10, 10, 12, 14, 15, 16.5, 16.5, 17, 18, 19, 20))
+    model.predictions should be(generateIsotonicInput(1, 2, 7d/3, 7d/3, 7d/3, 6, 7, 8, 10, 10, 10, 12, 14, 15, 16.5, 16.5, 17, 18, 19, 20))
   }
 
   test("isotonic regression with size 0") {
-    val testRDD = sc.parallelize(List[LabeledPoint]()).cache()
+    val testRDD = sc.parallelize(List[WeightedLabeledPoint]()).cache()
 
     val alg = new PoolAdjacentViolators
     val model = alg.run(testRDD, Isotonic)
@@ -50,61 +52,108 @@ class IsotonicRegressionSuite
   }
 
   test("isotonic regression with size 1") {
-    val testRDD = sc.parallelize(generateDataPoints(1)).cache()
+    val testRDD = sc.parallelize(generateIsotonicInput(1)).cache()
 
     val alg = new PoolAdjacentViolators
     val model = alg.run(testRDD, Isotonic)
 
-    model.predictions should be(generateDataPoints(1))
+    model.predictions should be(generateIsotonicInput(1))
   }
 
   test("isotonic regression strictly increasing sequence") {
-    val testRDD = sc.parallelize(generateDataPoints(1, 2, 3, 4, 5)).cache()
+    val testRDD = sc.parallelize(generateIsotonicInput(1, 2, 3, 4, 5)).cache()
 
     val alg = new PoolAdjacentViolators
     val model = alg.run(testRDD, Isotonic)
 
-    model.predictions should be(generateDataPoints(1, 2, 3, 4, 5))
+    model.predictions should be(generateIsotonicInput(1, 2, 3, 4, 5))
  }
 
   test("isotonic regression strictly decreasing sequence") {
-    val testRDD = sc.parallelize(generateDataPoints(5, 4, 3, 2, 1)).cache()
+    val testRDD = sc.parallelize(generateIsotonicInput(5, 4, 3, 2, 1)).cache()
 
     val alg = new PoolAdjacentViolators
     val model = alg.run(testRDD, Isotonic)
 
-    model.predictions should be(generateDataPoints(3, 3, 3, 3, 3))
+    model.predictions should be(generateIsotonicInput(3, 3, 3, 3, 3))
   }
 
   test("isotonic regression with last element violating monotonicity") {
-    val testRDD = sc.parallelize(generateDataPoints(1, 2, 3, 4, 2)).cache()
+    val testRDD = sc.parallelize(generateIsotonicInput(1, 2, 3, 4, 2)).cache()
 
     val alg = new PoolAdjacentViolators
     val model = alg.run(testRDD, Isotonic)
 
-    model.predictions should be(generateDataPoints(1, 2, 3, 3, 3))
+    model.predictions should be(generateIsotonicInput(1, 2, 3, 3, 3))
   }
 
   test("isotonic regression with first element violating monotonicity") {
-    val testRDD = sc.parallelize(generateDataPoints(4, 2, 3, 4, 5)).cache()
+    val testRDD = sc.parallelize(generateIsotonicInput(4, 2, 3, 4, 5)).cache()
+
+    val alg = new PoolAdjacentViolators
+    val model = alg.run(testRDD, Isotonic)
+
+    model.predictions should be(generateIsotonicInput(3, 3, 3, 4, 5))
+  }
+
+  test("isotonic regression with negative labels") {
+    val testRDD = sc.parallelize(generateIsotonicInput(-1, -2, 0, 1, -1)).cache()
 
     val alg = new PoolAdjacentViolators
     val model = alg.run(testRDD, Isotonic)
 
-    model.predictions should be(generateDataPoints(3, 3, 3, 4, 5))
+    model.predictions should be(generateIsotonicInput(-1.5, -1.5, 0, 0, 0))
   }
 
   test("isotonic regression with unordered input") {
-    val testRDD = sc.parallelize(List[LabeledPoint]()).cache()
+    val testRDD = sc.parallelize(generateIsotonicInput(1, 2, 3, 4, 5).reverse).cache()
 
     val alg = new PoolAdjacentViolators
     val model = alg.run(testRDD, Isotonic)
 
-    model.predictions should be(List())
+    model.predictions should be(generateIsotonicInput(1, 2, 3, 4, 5))
+  }
+
+  test("weighted isotonic regression") {
+    val testRDD = sc.parallelize(generateWeightedIsotonicInput(Seq(1, 2, 3, 4, 2), Seq(1, 1, 1, 1, 2))).cache()
+
+    val alg = new PoolAdjacentViolators
+    val model = alg.run(testRDD, Isotonic)
+
+    model.predictions should be(generateWeightedIsotonicInput(Seq(1, 2, 2.75, 2.75, 2.75), Seq(1, 1, 1, 1, 2)))
+  }
+
+  test("weighted isotonic regression with weights lower than 1") {
+    val testRDD = sc.parallelize(generateWeightedIsotonicInput(Seq(1, 2, 3, 2, 1), Seq(1, 1, 1, 0.1, 0.1))).cache()
+
+    val alg = new PoolAdjacentViolators
+    val model = alg.run(testRDD, Isotonic)
+
+    model.predictions.map(p => p.copy(label = round(p.label))) should be
+      (generateWeightedIsotonicInput(Seq(1, 2, 3.3/1.2, 3.3/1.2, 3.3/1.2), Seq(1, 1, 1, 0.1, 0.1)))
+  }
+
+  test("weighted isotonic regression with negative weights") {
+    val testRDD = sc.parallelize(generateWeightedIsotonicInput(Seq(1, 2, 3, 2, 1), Seq(-1, 1, -3, 1, -5))).cache()
+
+    val alg = new PoolAdjacentViolators
+    val model = alg.run(testRDD, Isotonic)
+
+    model.predictions.map(p => p.copy(label = round(p.label))) should be
+      (generateWeightedIsotonicInput(Seq(1, 10d/6, 10d/6, 10d/6, 10d/6), Seq(-1, 1, -3, 1, -5)))
+  }
+
+  test("weighted isotonic regression with zero weights") {
+    val testRDD = sc.parallelize(generateWeightedIsotonicInput(Seq(1, 2, 3, 2, 1), Seq(0, 0, 0, 1, 0))).cache()
+
+    val alg = new PoolAdjacentViolators
+    val model = alg.run(testRDD, Isotonic)
+
+    model.predictions should be(generateWeightedIsotonicInput(Seq(1, 2, 2, 2, 2), Seq(0, 0, 0, 1, 0)))
   }
 
   test("isotonic regression prediction") {
-    val testRDD = sc.parallelize(generateDataPoints(1, 2, 7, 1, 2)).cache()
+    val testRDD = sc.parallelize(generateIsotonicInput(1, 2, 7, 1, 2)).cache()
 
     val alg = new PoolAdjacentViolators
     val model = alg.run(testRDD, Isotonic)
@@ -116,7 +165,7 @@ class IsotonicRegressionSuite
   }
 
   test("antitonic regression prediction") {
-    val testRDD = sc.parallelize(generateDataPoints(7, 5, 3, 5, 1)).cache()
+    val testRDD = sc.parallelize(generateIsotonicInput(7, 5, 3, 5, 1)).cache()
 
     val alg = new PoolAdjacentViolators
     val model = alg.run(testRDD, Antitonic)
@@ -126,6 +175,18 @@ class IsotonicRegressionSuite
     model.predict(Vectors.dense(3)) should be(4)
     model.predict(Vectors.dense(10)) should be(1)
   }
+
+  test("isotonic regression labeled point to weighted labeled point conversion") {
+    val testRDD = sc.parallelize(
+      List(
+        LabeledPoint(2, Vectors.dense(1)),
+        LabeledPoint(1, Vectors.dense(2)))).cache()
+
+    val alg = new PoolAdjacentViolators
+    val model = alg.run(testRDD, Isotonic)
+
+    model.predictions should be(generateIsotonicInput(1.5, 1.5))
+  }
 }
 
 class IsotonicRegressionClusterSuite extends FunSuite with LocalClusterSparkContext {
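
Reviewer note: the sketch below shows how the API introduced by this patch is intended to be used end to end. It is an editor's illustration, not code from the diff; it assumes the patch applied as above and a live SparkContext named sc.

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.{IsotonicRegression, WeightedLabeledPoint}
import org.apache.spark.mllib.regression.MonotonicityConstraint.Isotonic

// Each training point carries (label, features, weight). The weight scales the
// point's contribution when violators are pooled into a weighted average.
val points = Seq(
  WeightedLabeledPoint(1.0, Vectors.dense(1.0), 1.0),
  WeightedLabeledPoint(3.0, Vectors.dense(2.0), 1.0),
  WeightedLabeledPoint(2.0, Vectors.dense(3.0), 2.0)) // violates 3 <= 2

val model = IsotonicRegression.train(sc.parallelize(points), Isotonic)

// PAVA pools the violating pair into (3*1 + 2*2) / (1 + 2) = 7/3, so the
// fitted labels are 1.0, 7/3, 7/3 -- a non-decreasing sequence.
val fitted = model.predictions.map(_.label)
val prediction = model.predict(Vectors.dense(2.0))

Thanks to the implicit conversions in WeightedLabeledPointConversions, an existing RDD[LabeledPoint] can be passed to train directly; every point then gets the default weight of 1.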
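The pool step inside poolAdjacentViolators is the heart of the algorithm. The following self-contained sketch re-implements the same weighted pooling over plain (label, weight) pairs; it mirrors the patch's semantics but is illustrative only, and is handy for checking the expected values in the weighted tests above.

import scala.collection.mutable.ArrayBuffer

// Weighted PAVA over (label, weight) pairs. Each block stores
// (weighted label sum, total weight, point count) so pooling two violating
// blocks is O(1); at the end every point takes its block's weighted mean.
def pava(labels: Seq[Double], weights: Seq[Double]): Seq[Double] = {
  val blocks = ArrayBuffer.empty[(Double, Double, Int)]
  for ((label, weight) <- labels.zip(weights)) {
    blocks += ((label * weight, weight, 1))
    // Merge backwards while the last two blocks violate the isotonic constraint
    while (blocks.length > 1 && {
      val (s1, w1, _) = blocks(blocks.length - 2)
      val (s2, w2, _) = blocks.last
      s1 / w1 > s2 / w2
    }) {
      val (s2, w2, n2) = blocks.remove(blocks.length - 1)
      val (s1, w1, n1) = blocks.remove(blocks.length - 1)
      blocks += ((s1 + s2, w1 + w2, n1 + n2))
    }
  }
  blocks.flatMap { case (s, w, n) => Seq.fill(n)(s / w) }.toSeq
}

// Reproduces the "weighted isotonic regression" expectation above:
// pava(Seq(1, 2, 3, 4, 2), Seq(1, 1, 1, 1, 2)) == Seq(1, 2, 2.75, 2.75, 2.75)
// because (3*1 + 4*1 + 2*2) / (1 + 1 + 2) = 11/4 = 2.75.

Like the patch's pool(), this divides by the total weight of the pooled block, so zero and negative weights are passed through unchecked; those are the corner cases the zero- and negative-weight tests exercise.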