[SPARK-2852][MLLIB] Separate model from IDF/StandardScaler algorithms #1814

Closed
wants to merge 3 commits
130 changes: 61 additions & 69 deletions mllib/src/main/scala/org/apache/spark/mllib/feature/IDF.scala
@@ -36,87 +36,25 @@ class IDF {

// TODO: Allow different IDF formulations.

private var brzIdf: BDV[Double] = _

/**
* Computes the inverse document frequency.
* @param dataset an RDD of term frequency vectors
*/
def fit(dataset: RDD[Vector]): this.type = {
brzIdf = dataset.treeAggregate(new IDF.DocumentFrequencyAggregator)(
def fit(dataset: RDD[Vector]): IDFModel = {
val idf = dataset.treeAggregate(new IDF.DocumentFrequencyAggregator)(
seqOp = (df, v) => df.add(v),
combOp = (df1, df2) => df1.merge(df2)
).idf()
this
new IDFModel(idf)
}

/**
* Computes the inverse document frequency.
* @param dataset a JavaRDD of term frequency vectors
*/
def fit(dataset: JavaRDD[Vector]): this.type = {
def fit(dataset: JavaRDD[Vector]): IDFModel = {
fit(dataset.rdd)
}

/**
* Transforms term frequency (TF) vectors to TF-IDF vectors.
* @param dataset an RDD of term frequency vectors
* @return an RDD of TF-IDF vectors
*/
def transform(dataset: RDD[Vector]): RDD[Vector] = {
if (!initialized) {
throw new IllegalStateException("Haven't learned IDF yet. Call fit first.")
}
val theIdf = brzIdf
val bcIdf = dataset.context.broadcast(theIdf)
dataset.mapPartitions { iter =>
val thisIdf = bcIdf.value
iter.map { v =>
val n = v.size
v match {
case sv: SparseVector =>
val nnz = sv.indices.size
val newValues = new Array[Double](nnz)
var k = 0
while (k < nnz) {
newValues(k) = sv.values(k) * thisIdf(sv.indices(k))
k += 1
}
Vectors.sparse(n, sv.indices, newValues)
case dv: DenseVector =>
val newValues = new Array[Double](n)
var j = 0
while (j < n) {
newValues(j) = dv.values(j) * thisIdf(j)
j += 1
}
Vectors.dense(newValues)
case other =>
throw new UnsupportedOperationException(
s"Only sparse and dense vectors are supported but got ${other.getClass}.")
}
}
}
}

/**
* Transforms term frequency (TF) vectors to TF-IDF vectors (Java version).
* @param dataset a JavaRDD of term frequency vectors
* @return a JavaRDD of TF-IDF vectors
*/
def transform(dataset: JavaRDD[Vector]): JavaRDD[Vector] = {
transform(dataset.rdd).toJavaRDD()
}

/** Returns the IDF vector. */
def idf(): Vector = {
if (!initialized) {
throw new IllegalStateException("Haven't learned IDF yet. Call fit first.")
}
Vectors.fromBreeze(brzIdf)
}

private def initialized: Boolean = brzIdf != null
}

private object IDF {
@@ -177,18 +115,72 @@ private object IDF {
private def isEmpty: Boolean = m == 0L

/** Returns the current IDF vector. */
def idf(): BDV[Double] = {
def idf(): Vector = {
if (isEmpty) {
throw new IllegalStateException("Haven't seen any document yet.")
}
val n = df.length
val inv = BDV.zeros[Double](n)
val inv = new Array[Double](n)
var j = 0
while (j < n) {
inv(j) = math.log((m + 1.0)/ (df(j) + 1.0))
j += 1
}
inv
Vectors.dense(inv)
}
}
}

/**
* :: Experimental ::
* Represents an IDF model that can transform term frequency vectors.
*/
@Experimental
class IDFModel private[mllib] (val idf: Vector) extends Serializable {

/**
* Transforms term frequency (TF) vectors to TF-IDF vectors.
* @param dataset an RDD of term frequency vectors
* @return an RDD of TF-IDF vectors
*/
def transform(dataset: RDD[Vector]): RDD[Vector] = {
val bcIdf = dataset.context.broadcast(idf)
dataset.mapPartitions { iter =>
val thisIdf = bcIdf.value
iter.map { v =>
val n = v.size
v match {
case sv: SparseVector =>
val nnz = sv.indices.size
val newValues = new Array[Double](nnz)
var k = 0
while (k < nnz) {
newValues(k) = sv.values(k) * thisIdf(sv.indices(k))
k += 1
}
Vectors.sparse(n, sv.indices, newValues)
case dv: DenseVector =>
val newValues = new Array[Double](n)
var j = 0
while (j < n) {
newValues(j) = dv.values(j) * thisIdf(j)
j += 1
}
Vectors.dense(newValues)
case other =>
throw new UnsupportedOperationException(
s"Only sparse and dense vectors are supported but got ${other.getClass}.")

Member: The following exception is used for an unsupported vector type in appendBias and StandardScaler; maybe we could have a global definition of this in util.

case v => throw new IllegalArgumentException("Do not support vector type " + v.getClass)

Contributor Author: We might want to use different error messages. In that case, having a util function doesn't save us much.
}
}
}
}

/**
* Transforms term frequency (TF) vectors to TF-IDF vectors (Java version).
* @param dataset a JavaRDD of term frequency vectors
* @return a JavaRDD of TF-IDF vectors
*/
def transform(dataset: JavaRDD[Vector]): JavaRDD[Vector] = {
transform(dataset.rdd).toJavaRDD()
}
}
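
To make the effect of the split concrete, here is a minimal usage sketch of the reworked IDF API after this change (a sketch only: it assumes an existing SparkContext named sc, and the sample term-frequency vectors are made up). The IDF estimator only computes document frequencies; the returned IDFModel holds the idf vector and performs the transformation.

import org.apache.spark.mllib.feature.IDF
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD

// Made-up term-frequency vectors; in practice these would come from a TF step.
val tf: RDD[Vector] = sc.parallelize(Seq(
  Vectors.sparse(4, Array(1, 3), Array(1.0, 2.0)),
  Vectors.dense(0.0, 1.0, 2.0, 3.0),
  Vectors.sparse(4, Array(1), Array(1.0))))

// fit() now returns an immutable IDFModel instead of mutating the IDF instance.
val model = new IDF().fit(tf)

// The learned IDF vector is a plain field on the model.
println(model.idf)

// transform() lives on the model, so it can no longer be called before fitting.
val tfidf: RDD[Vector] = model.transform(tf)

With the state moved onto the model, the mutable brzIdf field and the "Call fit first" IllegalStateException checks go away.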
mllib/src/main/scala/org/apache/spark/mllib/feature/StandardScaler.scala
@@ -17,8 +17,9 @@

package org.apache.spark.mllib.feature

import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, Vector => BV}
import breeze.linalg.{DenseVector => BDV, SparseVector => BSV}

import org.apache.spark.Logging
import org.apache.spark.annotation.Experimental
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.rdd.RDDFunctions._
@@ -35,37 +36,55 @@ import org.apache.spark.rdd.RDD
* @param withStd True by default. Scales the data to unit standard deviation.
*/
@Experimental
class StandardScaler(withMean: Boolean, withStd: Boolean) extends VectorTransformer {
class StandardScaler(withMean: Boolean, withStd: Boolean) extends Logging {

Member: This class is only used for keeping the state of withMean and withStd. Is it possible to move those states into the fit function by overloading, and make this an object?

Contributor Author: The current API is more consistent with others like LinearRegressionWithSGD. We can add that later, after we figure out the right way to specify parameters.

def this() = this(false, true)

require(withMean || withStd, s"withMean and withStd both equal to false. Doing nothing.")

private var mean: BV[Double] = _
private var factor: BV[Double] = _
if (!(withMean || withStd)) {
logWarning("Both withMean and withStd are false. The model does nothing.")
}

/**
* Computes the mean and variance and stores as a model to be used for later scaling.
*
* @param data The data used to compute the mean and variance to build the transformation model.
* @return This StandardScalar object.
* @return a StandardScalerModel
*/
def fit(data: RDD[Vector]): this.type = {
def fit(data: RDD[Vector]): StandardScalerModel = {
// TODO: skip computation if both withMean and withStd are false
val summary = data.treeAggregate(new MultivariateOnlineSummarizer)(
(aggregator, data) => aggregator.add(data),
(aggregator1, aggregator2) => aggregator1.merge(aggregator2))
new StandardScalerModel(withMean, withStd, summary.mean, summary.variance)
}
}

mean = summary.mean.toBreeze
factor = summary.variance.toBreeze
require(mean.length == factor.length)
/**
* :: Experimental ::
* Represents a StandardScaler model that can transform vectors.
*
* @param withMean whether to center the data before scaling
* @param withStd whether to scale the data to have unit standard deviation
* @param mean column mean values
* @param variance column variance values
*/
@Experimental
class StandardScalerModel private[mllib] (
val withMean: Boolean,
val withStd: Boolean,
val mean: Vector,
val variance: Vector) extends VectorTransformer {

require(mean.size == variance.size)

private lazy val factor: BDV[Double] = {
val f = BDV.zeros[Double](variance.size)
var i = 0
while (i < factor.length) {
factor(i) = if (factor(i) != 0.0) 1.0 / math.sqrt(factor(i)) else 0.0
while (i < f.size) {
f(i) = if (variance(i) != 0.0) 1.0 / math.sqrt(variance(i)) else 0.0
i += 1
}

this
f
}

Member: Since users may want to know the variance of the training set, should we have a constructor like the following?

class StandardScalerModel private[mllib] (
    val withMean: Boolean,
    val withStd: Boolean,
    val mean: BV[Double],
    val variance: BV[Double]) {

  lazy val factor = {
    val temp = variance.clone
    var i = 0
    while (i < temp.size) {
      temp(i) = if (temp(i) != 0.0) 1.0 / math.sqrt(temp(i)) else 0.0
      i += 1
    }
    temp
  }
}

Contributor Author: done.

/**
@@ -76,13 +95,7 @@ class StandardScaler(withMean: Boolean, withStd: Boolean) extends VectorTransfor
* for the column with zero variance.
*/
override def transform(vector: Vector): Vector = {
if (mean == null || factor == null) {
throw new IllegalStateException(
"Haven't learned column summary statistics yet. Call fit first.")
}

require(vector.size == mean.length)

require(mean.size == vector.size)
if (withMean) {
vector.toBreeze match {
case dv: BDV[Double] =>
@@ -115,5 +128,4 @@ class StandardScaler(withMean: Boolean, withStd: Boolean) extends VectorTransfor
vector
}
}

}
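
Likewise, a minimal sketch of the StandardScaler flow after the split (same caveats: an existing SparkContext sc is assumed and the data is made up). The scaler keeps only the withMean/withStd flags; the fitted statistics live on the returned StandardScalerModel, which is the VectorTransformer.

import org.apache.spark.mllib.feature.StandardScaler
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD

val data: RDD[Vector] = sc.parallelize(Seq(
  Vectors.dense(1.0, 2.0, 3.0),
  Vectors.dense(2.0, 0.0, 5.0),
  Vectors.dense(4.0, 4.0, 1.0)))

// fit() aggregates column statistics and returns a StandardScalerModel.
val model = new StandardScaler(withMean = true, withStd = true).fit(data)

// The training-set statistics are now visible on the model, which addresses the
// review comment above about exposing the variance.
println(model.mean)
println(model.variance)

// Scaling is done by the model, per vector or per RDD.
val scaledVector: Vector = model.transform(Vectors.dense(3.0, 3.0, 3.0))
val scaledRDD: RDD[Vector] = model.transform(data)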
mllib/src/test/scala/org/apache/spark/mllib/feature/IDFSuite.scala
@@ -36,18 +36,12 @@ class IDFSuite extends FunSuite with LocalSparkContext {
val m = localTermFrequencies.size
val termFrequencies = sc.parallelize(localTermFrequencies, 2)
val idf = new IDF
intercept[IllegalStateException] {
idf.idf()
}
intercept[IllegalStateException] {
idf.transform(termFrequencies)
}
idf.fit(termFrequencies)
val model = idf.fit(termFrequencies)
val expected = Vectors.dense(Array(0, 3, 1, 2).map { x =>
math.log((m.toDouble + 1.0) / (x + 1.0))
})
assert(idf.idf() ~== expected absTol 1e-12)
val tfidf = idf.transform(termFrequencies).cache().zipWithIndex().map(_.swap).collectAsMap()
assert(model.idf ~== expected absTol 1e-12)
val tfidf = model.transform(termFrequencies).cache().zipWithIndex().map(_.swap).collectAsMap()
assert(tfidf.size === 3)
val tfidf0 = tfidf(0L).asInstanceOf[SparseVector]
assert(tfidf0.indices === Array(1, 3))
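For reference, the smoothed IDF formulation that both the aggregator and the expected value above encode is idf(j) = log((m + 1) / (df(j) + 1)). A standalone sketch of the numbers for this fixture (m = 3 documents, per-term document frequencies 0, 3, 1, 2):

val m = 3.0                           // number of documents in the test fixture
val df = Array(0.0, 3.0, 1.0, 2.0)    // per-term document frequencies
val expected = df.map(x => math.log((m + 1.0) / (x + 1.0)))
// expected ≈ Array(1.3863, 0.0, 0.6931, 0.2877)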
mllib/src/test/scala/org/apache/spark/mllib/feature/StandardScalerSuite.scala
@@ -50,23 +50,17 @@ class StandardScalerSuite extends FunSuite with LocalSparkContext {
val standardizer2 = new StandardScaler()
val standardizer3 = new StandardScaler(withMean = true, withStd = false)

withClue("Using a standardizer before fitting the model should throw exception.") {
intercept[IllegalStateException] {
data.map(standardizer1.transform)
}
}

standardizer1.fit(dataRDD)
standardizer2.fit(dataRDD)
standardizer3.fit(dataRDD)
val model1 = standardizer1.fit(dataRDD)
val model2 = standardizer2.fit(dataRDD)
val model3 = standardizer3.fit(dataRDD)

val data1 = data.map(standardizer1.transform)
val data2 = data.map(standardizer2.transform)
val data3 = data.map(standardizer3.transform)
val data1 = data.map(model1.transform)
val data2 = data.map(model2.transform)
val data3 = data.map(model3.transform)

val data1RDD = standardizer1.transform(dataRDD)
val data2RDD = standardizer2.transform(dataRDD)
val data3RDD = standardizer3.transform(dataRDD)
val data1RDD = model1.transform(dataRDD)
val data2RDD = model2.transform(dataRDD)
val data3RDD = model3.transform(dataRDD)

val summary = computeSummary(dataRDD)
val summary1 = computeSummary(data1RDD)
@@ -129,25 +123,25 @@ class StandardScalerSuite extends FunSuite with LocalSparkContext {
val standardizer2 = new StandardScaler()
val standardizer3 = new StandardScaler(withMean = true, withStd = false)

standardizer1.fit(dataRDD)
standardizer2.fit(dataRDD)
standardizer3.fit(dataRDD)
val model1 = standardizer1.fit(dataRDD)
val model2 = standardizer2.fit(dataRDD)
val model3 = standardizer3.fit(dataRDD)

val data2 = data.map(standardizer2.transform)
val data2 = data.map(model2.transform)

withClue("Standardization with mean can not be applied on sparse input.") {
intercept[IllegalArgumentException] {
data.map(standardizer1.transform)
data.map(model1.transform)
}
}

withClue("Standardization with mean can not be applied on sparse input.") {
intercept[IllegalArgumentException] {
data.map(standardizer3.transform)
data.map(model3.transform)
}
}

val data2RDD = standardizer2.transform(dataRDD)
val data2RDD = model2.transform(dataRDD)

val summary2 = computeSummary(data2RDD)

@@ -181,13 +175,13 @@ class StandardScalerSuite extends FunSuite with LocalSparkContext {
val standardizer2 = new StandardScaler(withMean = true, withStd = false)
val standardizer3 = new StandardScaler(withMean = false, withStd = true)

standardizer1.fit(dataRDD)
standardizer2.fit(dataRDD)
standardizer3.fit(dataRDD)
val model1 = standardizer1.fit(dataRDD)
val model2 = standardizer2.fit(dataRDD)
val model3 = standardizer3.fit(dataRDD)

val data1 = data.map(standardizer1.transform)
val data2 = data.map(standardizer2.transform)
val data3 = data.map(standardizer3.transform)
val data1 = data.map(model1.transform)
val data2 = data.map(model2.transform)
val data3 = data.map(model3.transform)

assert(data1.forall(_.toArray.forall(_ == 0.0)),
"The variance is zero, so the transformed result should be 0.0")
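Finally, a standalone sketch of the sparse-input behavior these tests pin down (made-up data; an existing SparkContext sc is assumed). Centering would densify the vectors, so a model fitted with withMean = true rejects sparse input with an IllegalArgumentException, while std-only scaling goes through.

import org.apache.spark.mllib.feature.StandardScaler
import org.apache.spark.mllib.linalg.{Vector, Vectors}

val sparseData = Seq(
  Vectors.sparse(3, Array(1), Array(2.0)),
  Vectors.sparse(3, Array(0, 2), Array(1.0, -1.0)))
val rdd = sc.parallelize(sparseData)

val scaleOnly = new StandardScaler().fit(rdd)                              // defaults: withMean = false, withStd = true
val centering = new StandardScaler(withMean = true, withStd = false).fit(rdd)

val scaled = scaleOnly.transform(rdd)        // fine: std-only scaling accepts sparse input
// centering.transform(Vectors.sparse(3, Array(1), Array(2.0)))
//   throws IllegalArgumentException, as the suite above expects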