From 941fd1f88c0ed1d45403adde00667cd253c92c35 Mon Sep 17 00:00:00 2001 From: martinzapletal Date: Sun, 11 Jan 2015 23:58:29 +0000 Subject: [PATCH] SPARK-3278 Isotonic regression java api --- .../mllib/regression/IsotonicRegression.scala | 56 ++++++++++++--- .../mllib/util/IsotonicDataGenerator.scala | 7 +- .../JavaIsotonicRegressionSuite.java | 69 +++++-------------- .../regression/IsotonicRegressionSuite.scala | 11 +-- 4 files changed, 71 insertions(+), 72 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala index 55a59ab7024fd..dda666c4c2c1b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala @@ -17,6 +17,7 @@ package org.apache.spark.mllib.regression +import org.apache.spark.api.java.{JavaRDD, JavaPairRDD} import org.apache.spark.rdd.RDD /** @@ -30,9 +31,30 @@ class IsotonicRegressionModel ( val isotonic: Boolean) extends Serializable { + /** + * Predict labels for provided features + * + * @param testData features to be labeled + * @return predicted labels + */ def predict(testData: RDD[Double]): RDD[Double] = testData.map(predict) + /** + * Predict labels for provided features + * + * @param testData features to be labeled + * @return predicted labels + */ + def predict(testData: JavaRDD[java.lang.Double]): RDD[java.lang.Double] = + testData.rdd.map(x => x.doubleValue()).map(predict) + + /** + * Predict a single label + * + * @param testData feature to be labeled + * @return predicted label + */ def predict(testData: Double): Double = // Take the highest of data points smaller than our feature or data point with lowest feature (predictions.head +: predictions.filter(y => y._2 <= testData)).last._1 @@ -59,7 +81,7 @@ trait IsotonicRegressionAlgorithm /** * Run algorithm to obtain isotonic regression model 
* - * @param input data + * @param input (label, feature, weight) * @param isotonic isotonic (increasing) or antitonic (decreasing) sequence * @return isotonic regression model */ @@ -115,12 +137,11 @@ class PoolAdjacentViolators private [mllib] } } - def monotonicityConstraint(isotonic: Boolean): (Double, Double) => Boolean = - (x, y) => if(isotonic) { - x <= y - } else { - x >= y - } + val isotonicConstraint: (Double, Double) => Boolean = (x, y) => x <= y + val antitonicConstraint: (Double, Double) => Boolean = (x, y) => x >= y + + def monotonicityConstraint(isotonic: Boolean) = + if(isotonic) isotonicConstraint else antitonicConstraint val monotonicityConstraintHolds = monotonicityConstraint(isotonic) @@ -179,12 +200,11 @@ class PoolAdjacentViolators private [mllib] object IsotonicRegression { /** - * Train a monotone regression model given an RDD of (label, features, weight). - * Currently only one dimensional algorithm is supported (features.length is one) + * Train a monotone regression model given an RDD of (label, feature, weight). * Label is the dependent y value * Weight of the data point is the number of measurements. Default is 1 * - * @param input RDD of (label, array of features, weight). + * @param input RDD of (label, feature, weight). * Each point describes a row of the data * matrix A as well as the corresponding right hand side label y * and weight as number of measurements @@ -195,4 +215,20 @@ object IsotonicRegression { isotonic: Boolean = true): IsotonicRegressionModel = { new PoolAdjacentViolators().run(input, isotonic) } + + /** + * Train a monotone regression model given an RDD of (label, feature). + * Label is the dependent y value + * Weight defaults to 1 + * + * @param input RDD of (label, feature). 
+ * @param isotonic isotonic (increasing) or antitonic (decreasing) sequence + * @return isotonic regression model + */ + def train( + input: JavaPairRDD[java.lang.Double, java.lang.Double], + isotonic: Boolean): IsotonicRegressionModel = { + new PoolAdjacentViolators() + .run(input.rdd.map(x => (x._1.doubleValue(), x._2.doubleValue(), 1d)), isotonic) + } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/IsotonicDataGenerator.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/IsotonicDataGenerator.scala index a4714e9b59ff2..e95e511f3199b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/util/IsotonicDataGenerator.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/IsotonicDataGenerator.scala @@ -18,6 +18,7 @@ package org.apache.spark.mllib.util import scala.collection.JavaConversions._ +import java.lang.{Double => JDouble} object IsotonicDataGenerator { @@ -26,13 +27,11 @@ object IsotonicDataGenerator { * @param labels list of labels for the data points * @return Java List of input. 
*/ - def generateIsotonicInputAsList(labels: Array[Double]): java.util.List[(Double, Double, Double)] = { - seqAsJavaList(generateIsotonicInput(wrapDoubleArray(labels):_*)) + def generateIsotonicInputAsList(labels: Array[Double]): java.util.List[(JDouble, JDouble)] = { + seqAsJavaList(generateIsotonicInput(wrapDoubleArray(labels):_*).map(x => (new JDouble(x._1), new JDouble(x._2)))) //.map(d => new Tuple3(new java.lang.Double(d._1), new java.lang.Double(d._2), new java.lang.Double(d._3)))) } - def bam(d: Option[Double]): Double = d.get - /** * Return an ordered sequence of labeled data points with default weights * @param labels list of labels for the data points diff --git a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaIsotonicRegressionSuite.java b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaIsotonicRegressionSuite.java index 5dd5cffed66fb..ee072ef601e1e 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaIsotonicRegressionSuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaIsotonicRegressionSuite.java @@ -13,8 +13,7 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. 
- *//* - + */ package org.apache.spark.mllib.regression; @@ -22,18 +21,14 @@ import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; -import org.apache.spark.mllib.linalg.Vector; -import org.apache.spark.mllib.linalg.Vectors; import org.apache.spark.mllib.util.IsotonicDataGenerator; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import scala.Tuple2; -import scala.Tuple3; import java.io.Serializable; -import java.util.Arrays; import java.util.List; public class JavaIsotonicRegressionSuite implements Serializable { @@ -50,52 +45,26 @@ public void tearDown() { sc = null; } - double difference(List> expected, IsotonicRegressionModel model) { + double difference(List> expected, IsotonicRegressionModel model) { double diff = 0; for(int i = 0; i < model.predictions().length(); i++) { - Tuple3 exp = expected.get(i); + Tuple2 exp = expected.get(i); diff += Math.abs(model.predict(exp._2()) - exp._1()); } return diff; } - */ -/*@Test - public void runIsotonicRegressionUsingConstructor() { - JavaRDD> testRDD = sc.parallelize(IsotonicDataGenerator - .generateIsotonicInputAsList( - new double[]{1, 2, 3, 3, 1, 6, 7, 8, 11, 9, 10, 12})).cache(); - - IsotonicRegressionAlgorithm isotonicRegressionAlgorithm = new PoolAdjacentViolators(); - IsotonicRegressionModel model = isotonicRegressionAlgorithm.run(testRDD.rdd(), true); - - List> expected = IsotonicDataGenerator - .generateIsotonicInputAsList( - new double[] {1, 2, 7d/3, 7d/3, 7d/3, 6, 7, 8, 10, 10, 10, 12}); - - Assert.assertTrue(difference(expected, model) == 0); - }*//* - - @Test public void runIsotonicRegressionUsingStaticMethod() { - */ -/*JavaRDD> testRDD = sc.parallelize(IsotonicDataGenerator - .generateIsotonicInputAsList( - new double[] {1, 2, 3, 3, 1, 6, 7, 8, 11, 9, 10, 12})).cache();*//* - - - */ -/*JavaRDD> testRDD = sc.parallelize(Arrays.asList(new Tuple3(1.0, 1.0, 1.0)));*//* + 
JavaPairRDD trainRDD = sc.parallelizePairs( + IsotonicDataGenerator.generateIsotonicInputAsList( + new double[]{1, 2, 3, 3, 1, 6, 7, 8, 11, 9, 10, 12})).cache(); + IsotonicRegressionModel model = IsotonicRegression.train(trainRDD, true); - JavaPairRDD testRDD = sc.parallelizePairs(Arrays.asList(new Tuple2(1.0, 1.0))); - - IsotonicRegressionModel model = IsotonicRegression.train(testRDD.rdd(), true); - - List> expected = IsotonicDataGenerator + List> expected = IsotonicDataGenerator .generateIsotonicInputAsList( new double[] {1, 2, 7d/3, 7d/3, 7d/3, 6, 7, 8, 10, 10, 10, 12}); @@ -104,23 +73,23 @@ public void runIsotonicRegressionUsingStaticMethod() { @Test public void testPredictJavaRDD() { - JavaRDD> testRDD = sc.parallelize(IsotonicDataGenerator - .generateIsotonicInputAsList( - new double[] {1, 2, 3, 3, 1, 6, 7, 8, 11, 9, 10, 12})).cache(); + JavaPairRDD trainRDD = sc.parallelizePairs( + IsotonicDataGenerator.generateIsotonicInputAsList( + new double[]{1, 2, 3, 3, 1, 6, 7, 8, 11, 9, 10, 12})).cache(); - IsotonicRegressionModel model = IsotonicRegression.train(testRDD.rdd(), true); + IsotonicRegressionModel model = IsotonicRegression.train(trainRDD, true); - JavaRDD vectors = testRDD.map(new Function, Vector>() { + JavaRDD testRDD = trainRDD.map(new Function, Double>() { @Override - public Vector call(Tuple3 v) throws Exception { - return Vectors.dense(v._2()); + public Double call(Tuple2 v) throws Exception { + return v._2(); } }); - List predictions = model.predict(vectors).collect(); + Double[] predictions = model.predict(testRDD).collect(); - Assert.assertTrue(predictions.get(0) == 1d); - Assert.assertTrue(predictions.get(11) == 12d); + Assert.assertTrue(predictions[0] == 1d); + Assert.assertTrue(predictions[11] == 12d); } } -*/ + diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala index 24efcbc4f3dae..389c9add10e15 100644 --- 
a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala @@ -197,18 +197,13 @@ class IsotonicRegressionClusterSuite extends FunSuite with LocalClusterSparkContext { + //TODO: FIX test("task size should be small in both training and prediction") { - val n = 5 - - val trainData = (0 to n).map(i => (i.toDouble, i.toDouble, 1.toDouble)) + val n = 135000 + val trainData = (0 to n).map(i => (i.toDouble, i.toDouble, 1d)) val points = sc.parallelize(trainData, 1) - /*val points = sc.parallelize(0 until n, 2).mapPartitionsWithIndex { (idx, iter) => - val random = new Random(idx) - iter.map(i => (random.nextDouble(), random.nextDouble(), 1)) - }.cache()*/ - // If we serialize data directly in the task closure, the size of the serialized task would be // greater than 1MB and hence Spark would throw an error. val model = IsotonicRegression.train(points, true)