[SPARK-8341] Significant selector feature transformation #6795
Conversation
The idea of this transformation is to safely shrink the very large vectors produced by, for example, HashingTF, in order to reduce the memory needed to work with them. At the fit stage the transformation builds a model that keeps only the indices whose values actually differ across the input vectors. Example of usage:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.mllib.feature.{HashingTF, SignificantSelector}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

val hashingTF = new HashingTF
val localDocs: Seq[(Double, Array[String])] = Seq(
  (1d, "a a b b b c d".split(" ")),
  (0d, "a b c d a b c".split(" ")),
  (1d, "c b a c b a a".split(" ")))
val docs = sc.parallelize(localDocs, 2)
val tf: RDD[LabeledPoint] =
  docs.map { case (label, words) => LabeledPoint(label, hashingTF.transform(words)) }
// scala> tf.first().features.size
// res4: Int = 1048576
val transformer = new SignificantSelector().fit(tf.map(_.features))
val transformed_tf = tf.map(p => p.copy(features = transformer.transform(p.features)))
// scala> transformed_tf.first().features.size
// res5: Int = 4
// You now have a much smaller vector carrying the same significant features,
// which needs far less memory to work with, e.g. in DecisionTree.
```
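For readers following along, here is a minimal, hypothetical sketch of the selection idea being discussed; the helper name `significantIndices` and the dense-conversion shortcut are illustrative, not the PR's actual code:

```scala
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

// Sketch: keep only the indices whose values vary across the input vectors.
// Converting every vector to a dense array makes sparse/dense handling
// uniform (at the cost of memory on very wide vectors).
def significantIndices(vectors: RDD[Vector]): Array[Int] = {
  vectors
    .flatMap(v => v.toArray.zipWithIndex.map { case (value, idx) => (idx, value) })
    .distinct()                       // one record per (index, distinct value)
    .map { case (idx, _) => (idx, 1L) }
    .reduceByKey(_ + _)               // number of distinct values per index
    .filter { case (_, distinctValues) => distinctValues > 1 }
    .keys
    .collect()
    .sorted
}
```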
Can you provide a performance comparison?
Sure. I ran the following script on this branch in spark-shell:

```scala
import java.util.Random

import org.apache.spark.mllib.feature.{HashingTF, SignificantSelector}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.rdd.RDD

import scala.concurrent.duration._
import scala.language.implicitConversions

val rnd = new Random
val length = 20
implicit def bool2Double(b: Boolean): Double = if (b) 1d else 0d
val localDocs = (1 to length).map(_ =>
  (rnd.nextBoolean().toDouble, (1 to 1000).map(_ => rnd.nextDouble().toString)))
val docs = sc.parallelize(localDocs, 2)
val hashingTF = new HashingTF()
val tf: RDD[LabeledPoint] =
  docs.map { case (label, words) => LabeledPoint(label, hashingTF.transform(words)) }
val transformer = new SignificantSelector().fit(tf.map(_.features))
val transformed_tf = tf.map(p => p.copy(features = transformer.transform(p.features)))

val start = System.currentTimeMillis()
val model = DecisionTree.trainClassifier(tf, 2, Map[Int, Int](), "gini", 30, length)
val end = System.currentTimeMillis()

val transformed_start = System.currentTimeMillis()
val transformed_model =
  DecisionTree.trainClassifier(transformed_tf, 2, Map[Int, Int](), "gini", 30, length)
val transformed_end = System.currentTimeMillis()

val elapsed = DurationLong(end - start).millis
val transformed_elapsed = DurationLong(transformed_end - transformed_start).millis

println("Ok, done.")
println(f"Elapsed time ${elapsed.toMinutes} min, ${elapsed.toSeconds % 60} sec, ${elapsed.toMillis % 1000} millis (total ${elapsed.toMillis} millis)")
println(f"Elapsed transformed time ${transformed_elapsed.toMinutes} min, ${transformed_elapsed.toSeconds % 60} sec, ${transformed_elapsed.toMillis % 1000} millis (total ${transformed_elapsed.toMillis} millis)")
```

and got this result:
Can you collect the RDDs before you do the timing (run a `count()` or similar action first)? Also, I don't think I completely understand what this expression is doing:
```scala
val values = e.groupBy(_._1)
val sum = e.map(_._2).sum

values.size + (if (sum == sources_count || values.contains(0.0)) 0 else 1)
```
I understand this must be > 1 for `idx` to not be filtered, but I'm not too clear on what the `if (..) 0 else 1` is doing. Can you add some comments to describe your logic?
Oh, this is a hack for the case when the RDD contains both dense and sparse vectors. A sparse vector doesn't store zero elements (`0d`), so for a sparse vector `values.size` only counts the distinct non-zero values. If the RDD mixes sparse and dense vectors, and a dense vector has a zero element at a position where a sparse vector has no element at all, the selector sees these as different values even though they are not. For example, consider the following code:
```scala
import org.apache.spark.mllib.linalg.Vectors

val vectors = sc.parallelize(List(
  Vectors.dense(0.0, 2.0, 3.0, 4.0),
  Vectors.dense(0.0, 2.0, 3.0, 4.0),
  Vectors.dense(0.0, 2.0, 3.0, 4.0),
  Vectors.sparse(4, Seq((1, 3.0), (2, 4.0))),
  Vectors.dense(0.0, 3.0, 5.0, 4.0),
  Vectors.dense(0.0, 3.0, 7.0, 4.0)
))
```
The first element of each vector is zero in the dense vectors and missing in the sparse one. Without this hack, the significant indices would include the first element, because it appears to take different values (zero vs. missing); but if you convert the sparse vector to a dense one, the significant indices do not include the first element.
Is it clear now?
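For concreteness, here is a commented, self-contained reading of the expression under review, with the inputs for index 0 of the example above plugged in; the types of `e` and `sources_count` are inferred from context, not taken from the PR source:

```scala
// Inferred example inputs (not the PR's actual declarations):
// Index 0 of the vectors above: five dense vectors contribute an explicit
// 0.0; the sparse vector contributes nothing.
val e: Seq[(Double, Long)] = Seq((0.0, 5L))  // (value, occurrence count) pairs
val sources_count = 6L                       // total vectors in the RDD

val values = e.groupBy(_._1)  // distinct values explicitly present: just 0.0
val sum = e.map(_._2).sum     // 5 of the 6 vectors had an explicit entry here

// If sum < sources_count, some sparse vectors had no entry at this index,
// i.e. an implicit 0.0. That implicit zero adds one extra distinct value,
// unless an explicit 0.0 was already observed among `values`.
val distinctValueCount =
  values.size + (if (sum == sources_count || values.contains(0.0)) 0 else 1)
// = 1 + 0: the implicit zero matches the explicit 0.0, so index 0 is treated
// as constant and filtered out, which is what the author intends.
```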
`SparseVector#size` should give you the total size of the sparse vector, while `SparseVector#numNonzeros` gives you the number of nonzero values. Also, SparseVectors may contain zero elements (e.g. `Vectors.sparse(1, Seq((0, 0.0)))`); it's just that elements which are not active (in `values`) are assumed to be zero.

I'm still not clear on what `(sum == sources_count || values.contains(0.0))` is testing: the first is true if all the vectors in the RDD were dense, and the second is true if any of the vectors in the RDD were dense or if a `0.0` was present in a sparse vector. What are you trying to test here?
I think you could make the code better by making the handling of sparse/dense vectors more uniform and explicit, so other developers can understand it more easily. Some suggestions:
- Break up the chained transformations into some intermediate variables with more descriptive names
- Add comments to describe what's happening at important parts of the code
- Convert all vectors to some uniform type so the logic is explicit and the code is more uniform (sketched below). If everything is converted to dense, then `sum == sources_count` will always be true. Right now the per-case logic for handling dense and sparse is buried in L115 and is not immediately apparent.
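A minimal sketch of that last suggestion, assuming the fit input is an `RDD[Vector]` (the helper name `toDenseVectors` is illustrative):

```scala
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD

// Normalize every vector to dense up front: afterwards each index has an
// explicit value in every vector, so sum == sources_count holds by
// construction and the implicit-zero special case disappears.
def toDenseVectors(vectors: RDD[Vector]): RDD[Vector] =
  vectors.map(v => Vectors.dense(v.toArray))
```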
Let's agree on what a sparse vector is before we go further with this logic.
In my view, a sparse vector is a vector in which all zero elements are omitted to save memory. For example, you can find the same definition here:

> A vector is a one-dimensional array of elements. The natural C++ implementation of a vector is as a one-dimensional array. However, in many applications, the elements of a vector have mostly zero values. Such a vector is said to be sparse. It is inefficient to use a one-dimensional array to store a sparse vector. It is also inefficient to add elements whose values are zero in forming sums of sparse vectors. Consequently, we should choose a different representation.
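To make the two representations concrete, a small illustration (not taken from the PR itself):

```scala
import org.apache.spark.mllib.linalg.Vectors

// The same 4-element vector in both representations: the inactive entries
// of the sparse form are implicitly 0.0.
val sparse = Vectors.sparse(4, Seq((1, 3.0), (2, 4.0)))
val dense  = Vectors.dense(0.0, 3.0, 4.0, 0.0)
assert(sparse.toArray.sameElements(dense.toArray))
```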
@feynmanliang I think you mean […]. So, I re-ran the test with the RDDs persisted; the code:

```scala
import java.util.Random

import org.apache.spark.mllib.feature.{HashingTF, SignificantSelector}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.rdd.RDD

import scala.concurrent.duration._
import scala.language.implicitConversions

val rnd = new Random
val length = 20
implicit def bool2Double(b: Boolean): Double = if (b) 1d else 0d
val localDocs = (1 to length).map(_ =>
  (rnd.nextBoolean().toDouble, (1 to 1000).map(_ => rnd.nextDouble().toString)))
val docs = sc.parallelize(localDocs, 2)
val hashingTF = new HashingTF()
val tf: RDD[LabeledPoint] =
  docs.map { case (label, words) => LabeledPoint(label, hashingTF.transform(words)) }
val transformer = new SignificantSelector().fit(tf.map(_.features))
val transformed_tf = tf.map(p => p.copy(features = transformer.transform(p.features)))

tf.persist()
val start = System.currentTimeMillis()
val model = DecisionTree.trainClassifier(tf, 2, Map[Int, Int](), "gini", 30, length)
val end = System.currentTimeMillis()
tf.unpersist()

transformed_tf.persist()
val transformed_start = System.currentTimeMillis()
val transformed_model =
  DecisionTree.trainClassifier(transformed_tf, 2, Map[Int, Int](), "gini", 30, length)
val transformed_end = System.currentTimeMillis()
transformed_tf.unpersist()

val elapsed = DurationLong(end - start).millis
val transformed_elapsed = DurationLong(transformed_end - transformed_start).millis

println("Ok, done.")
println(f"Elapsed time ${elapsed.toMinutes} min, ${elapsed.toSeconds % 60} sec, ${elapsed.toMillis % 1000} millis (total ${elapsed.toMillis} millis)")
println(f"Elapsed transformed time ${transformed_elapsed.toMinutes} min, ${transformed_elapsed.toSeconds % 60} sec, ${transformed_elapsed.toMillis % 1000} millis (total ${transformed_elapsed.toMillis} millis)")
```
```scala
val significant = new SignificantSelector().fit(vectors)
assert(significant.transform(dv).toString == "[]")
```
Test equality of vectors rather than comparing their string representations.
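For instance, something along these lines (a sketch of the suggested change, assuming the expected result is an empty vector):

```scala
import org.apache.spark.mllib.linalg.Vectors

// Compare the transformed vector to an expected vector value
// instead of matching its string form.
assert(significant.transform(dv) == Vectors.dense(Array.empty[Double]))
```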
Actually, I think the performance gains are because the default `HashingTF` dimension (2^20 = 1,048,576 features) is far larger than this test needs. If you configure the `HashingTF` with a smaller number of features, the gap should shrink. BTW, I think your perf tests helped identify a possible improvement to have […]. Also, are there other places besides an improperly configured `HashingTF` where this would help?
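For reference, the dimension in question is a constructor parameter of `HashingTF`; a brief sketch (the value `1 << 10` is only an example):

```scala
import org.apache.spark.mllib.feature.HashingTF

// mllib's HashingTF defaults to 2^20 = 1,048,576 features. A smaller
// dimension shrinks the vectors directly, at the cost of a higher
// chance of hash collisions.
val smallTF = new HashingTF(numFeatures = 1 << 10)
```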
I wrote this code to optimize the speed of a real-world case on my data. Decreasing the size of HashingTF is the wrong way, because it also increases the probability and the number of hash collisions.
Can one of the admins verify this patch?
@catap Some high-level comments: I don't see how this transformer could help real-world use cases. But if you have seen reference use cases, please let me know. Thanks!
@catap I'm going to close this PR for now. Please submit it as a Spark package at http://spark-packages.org if you want to provide this implementation to others. Thanks!