Skip to content

Commit

Permalink
[SPARK-10235] [MLLIB] update since versions in mllib.regression
Browse files Browse the repository at this point in the history
Same as #8421 but for `mllib.regression`.

cc freeman-lab dbtsai

Author: Xiangrui Meng <[email protected]>

Closes #8426 from mengxr/SPARK-10235 and squashes the following commits:

6cd28e4 [Xiangrui Meng] update since versions in mllib.regression
  • Loading branch information
mengxr authored and DB Tsai committed Aug 26, 2015
1 parent fb7e12f commit 4657fa1
Show file tree
Hide file tree
Showing 8 changed files with 47 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ import org.apache.spark.storage.StorageLevel
*/
@Since("0.8.0")
@DeveloperApi
abstract class GeneralizedLinearModel(val weights: Vector, val intercept: Double)
abstract class GeneralizedLinearModel @Since("1.0.0") (
@Since("1.0.0") val weights: Vector,
@Since("0.8.0") val intercept: Double)
extends Serializable {

/**
Expand Down Expand Up @@ -107,7 +109,7 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel]
* The optimizer to solve the problem.
*
*/
@Since("1.0.0")
@Since("0.8.0")
def optimizer: Optimizer

/** Whether to add intercept (default: false). */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ import org.apache.spark.sql.SQLContext
*/
@Since("1.3.0")
@Experimental
class IsotonicRegressionModel (
val boundaries: Array[Double],
val predictions: Array[Double],
val isotonic: Boolean) extends Serializable with Saveable {
class IsotonicRegressionModel @Since("1.3.0") (
@Since("1.3.0") val boundaries: Array[Double],
@Since("1.3.0") val predictions: Array[Double],
@Since("1.3.0") val isotonic: Boolean) extends Serializable with Saveable {

private val predictionOrd = if (isotonic) Ordering[Double] else Ordering[Double].reverse

Expand All @@ -63,7 +63,6 @@ class IsotonicRegressionModel (

/**
* A Java-friendly constructor that takes two Iterable parameters and one Boolean parameter.
*
*/
@Since("1.4.0")
def this(boundaries: java.lang.Iterable[Double],
Expand Down Expand Up @@ -214,8 +213,6 @@ object IsotonicRegressionModel extends Loader[IsotonicRegressionModel] {
}
}

/**
*/
@Since("1.4.0")
override def load(sc: SparkContext, path: String): IsotonicRegressionModel = {
implicit val formats = DefaultFormats
Expand Down Expand Up @@ -256,13 +253,15 @@ object IsotonicRegressionModel extends Loader[IsotonicRegressionModel] {
* @see [[http://en.wikipedia.org/wiki/Isotonic_regression Isotonic regression (Wikipedia)]]
*/
@Experimental
@Since("1.3.0")
class IsotonicRegression private (private var isotonic: Boolean) extends Serializable {

/**
 * Constructs an IsotonicRegression instance with the default parameter isotonic = true.
*
* @return New instance of IsotonicRegression.
*/
@Since("1.3.0")
def this() = this(true)

/**
Expand All @@ -271,6 +270,7 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
* @param isotonic Isotonic (increasing) or antitonic (decreasing) sequence.
* @return This instance of IsotonicRegression.
*/
@Since("1.3.0")
def setIsotonic(isotonic: Boolean): this.type = {
this.isotonic = isotonic
this
Expand All @@ -286,6 +286,7 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
* the algorithm is executed.
* @return Isotonic regression model.
*/
@Since("1.3.0")
def run(input: RDD[(Double, Double, Double)]): IsotonicRegressionModel = {
val preprocessedInput = if (isotonic) {
input
Expand All @@ -311,6 +312,7 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
* the algorithm is executed.
* @return Isotonic regression model.
*/
@Since("1.3.0")
def run(input: JavaRDD[(JDouble, JDouble, JDouble)]): IsotonicRegressionModel = {
run(input.rdd.retag.asInstanceOf[RDD[(Double, Double, Double)]])
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,12 @@ import org.apache.spark.SparkException
*
* @param label Label for this data point.
* @param features List of features for this data point.
*
*/
@Since("0.8.0")
@BeanInfo
case class LabeledPoint(label: Double, features: Vector) {
case class LabeledPoint @Since("1.0.0") (
@Since("0.8.0") label: Double,
@Since("1.0.0") features: Vector) {
override def toString: String = {
s"($label,$features)"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ import org.apache.spark.rdd.RDD
*
*/
@Since("0.8.0")
class LassoModel (
override val weights: Vector,
override val intercept: Double)
class LassoModel @Since("1.1.0") (
@Since("1.0.0") override val weights: Vector,
@Since("0.8.0") override val intercept: Double)
extends GeneralizedLinearModel(weights, intercept)
with RegressionModel with Serializable with Saveable with PMMLExportable {

Expand Down Expand Up @@ -84,6 +84,7 @@ object LassoModel extends Loader[LassoModel] {
 * its corresponding right-hand-side label y.
* See also the documentation for the precise formulation.
*/
@Since("0.8.0")
class LassoWithSGD private (
private var stepSize: Double,
private var numIterations: Int,
Expand All @@ -93,6 +94,7 @@ class LassoWithSGD private (

private val gradient = new LeastSquaresGradient()
private val updater = new L1Updater()
@Since("0.8.0")
override val optimizer = new GradientDescent(gradient, updater)
.setStepSize(stepSize)
.setNumIterations(numIterations)
Expand All @@ -103,6 +105,7 @@ class LassoWithSGD private (
* Construct a Lasso object with default parameters: {stepSize: 1.0, numIterations: 100,
* regParam: 0.01, miniBatchFraction: 1.0}.
*/
@Since("0.8.0")
def this() = this(1.0, 100, 0.01, 1.0)

override protected def createModel(weights: Vector, intercept: Double) = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ import org.apache.spark.rdd.RDD
*
*/
@Since("0.8.0")
class LinearRegressionModel (
override val weights: Vector,
override val intercept: Double)
class LinearRegressionModel @Since("1.1.0") (
@Since("1.0.0") override val weights: Vector,
@Since("0.8.0") override val intercept: Double)
extends GeneralizedLinearModel(weights, intercept) with RegressionModel with Serializable
with Saveable with PMMLExportable {

Expand Down Expand Up @@ -85,6 +85,7 @@ object LinearRegressionModel extends Loader[LinearRegressionModel] {
 * its corresponding right-hand-side label y.
* See also the documentation for the precise formulation.
*/
@Since("0.8.0")
class LinearRegressionWithSGD private[mllib] (
private var stepSize: Double,
private var numIterations: Int,
Expand All @@ -93,6 +94,7 @@ class LinearRegressionWithSGD private[mllib] (

private val gradient = new LeastSquaresGradient()
private val updater = new SimpleUpdater()
@Since("0.8.0")
override val optimizer = new GradientDescent(gradient, updater)
.setStepSize(stepSize)
.setNumIterations(numIterations)
Expand All @@ -102,6 +104,7 @@ class LinearRegressionWithSGD private[mllib] (
* Construct a LinearRegression object with default parameters: {stepSize: 1.0,
* numIterations: 100, miniBatchFraction: 1.0}.
*/
@Since("0.8.0")
def this() = this(1.0, 100, 1.0)

override protected[mllib] def createModel(weights: Vector, intercept: Double) = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ import org.apache.spark.rdd.RDD
*
*/
@Since("0.8.0")
class RidgeRegressionModel (
override val weights: Vector,
override val intercept: Double)
class RidgeRegressionModel @Since("1.1.0") (
@Since("1.0.0") override val weights: Vector,
@Since("0.8.0") override val intercept: Double)
extends GeneralizedLinearModel(weights, intercept)
with RegressionModel with Serializable with Saveable with PMMLExportable {

Expand Down Expand Up @@ -85,6 +85,7 @@ object RidgeRegressionModel extends Loader[RidgeRegressionModel] {
 * its corresponding right-hand-side label y.
* See also the documentation for the precise formulation.
*/
@Since("0.8.0")
class RidgeRegressionWithSGD private (
private var stepSize: Double,
private var numIterations: Int,
Expand All @@ -94,7 +95,7 @@ class RidgeRegressionWithSGD private (

private val gradient = new LeastSquaresGradient()
private val updater = new SquaredL2Updater()

@Since("0.8.0")
override val optimizer = new GradientDescent(gradient, updater)
.setStepSize(stepSize)
.setNumIterations(numIterations)
Expand All @@ -105,6 +106,7 @@ class RidgeRegressionWithSGD private (
* Construct a RidgeRegression object with default parameters: {stepSize: 1.0, numIterations: 100,
* regParam: 0.01, miniBatchFraction: 1.0}.
*/
@Since("0.8.0")
def this() = this(1.0, 100, 0.01, 1.0)

override protected def createModel(weights: Vector, intercept: Double) = {
Expand Down Expand Up @@ -134,7 +136,7 @@ object RidgeRegressionWithSGD {
* the number of features in the data.
*
*/
@Since("0.8.0")
@Since("1.0.0")
def train(
input: RDD[LabeledPoint],
numIterations: Int,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import scala.reflect.ClassTag
import org.apache.spark.Logging
import org.apache.spark.annotation.{DeveloperApi, Since}
import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.streaming.api.java.{JavaDStream, JavaPairDStream}
import org.apache.spark.streaming.dstream.DStream

Expand Down Expand Up @@ -83,9 +83,8 @@ abstract class StreamingLinearAlgorithm[
* batch of data from the stream.
*
* @param data DStream containing labeled data
*
*/
@Since("1.3.0")
@Since("1.1.0")
def trainOn(data: DStream[LabeledPoint]): Unit = {
if (model.isEmpty) {
throw new IllegalArgumentException("Model must be initialized before starting training.")
Expand All @@ -105,7 +104,6 @@ abstract class StreamingLinearAlgorithm[

/**
* Java-friendly version of `trainOn`.
*
*/
@Since("1.3.0")
def trainOn(data: JavaDStream[LabeledPoint]): Unit = trainOn(data.dstream)
Expand All @@ -129,7 +127,7 @@ abstract class StreamingLinearAlgorithm[
* Java-friendly version of `predictOn`.
*
*/
@Since("1.1.0")
@Since("1.3.0")
def predictOn(data: JavaDStream[Vector]): JavaDStream[java.lang.Double] = {
JavaDStream.fromDStream(predictOn(data.dstream).asInstanceOf[DStream[java.lang.Double]])
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.mllib.regression

import org.apache.spark.annotation.Experimental
import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.mllib.linalg.Vector

/**
Expand All @@ -41,6 +41,7 @@ import org.apache.spark.mllib.linalg.Vector
* .trainOn(DStream)
*/
@Experimental
@Since("1.1.0")
class StreamingLinearRegressionWithSGD private[mllib] (
private var stepSize: Double,
private var numIterations: Int,
Expand All @@ -54,15 +55,18 @@ class StreamingLinearRegressionWithSGD private[mllib] (
* Initial weights must be set before using trainOn or predictOn
* (see `StreamingLinearAlgorithm`)
*/
@Since("1.1.0")
def this() = this(0.1, 50, 1.0)

@Since("1.1.0")
val algorithm = new LinearRegressionWithSGD(stepSize, numIterations, miniBatchFraction)

protected var model: Option[LinearRegressionModel] = None

/**
* Set the step size for gradient descent. Default: 0.1.
*/
@Since("1.1.0")
def setStepSize(stepSize: Double): this.type = {
this.algorithm.optimizer.setStepSize(stepSize)
this
Expand All @@ -71,6 +75,7 @@ class StreamingLinearRegressionWithSGD private[mllib] (
/**
* Set the number of iterations of gradient descent to run per update. Default: 50.
*/
@Since("1.1.0")
def setNumIterations(numIterations: Int): this.type = {
this.algorithm.optimizer.setNumIterations(numIterations)
this
Expand All @@ -79,6 +84,7 @@ class StreamingLinearRegressionWithSGD private[mllib] (
/**
* Set the fraction of each batch to use for updates. Default: 1.0.
*/
@Since("1.1.0")
def setMiniBatchFraction(miniBatchFraction: Double): this.type = {
this.algorithm.optimizer.setMiniBatchFraction(miniBatchFraction)
this
Expand All @@ -87,6 +93,7 @@ class StreamingLinearRegressionWithSGD private[mllib] (
/**
* Set the initial weights.
*/
@Since("1.1.0")
def setInitialWeights(initialWeights: Vector): this.type = {
this.model = Some(algorithm.createModel(initialWeights, 0.0))
this
Expand All @@ -95,9 +102,9 @@ class StreamingLinearRegressionWithSGD private[mllib] (
/**
* Set the convergence tolerance. Default: 0.001.
*/
@Since("1.5.0")
def setConvergenceTol(tolerance: Double): this.type = {
this.algorithm.optimizer.setConvergenceTol(tolerance)
this
}

}

0 comments on commit 4657fa1

Please sign in to comment.