From 8f5daf9072f23ef46102fe4419da5cf79212bc2f Mon Sep 17 00:00:00 2001
From: martinzapletal
Date: Sun, 30 Nov 2014 18:19:59 -0500
Subject: [PATCH] SPARK-3278 added comments and cleaned up api to consistently handle weights

---
 .../mllib/regression/IsotonicRegression.scala | 108 ++++++++----------
 .../regression/WeightedLabeledPoint.scala | 12 +-
 .../JavaIsotonicRegressionSuite.java | 1 -
 .../regression/IsotonicRegressionSuite.scala | 2 +-
 4 files changed, 57 insertions(+), 66 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
index 7a1a84aa2eef6..2b769fd3a2150 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
@@ -17,24 +17,35 @@ package org.apache.spark.mllib.regression
 
-import org.apache.spark.mllib.linalg.{Vectors, Vector}
-import org.apache.spark.mllib.regression.MonotonicityConstraint.Enum.MonotonicityConstraint
+import org.apache.spark.mllib.linalg.Vector
+import org.apache.spark.mllib.regression.MonotonicityConstraint.MonotonicityConstraint.{Isotonic, MonotonicityConstraint}
 import org.apache.spark.rdd.RDD
 
+/**
+ * Monotonicity constraints for monotone regression
+ * Isotonic (increasing)
+ * Antitonic (decreasing)
+ */
 object MonotonicityConstraint {
 
-  object Enum {
+  object MonotonicityConstraint {
 
     sealed trait MonotonicityConstraint {
       private[regression] def holds(current: WeightedLabeledPoint, next: WeightedLabeledPoint): Boolean
     }
 
+    /**
+     * Isotonic monotonicity constraint. Increasing sequence
+     */
    case object Isotonic extends MonotonicityConstraint {
      override def holds(current: WeightedLabeledPoint, next: WeightedLabeledPoint): Boolean = {
        current.label <= next.label
      }
    }
 
+    /**
+     * Antitonic monotonicity constraint. Decreasing sequence
+     */
    case object Antitonic extends MonotonicityConstraint {
      override def holds(current: WeightedLabeledPoint, next: WeightedLabeledPoint): Boolean = {
        current.label >= next.label
@@ -42,14 +53,15 @@ object MonotonicityConstraint {
      }
    }
 
-  val Isotonic = Enum.Isotonic
-  val Antitonic = Enum.Antitonic
+  val Isotonic = MonotonicityConstraint.Isotonic
+  val Antitonic = MonotonicityConstraint.Antitonic
 }
 
 /**
  * Regression model for Isotonic regression
  *
  * @param predictions Weights computed for every feature.
+ * @param monotonicityConstraint specifies whether the sequence is increasing or decreasing
  */
 class IsotonicRegressionModel(
     val predictions: Seq[WeightedLabeledPoint],
@@ -59,10 +71,11 @@ class IsotonicRegressionModel(
   override def predict(testData: RDD[Vector]): RDD[Double] =
     testData.map(predict)
 
-  //take the highest of elements smaller than our feature or weight with lowest feature
-  override def predict(testData: Vector): Double =
+  override def predict(testData: Vector): Double = {
+    // take the highest of the data points smaller than our feature, or the data point with the lowest feature
     (predictions.head +:
       predictions.filter(y => y.features.toArray.head <= testData.toArray.head)).last.label
+  }
 }
 
 /**
@@ -71,49 +84,40 @@ class IsotonicRegressionModel(
 trait IsotonicRegressionAlgorithm
   extends Serializable {
 
+  /**
+   * Creates an isotonic regression model with the given parameters.
+   *
+   * @param predictions labels estimated by the isotonic regression algorithm. Used for predictions on new data points.
+   * @param monotonicityConstraint isotonic or antitonic
+   * @return isotonic regression model
+   */
   protected def createModel(
-      weights: Seq[WeightedLabeledPoint],
+      predictions: Seq[WeightedLabeledPoint],
       monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel
 
   /**
    * Run algorithm to obtain isotonic regression model
+   *
    * @param input data
    * @param monotonicityConstraint ascending or descenting
-   * @return model
+   * @return isotonic regression model
    */
   def run(
       input: RDD[WeightedLabeledPoint],
       monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel
-
-  /**
-   * Run algorithm to obtain isotonic regression model
-   * @param input data
-   * @param monotonicityConstraint asc or desc
-   * @param weights weights
-   * @return
-   */
-  def run(
-      input: RDD[WeightedLabeledPoint],
-      monotonicityConstraint: MonotonicityConstraint,
-      weights: Vector): IsotonicRegressionModel
 }
 
-class PoolAdjacentViolators extends IsotonicRegressionAlgorithm {
+/**
+ * Parallel pool adjacent violators algorithm for monotone regression
+ */
+class PoolAdjacentViolators private[mllib]
+  extends IsotonicRegressionAlgorithm {
 
   override def run(
       input: RDD[WeightedLabeledPoint],
       monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel = {
     createModel(
-      parallelPoolAdjacentViolators(input, monotonicityConstraint, Vectors.dense(Array(0d))),
-      monotonicityConstraint)
-  }
-
-  override def run(
-      input: RDD[WeightedLabeledPoint],
-      monotonicityConstraint: MonotonicityConstraint,
-      weights: Vector): IsotonicRegressionModel = {
-    createModel(
-      parallelPoolAdjacentViolators(input, monotonicityConstraint, weights),
+      parallelPoolAdjacentViolators(input, monotonicityConstraint),
       monotonicityConstraint)
   }
 
@@ -180,7 +184,7 @@ class PoolAdjacentViolators extends IsotonicRegressionAlgorithm {
 
   /**
    * Performs parallel pool adjacent violators algorithm
-   * Calls PAVA on each partition and then again on the result
+   * Calls the pool adjacent violators algorithm on each partition and then again on the result
    *
    * @param testData input
    * @param monotonicityConstraint asc or desc
@@ -188,8 +192,7 @@ class PoolAdjacentViolators extends IsotonicRegressionAlgorithm {
    */
   private def parallelPoolAdjacentViolators(
       testData: RDD[WeightedLabeledPoint],
-      monotonicityConstraint: MonotonicityConstraint,
-      weights: Vector): Seq[WeightedLabeledPoint] = {
+      monotonicityConstraint: MonotonicityConstraint): Seq[WeightedLabeledPoint] = {
 
     poolAdjacentViolators(
       testData
@@ -201,39 +204,24 @@ class PoolAdjacentViolators extends IsotonicRegressionAlgorithm {
 }
 
 /**
- * Top-level methods for calling IsotonicRegression.
+ * Top-level methods for monotone regression (either isotonic or antitonic).
  */
 object IsotonicRegression {
 
   /**
-   * Train a Linear Regression model given an RDD of (label, features) pairs. We run a fixed number
-   * of iterations of gradient descent using the specified step size. Each iteration uses
-   * `miniBatchFraction` fraction of the data to calculate a stochastic gradient. The weights used
-   * in gradient descent are initialized using the initial weights provided.
+   * Train a monotone regression model given an RDD of (label, features, weight) points.
+   * Currently only a one-dimensional algorithm is supported (features.length must be 1).
+   * The label is the dependent y value.
+   * The weight of a data point is the number of measurements. Default is 1.
    *
-   * @param input RDD of (label, array of features) pairs. Each pair describes a row of the data
-   *              matrix A as well as the corresponding right hand side label y
-   * @param weights Initial set of weights to be used. Array should be equal in size to
-   *                the number of features in the data.
+   * @param input RDD of (label, array of features, weight). Each point describes a row of the data
+   *              matrix A as well as the corresponding right hand side label y
+   *              and the weight as the number of measurements
+   * @param monotonicityConstraint Isotonic (increasing) or Antitonic (decreasing) sequence. Defaults to Isotonic.
    */
   def train(
       input: RDD[WeightedLabeledPoint],
-      monotonicityConstraint: MonotonicityConstraint,
-      weights: Vector): IsotonicRegressionModel = {
-    new PoolAdjacentViolators().run(input, monotonicityConstraint, weights)
-  }
-
-  /**
-   * Train a LinearRegression model given an RDD of (label, features) pairs. We run a fixed number
-   * of iterations of gradient descent using the specified step size. Each iteration uses
-   * `miniBatchFraction` fraction of the data to calculate a stochastic gradient.
-   *
-   * @param input RDD of (label, array of features) pairs. Each pair describes a row of the data
-   *              matrix A as well as the corresponding right hand side label y
-   */
-  def train(
-      input: RDD[WeightedLabeledPoint],
-      monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel = {
+      monotonicityConstraint: MonotonicityConstraint = Isotonic): IsotonicRegressionModel = {
     new PoolAdjacentViolators().run(input, monotonicityConstraint)
   }
 }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/WeightedLabeledPoint.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/WeightedLabeledPoint.scala
index 90f7e49a3d905..1bb78c65aa647 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/WeightedLabeledPoint.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/WeightedLabeledPoint.scala
@@ -25,17 +25,21 @@ import scala.beans.BeanInfo
 object WeightedLabeledPointConversions {
   implicit def labeledPointToWeightedLabeledPoint(
       labeledPoint: LabeledPoint): WeightedLabeledPoint = {
-    WeightedLabeledPoint(labeledPoint.label, labeledPoint.features, 1)
+    WeightedLabeledPoint(labeledPoint.label, labeledPoint.features)
   }
 
   implicit def labeledPointRDDToWeightedLabeledPointRDD(
       rdd: RDD[LabeledPoint]): RDD[WeightedLabeledPoint] = {
-    rdd.map(lp => WeightedLabeledPoint(lp.label, lp.features, 1))
+    rdd.map(lp => WeightedLabeledPoint(lp.label, lp.features))
   }
 }
 
 /**
- * Labeled point with weight
+ * Class that represents the features and labels of a data point with an associated weight
+ *
+ * @param label Label for this data point.
+ * @param features List of features for this data point.
+ * @param weight Weight of the data point. Defaults to 1.
*/ @BeanInfo -case class WeightedLabeledPoint(label: Double, features: Vector, weight: Double) +case class WeightedLabeledPoint(label: Double, features: Vector, weight: Double = 1) diff --git a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaIsotonicRegressionSuite.java b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaIsotonicRegressionSuite.java index 0df1cd8e1edda..b3285271fcbd1 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaIsotonicRegressionSuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaIsotonicRegressionSuite.java @@ -28,7 +28,6 @@ import org.junit.Test; import java.io.Serializable; -import java.util.Arrays; import java.util.List; public class JavaIsotonicRegressionSuite implements Serializable { diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala index 0f1153b68129c..c5ba42e912e28 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.mllib.regression import org.apache.spark.mllib.linalg.Vectors -import org.apache.spark.mllib.regression.MonotonicityConstraint.Enum.{Antitonic, Isotonic} +import org.apache.spark.mllib.regression.MonotonicityConstraint.MonotonicityConstraint.{Antitonic, Isotonic} import org.apache.spark.mllib.util.{LocalClusterSparkContext, MLlibTestSparkContext} import org.scalatest.{Matchers, FunSuite} import WeightedLabeledPointConversions._
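
For reviewers who want to exercise the new API end to end, the following is a minimal usage sketch based only on the signatures introduced above (WeightedLabeledPoint with a default weight, IsotonicRegression.train with a default Isotonic constraint, and IsotonicRegressionModel.predict). The SparkContext setup, the application name and the sample data points are illustrative assumptions and are not part of this patch.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.{IsotonicRegression, WeightedLabeledPoint}
import org.apache.spark.mllib.regression.MonotonicityConstraint.Isotonic

// Hypothetical driver object, only to illustrate the API added in this patch.
object IsotonicRegressionExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("IsotonicRegressionExample").setMaster("local[2]"))

    // One-dimensional data: label y, a single feature x and an optional weight
    // (the number of measurements, defaulting to 1).
    val points = sc.parallelize(Seq(
      WeightedLabeledPoint(1.0, Vectors.dense(1.0)),
      WeightedLabeledPoint(3.0, Vectors.dense(2.0), weight = 2.0),
      WeightedLabeledPoint(2.0, Vectors.dense(3.0)),
      WeightedLabeledPoint(5.0, Vectors.dense(4.0))))

    // Isotonic (non-decreasing) is also the default constraint; pass Antitonic for a non-increasing fit.
    val model = IsotonicRegression.train(points, Isotonic)

    // Predict for a single feature vector or for an RDD of feature vectors.
    println(model.predict(Vectors.dense(2.5)))
    model.predict(sc.parallelize(Seq(Vectors.dense(0.5), Vectors.dense(3.5)))).collect().foreach(println)

    sc.stop()
  }
}

Use Antitonic instead of Isotonic when the labels are expected to decrease with the feature; both values are exposed as vals on the MonotonicityConstraint object.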