
[SPARK-8341] Significant selector feature transformation #6795

Closed
wants to merge 3 commits

Conversation

catap
Contributor

@catap catap commented Jun 13, 2015


The idea of this transformation is to safely shrink a large vector, such as one produced by HashingTF, in order to reduce the memory needed to work with it.

This transformation fits a model that keeps only the indices whose values differ across the input vectors during the fit stage.

Example usage:
```
import org.apache.spark.SparkContext
import org.apache.spark.mllib.feature.{HashingTF, SignificantSelector}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

val hashingTF = new HashingTF
val localDocs: Seq[(Double, Array[String])] = Seq(
  (1d, "a a b b b c d".split(" ")),
  (0d, "a b c d a b c".split(" ")),
  (1d, "c b a c b a a".split(" ")))

val docs = sc.parallelize(localDocs, 2)

val tf: RDD[LabeledPoint] = docs.map { case (label, words) => LabeledPoint(label, hashingTF.transform(words))}
// scala> tf.first().features.size
//  res4: Int = 1048576

val transformer = new SignificantSelector().fit(tf.map(_.features))

val transformed_tf = tf.map(p => p.copy(features = transformer.transform(p.features)))
// scala> transformed_tf.first().features.size
// res5: Int = 4

// now you have a much smaller vector with the same informative features,
// but it needs less memory downstream, e.g. for DecisionTree
```
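For reference, a minimal sketch of the selection idea using existing MLlib statistics (hypothetical helper name, not the implementation in this PR): a column is kept only if its values are not constant across the input vectors.

```
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.rdd.RDD

// Hypothetical helper: indices of columns whose values vary across the RDD.
// A column with zero variance is constant (e.g. all zeros) and can be dropped.
def varyingIndices(vectors: RDD[Vector]): Seq[Int] = {
  val summary = Statistics.colStats(vectors) // per-column summary statistics
  summary.variance.toArray.zipWithIndex
    .collect { case (v, i) if v > 0.0 => i }
    .toSeq
}
```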
@catap catap force-pushed the significant_selector branch from 93bdda0 to 77cd36b on June 13, 2015 09:03
@feynmanliang
Contributor

Can you provide a performance comparison? HashingTF outputs a sparse vector so I'm not too sure how significant the performance increase will be.

@catap
Contributor Author

catap commented Jul 3, 2015

Sure.

I ran the following script on this branch in spark-shell with SPARK_MEM=4g on my laptop:

import java.util.Random

import org.apache.spark.mllib.feature.{HashingTF, SignificantSelector}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.rdd.RDD

import scala.concurrent.duration._
import scala.language.implicitConversions

val rnd = new Random

val length = 20

implicit def bool2Double(b:Boolean): Double = if (b) 1d else 0d

val localDocs = (1 to length).map(_ =>
  (rnd.nextBoolean().toDouble, (1 to 1000).map(_ => rnd.nextDouble().toString)))

val docs = sc.parallelize(localDocs, 2)

val hashingTF = new HashingTF()

val tf: RDD[LabeledPoint] = docs.map { case (label, words) => LabeledPoint(label, hashingTF.transform(words)) }

val transformer = new SignificantSelector().fit(tf.map(_.features))

val transformed_tf = tf.map(p => p.copy(features = transformer.transform(p.features)))

val start = System.currentTimeMillis()
val model = DecisionTree.trainClassifier(tf, 2, Map[Int, Int](), "gini", 30, length)
val end = System.currentTimeMillis()

val transformed_start = System.currentTimeMillis()
val transformed_model = DecisionTree.trainClassifier(transformed_tf, 2, Map[Int, Int](), "gini", 30, length)
val transformed_end = System.currentTimeMillis()

val elapsed = DurationLong(end - start).millis
val transformed_elapsed = DurationLong(transformed_end - transformed_start).millis

println("Ok, done.")
println(f"Elapsed time ${elapsed.toMinutes} min, ${elapsed.toSeconds % 60} sec, ${elapsed.toMillis % 1000} millis (total ${elapsed.toMillis} millis)")
println(f"Elapsed transformed time ${transformed_elapsed.toMinutes} min, ${transformed_elapsed.toSeconds % 60} sec, ${transformed_elapsed.toMillis % 1000} millis (total ${transformed_elapsed.toMillis} millis)")

and got the following result:

Elapsed time 2 min, 14 sec, 268 millis (total 134268 millis)
Elapsed transformed time 0 min, 2 sec, 803 millis (total 2803 millis)

@feynmanliang
Contributor

Can you collect the RDDs before you do the timing (run a tf.collect() before the timing experiments)? tf could be getting computed for model and cached so transformed_model is fast.

Also, I don't think I completely understand what SignificantSelector is supposed to do (can you answer my inline questions)? Thanks!
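For reference, a minimal way to materialize both RDDs up front (sketch only, reusing the tf and transformed_tf values from the script above): persist is lazy, so an action such as count() is needed before timing to actually populate the cache.

```
// Force both RDDs to be computed and cached before any timing starts,
// so neither trainClassifier call pays for the upstream lineage
// (HashingTF, fit and transform).
tf.persist()
tf.count()
transformed_tf.persist()
transformed_tf.count()
```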

val values = e.groupBy(_._1)
val sum = e.map(_._2).sum

values.size + (if (sum == sources_count || values.contains(0.0)) 0 else 1)
Contributor

I understand this must be >1 for idx to not be filtered, but I'm not too clear on what the if (..) 0 else 1 is doing. Can you add some comments to describe your logic?

Contributor Author

Oh, this is a hack for the case when the RDD contains both dense and sparse vectors.

A sparse vector stores no zero elements (0d), so for a sparse vector values.size counts only the distinct non-zero values.

If the RDD mixes sparse and dense vectors, and a dense vector has a zero at an index where a sparse vector simply has no entry, the selector would see different values there, even though they are effectively the same.

For example, consider the following code:

    val vectors = sc.parallelize(List(
      Vectors.dense(0.0, 2.0, 3.0, 4.0),
      Vectors.dense(0.0, 2.0, 3.0, 4.0),
      Vectors.dense(0.0, 2.0, 3.0, 4.0),
      Vectors.sparse(4, Seq((1, 3.0), (2, 4.0))),
      Vectors.dense(0.0, 3.0, 5.0, 4.0),
      Vectors.dense(0.0, 3.0, 7.0, 4.0)
    ))

The first element of each vector is zero in the dense vectors and absent in the sparse one. Without this hack, the significant indices would include the first element, because the values look different (zero vs. absent); but if you convert the sparse vector to a dense vector, the significant indices do not include the first element.

Is it clear now?

Contributor

SparseVector#size should give you the total size of the sparse vector, while SparseVector#numNonzeros gives you the number of nonzero values.

Also, SparseVectors may contain zero elements (e.g. Vectors.sparse(1, Seq((0, 0.0)))); it's just that elements which are not active (in values) are assumed to be zero.
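To make that distinction concrete, a small illustration (hypothetical values; assumes Spark 1.4+, where numActives and numNonzeros are available on Vector):

```
import org.apache.spark.mllib.linalg.Vectors

val sv = Vectors.sparse(4, Seq((1, 3.0), (3, 0.0)))
sv.size          // 4 -> total length of the vector
sv.numActives    // 2 -> two entries are explicitly stored, one of them 0.0
sv.numNonzeros   // 1 -> only the value at index 1 is non-zero
```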

I'm still not clear on what (sum == sources_count || values.contains(0.0)) is testing: the first is true if all the vectors in the RDD were dense and the second is true if any of the vectors in the RDD were dense or if a 0.0 was present in a sparse vector. What are you trying to test here?

I think that you could make the code better by making the handling of sparse/dense more uniform and explicit so other developers can more easily understand. Some suggestions:

  • Break up the chained transformations into some intermediate variables with more descriptive names
  • Add comments to describe what's happening at important parts of the code
  • Convert all vectors to some uniform type so the logic is explicit and the code is more uniform (see the sketch below). If everything is converted to dense, then sum == sources_count will always be true. Right now the per-case logic of handling dense and sparse is buried in L115 and is not immediately apparent.
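A minimal sketch of that last suggestion (hypothetical helper, not code from this PR): densify each vector so that a missing sparse entry and an explicit 0.0 compare equal, then count the distinct values seen at each index.

```
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

// Hypothetical helper: number of distinct values observed at each index.
// v.toArray yields the dense representation, so the implicit zeros of a
// sparse vector become explicit 0.0 values and compare equal to dense zeros.
def distinctValuesPerIndex(vectors: RDD[Vector]): RDD[(Int, Int)] = {
  vectors
    .flatMap(_.toArray.zipWithIndex.map { case (value, idx) => (idx, value) })
    .distinct()          // unique (index, value) pairs
    .mapValues(_ => 1)
    .reduceByKey(_ + _)  // number of distinct values per index
}
// An index is worth keeping when it maps to more than one distinct value.
```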

Contributor Author

Let's be clear about what a sparse vector is before we go any further with this logic.

In my view, a sparse vector is a vector in which all zero elements are omitted as a memory optimization. For example, you can find the same definition here:

A vector is a one-dimensional array of elements. The natural C++ implementation of a vector is as a one-dimensional array. However, in many applications, the elements of a vector have mostly zero values. Such a vector is said to be sparse. It is inefficient to use a one-dimensional array to store a sparse vector. It is also inefficient to add elements whose values are zero in forming sums of sparse vectors. Consequently, we should choose a different representation.
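For example (illustration only), the same four-element vector in both MLlib representations:

```
import org.apache.spark.mllib.linalg.Vectors

// The same logical vector, stored densely and sparsely.
val dense  = Vectors.dense(0.0, 3.0, 0.0, 4.0)
val sparse = Vectors.sparse(4, Seq((1, 3.0), (3, 4.0)))
// Both densify to Array(0.0, 3.0, 0.0, 4.0):
dense.toArray sameElements sparse.toArray // true
```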

@catap
Copy link
Contributor Author

catap commented Jul 3, 2015

@feynmanliang I think you mean tf.persist(), don't you? tf.collect() converts the RDD to a local array, which I can't pass to DecisionTree.trainClassifier.

So, when I re-ran the test with tf.persist() and transformed_tf.persist(), I got this result:

Elapsed time 2 min, 5 sec, 711 millis (total 125711 millis)
Elapsed transformed time 0 min, 2 sec, 750 millis (total 2750 millis)

the code:

import java.util.Random

import org.apache.spark.mllib.feature.{HashingTF, SignificantSelector}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.rdd.RDD

import scala.concurrent.duration._
import scala.language.implicitConversions

val rnd = new Random

val length = 20

implicit def bool2Double(b:Boolean): Double = if (b) 1d else 0d

val localDocs = (1 to length).map(_ =>
  (rnd.nextBoolean().toDouble, (1 to 1000).map(_ => rnd.nextDouble().toString)))

val docs = sc.parallelize(localDocs, 2)

val hashingTF = new HashingTF()

val tf: RDD[LabeledPoint] = docs.map { case (label, words) => LabeledPoint(label, hashingTF.transform(words)) }

val transformer = new SignificantSelector().fit(tf.map(_.features))

val transformed_tf = tf.map(p => p.copy(features = transformer.transform(p.features)))

tf.persist()

val start = System.currentTimeMillis()
val model = DecisionTree.trainClassifier(tf, 2, Map[Int, Int](), "gini", 30, length)
val end = System.currentTimeMillis()

tf.unpersist()

transformed_tf.persist()

val transformed_start = System.currentTimeMillis()
val transformed_model = DecisionTree.trainClassifier(transformed_tf, 2, Map[Int, Int](), "gini", 30, length)
val transformed_end = System.currentTimeMillis()

transformed_tf.unpersist()

val elapsed = DurationLong(end - start).millis
val transformed_elapsed = DurationLong(transformed_end - transformed_start).millis

println("Ok, done.")
println(f"Elapsed time ${elapsed.toMinutes} min, ${elapsed.toSeconds % 60} sec, ${elapsed.toMillis % 1000} millis (total ${elapsed.toMillis} millis)")
println(f"Elapsed transformed time ${transformed_elapsed.toMinutes} min, ${transformed_elapsed.toSeconds % 60} sec, ${transformed_elapsed.toMillis % 1000} millis (total ${transformed_elapsed.toMillis} millis)")

@catap catap force-pushed the significant_selector branch from 1e2c8cd to 90e42fb on July 3, 2015 06:13
@catap catap force-pushed the significant_selector branch from 90e42fb to 62954be on July 3, 2015 06:16
))

val significant = new SignificantSelector().fit(vectors)
assert(significant.transform(dv).toString == "[]")
Contributor

Test equality of the vectors themselves rather than comparing their string representations.

@feynmanliang
Contributor

Actually, I think the performance gains are because the default HashingTF has 2^20 bins but your test data has at most 20000 unique words.

If you configure the HashingTF properly (e.g. val tf = new HashingTF(25000)), is the performance difference still significant?

BTW, I think your perf tests helped identify a possible improvement: have DecisionTree exploit sparsity (i.e. rather than dropping the columns altogether, have the downstream algorithms exploit the sparse representation and ignore the inactive indices).

Also, are there other places besides an improperly configured HashingTF where this may be used? Do you know of any real world use cases or papers using this technique?

@catap
Contributor Author

catap commented Jul 3, 2015

I wrote this code to speed up a real case on my own data.

Decreasing the size of HashingTF is the wrong approach, because it also increases the probability and number of collisions.
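A rough back-of-the-envelope estimate of that effect, assuming uniform hashing (the numbers are approximations, not measurements from this PR):

```
// Expected number of collisions, i.e. terms hashed into a bin that already
// holds another term, under a uniform-hashing approximation:
//   occupied(bins, terms)  = bins * (1 - (1 - 1/bins)^terms)
//   colliding(bins, terms) = terms - occupied(bins, terms)
def expectedColliding(bins: Int, terms: Int): Double =
  terms - bins * (1.0 - math.pow(1.0 - 1.0 / bins, terms))

expectedColliding(1 << 20, 20000) // ~190 collisions with the default 2^20 bins
expectedColliding(25000, 20000)   // ~6200 collisions with only 25000 bins
```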

@AmplabJenkins

Can one of the admins verify this patch?

@mengxr
Contributor

mengxr commented Jul 17, 2015

@catap Some high-level comments:

  1. The name SignificantSelector does not reflect what it really does. There are many ways to define feature significance. It is hard to guess that this one means removing constant columns.
  2. The example you mentioned doesn't fully support this transformer. If you don't want zero columns, use bag of words. If there are many values and you have to use hashing, this transformer doesn't really help reduce the number of columns.

I don't see how this transformer could help real-world use cases. But if you have seen reference use cases, please let me know. Thanks!

@mengxr
Contributor

mengxr commented Sep 15, 2015

@catap I'm going to close this PR for now. Please submit it as a Spark package at http://spark-packages.org if you want to provide this implementation to others. Thanks!

@asfgit asfgit closed this in 0d9ab01 Sep 15, 2015