[SPARK-30178][ML] RobustScaler support large numFeatures #26803
Conversation
Test build #115013 has finished for PR 26803 at commit
Test build #115015 has finished for PR 26803 at commit
mllib/src/main/scala/org/apache/spark/ml/feature/RobustScaler.scala
new QuantileSummaries(QuantileSummaries.defaultCompressThreshold, localRelativeError))(
  seqOp = (s, v) => s.insert(v),
  combOp = (s1, s2) => s1.compress.merge(s2.compress)
).mapValues{ s =>
Nit: space before brace
mllib/src/main/scala/org/apache/spark/ml/feature/RobustScaler.scala
  combOp = (s1, s2) => s1.compress.merge(s2.compress)
).mapValues{ s =>
  // confirm compression before query
  val s2 = s.compress
I also need to call compress here, otherwise the test suite fails because query would be called before compression.
Force-pushed from 3807027 to dab3d3a
Test build #115073 has finished for PR 26803 at commit
Test build #115099 has finished for PR 26803 at commit
Force-pushed from 51f833c to 5283294
Test build #115147 has finished for PR 26803 at commit
@srowen I implemented a simple treeAggregateByKey and tested it with:

val rdd = sc.range(0, 10000, 1, 100)
val rdd2 = rdd.map{i => (i % 10, i)}
val rdd3 = rdd2.treeAggregateByKey(0.0, new HashPartitioner(3))(_+_, _+_, 3)
rdd3.collect

It ran successfully. BTW, it is reasonable to call
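For context, a minimal sketch of what such a helper could look like; treeAggregateByKey is not an existing RDD op, and the implicit-class wrapper, the single intermediate merge level, and the scale computation below are assumptions inferred from the usage above and from how treeAggregate works.

```scala
import scala.reflect.ClassTag
import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD

// Hypothetical helper (assumption): tree-style aggregation per key. Only one
// intermediate merge level is sketched here (the depth = 2 case); a full version
// would iterate the partial merge for larger depths, like treeAggregate does.
implicit class TreeAggregateByKeyOps[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]) {
  def treeAggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(
      seqOp: (U, V) => U, combOp: (U, U) => U, depth: Int): RDD[(K, U)] = {
    // number of intermediate groups per key, mirroring treeAggregate's scale formula
    val scale = math.max(math.ceil(math.pow(rdd.getNumPartitions, 1.0 / depth)).toInt, 2)
    rdd
      .mapPartitionsWithIndex { case (pid, iter) =>
        // spread each key over `scale` intermediate keys, one per group of partitions
        iter.map { case (k, v) => ((k, pid % scale), v) }
      }
      .aggregateByKey(zeroValue)(seqOp, combOp)   // partial merge per (key, group)
      .map { case ((k, _), u) => (k, u) }         // drop the helper group component
      .reduceByKey(partitioner, combOp)           // final merge: one value per key
  }
}
```

With such a wrapper in scope, the call above (rdd2.treeAggregateByKey(0.0, new HashPartitioner(3))(_+_, _+_, 3)) type-checks as written.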
// compute scale by the logic in treeAggregate with depth=2
// TODO: design a common treeAggregateByKey in PairRDDFunctions?
val scale = math.max(math.ceil(math.sqrt(vectors.getNumPartitions)).toInt, 2)
Thinking this through more: say there are M partitions of data, and N dimensions / features. This produces N*sqrt(M) new partitions.
N*sqrt(M) is better than N before, for the case I mentioned, where N is small and M is large. The number of partitions doesn't drop so much that you might lose too much parallelism for large data. You're still losing some parallelism compared to processing with M partitions, in the case where sqrt(M) > N.
N*sqrt(M) is on the other hand a lot of partitions, if N is large and M is small, which is your case. What if N = 10000? You might go from ten partitions to a thousand. Or a million if M is large as well as N.
I'm trying to figure out if we can mostly get rid of the factor of N.
What if you keyed by (i, partition / N)? You end up with about N * M/N = M partitions in the aggregation, which is nice. Well, that's roughly true if N < M. Of course, if N > M then this still gives you N partitions out, but that's better than N*sqrt(M) for your case I believe. WDYT?
I think this still bears a quick benchmark on the 'common case' of few features and a nontrivial amount of data / partitions. I am slightly concerned this might be much slower for that case.
say there are M partitions of data, and N dimensions / features. This produces N*sqrt(M) new partitions.
Sorry, I am trying to figure out what "partition" means here. Does it mean an intermediate aggregator (a QuantileSummaries)?
Or, if you mean a partition of the internal RDDs, I think there are still M partitions, since all the ops (mapPartitionsWithIndex -> aggregateByKey -> map -> reduceByKey) before collect do not change numPartitions.
The basic logic is quite similar to treeAggregate:
Say for a key (i=1), the original rdd has 16 partitions. At first there will be 16 QuantileSummaries aggregators for i=1, one per partition.
Then after aggregateByKey they are partially merged and there are 4 QuantileSummaries.
Finally reduceByKey merges them into a single QuantileSummaries on some executor.
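A rough sketch of that flow for the RobustScaler case (illustrative only, not the exact PR code; the method name summarizeByFeature and the (featureIndex, value) input shape are assumptions): with 16 input partitions and scale = ceil(sqrt(16)) = 4, each feature goes from 16 per-partition summaries to 4 after aggregateByKey, then to 1 after reduceByKey.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.util.QuantileSummaries

// Illustrative sketch of the two-step merge described above (not the exact PR code).
def summarizeByFeature(
    values: RDD[(Int, Double)],          // (featureIndex, value) pairs
    relativeError: Double): RDD[(Int, QuantileSummaries)] = {
  // same scale as treeAggregate with depth = 2
  val scale = math.max(math.ceil(math.sqrt(values.getNumPartitions)).toInt, 2)
  values
    .mapPartitionsWithIndex { case (pid, iter) =>
      // one aggregator per (featureIndex, partition group)
      iter.map { case (i, v) => ((i, pid % scale), v) }
    }
    .aggregateByKey(
      new QuantileSummaries(QuantileSummaries.defaultCompressThreshold, relativeError))(
      seqOp = (s, v) => s.insert(v),
      combOp = (s1, s2) => s1.compress.merge(s2.compress))
    .map { case ((i, _), s) => (i, s.compress) }     // back to featureIndex keys
    .reduceByKey((s1, s2) => s1.merge(s2))           // one summary per feature
}
```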
Hm yeah nevermind the comment about partitions, that doesn't change. It's just the number of aggregation keys. I don't know whether increasing the number of aggregation keys is something we should avoid or not, but that's not the reason, OK.
Anyway, I'd still be interested how it performs before and after, any rough benchmark, for the small N case. I'm just sort of concerned it could be slower for that common case because now the whole data set is chopped up finely and shuffled in all cases. Doesn't mean we can't do it, just wondering if there is a big difference to know about.
ok, I will do some performance tests
Test build #115185 has finished for PR 26803 at commit
Test code:

import org.apache.spark.ml.linalg._
import org.apache.spark.ml.feature._
import org.apache.spark.storage.StorageLevel

val rdd = sc.range(0, 10000000, 1, 100)
val df = rdd.map(i => Tuple1.apply(Vectors.dense((i % 1000).toDouble / 1000))).toDF("features")
df.persist(StorageLevel.MEMORY_AND_DISK)
df.count

val scaler = new RobustScaler().setInputCol("features")
val start = System.currentTimeMillis
Seq.range(0, 100).foreach { _ => val model = scaler.fit(df) }
val end = System.currentTimeMillis
end - start

Master: 243493

That is to say, this PR supports medium/large (>1000) numFeatures at the cost of some performance regression in low-dim cases.
Moreover, for the existing impl, we can use
Force-pushed from a94d5d6 to a4d4ef4
val localLower = $(lower)

// each featureIndex and its QuantileSummaries
val summaries = if (numFeatures < 1000) {
In my tests with a small dataset, numFeatures > 4000 may cause OOM.
Since the memory overhead of a QuantileSummaries is related to the number of values it has absorbed, I conservatively set the threshold to 1000.
I see, so you have both the old and new impl here. That complexity isn't great, but it does avoid a performance regression while fixing the problem.
Is it possible to add a test with a lot of features, but not much data? Just to exercise the code path.
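A sketch of what such a test could look like (the test name, sizes, and the assertions on the model's range/median vectors are assumptions; it follows the style of the existing ML suites and assumes testImplicits._ is in scope for toDF):

```scala
import org.apache.spark.ml.feature.RobustScaler
import org.apache.spark.ml.linalg.Vectors

// Hypothetical test: many features (above the 1000 cutoff) but only a handful of rows,
// just to exercise the distributed code path.
test("RobustScaler with many features but little data") {
  val numFeatures = 5000
  val data = (0 until 10).map { i =>
    Tuple1(Vectors.dense(Array.tabulate(numFeatures)(j => (i * numFeatures + j).toDouble)))
  }
  val df = data.toDF("features")   // assumes testImplicits._ in scope

  val model = new RobustScaler()
    .setInputCol("features")
    .setOutputCol("scaled")
    .fit(df)

  assert(model.median.size === numFeatures)
  assert(model.range.size === numFeatures)
}
```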
I revisited this PR and found that the performance regression may come from two sides:
1, the QuantileSummaries are not compressed on the map side;
2, the two-step aggregation (aggregateByKey & reduceByKey) may not improve the performance as supposed.
So I came back to the initial commit, and used a trick to compress the aggregators at map output. Now it is faster than the previous commit but still about 10% slower than Master in the edge case.
Since this impl is concise, I prefer it to the previous ones.
Test build #115438 has finished for PR 26803 at commit
Test build #115486 has finished for PR 26803 at commit
OK, hm, this still kind of has the 'skew' problem for few features. I'm not super against this as it is a tidy idea but see a few more comments here.
}.aggregateByKey(
  new QuantileSummaries(QuantileSummaries.defaultCompressThreshold, localRelativeError))(
  // compress the QuantileSummaries at the end of partition
  seqOp = (s, v) => if (v.isNaN) s.compress else s.insert(v),
I don't think you'll necessarily get the NaNs at the end, right? you'll get multiple NaNs too, so if you did get them all at the end you would compress repeatedly. (I wonder if compress can be smarter about returning itself if there's no work to do, not sure)
You might happen to get the values for keys by partition though in practice, which would work out well if true. Is that what this is kind of hoping for?
I append multiple NaNs at the end of each partition, since there are numFeatures aggregators on each partition; one NaN per aggregator to trigger a compress.
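A sketch of the sentinel trick as described (reconstructed from this comment, so the exact shape is an assumption): after emitting all of a partition's (featureIndex, value) pairs, append one (featureIndex, NaN) pair per feature, so every per-key aggregator sees a trailing NaN and compresses itself on the map side.

```scala
// Reconstruction (assumption) of the map side with NaN sentinels appended per partition.
vectors.mapPartitions { iter =>
  if (iter.hasNext) {
    iter.flatMap { vec =>
      Iterator.range(0, numFeatures).map(i => (i, vec(i)))
    } ++ Iterator.range(0, numFeatures).map(i => (i, Double.NaN))   // one sentinel per feature
  } else {
    Iterator.empty
  }
}
```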
After triggering compression on the map side, I no longer need to confirm compression in the merge combOp and the final query, as in the previous commits.
I think my mental model of aggregateByKey might be wrong or something, but what are you guaranteed about the order in which seqOp encounters the values from flatMap previously? After a shuffle collects all the values for a feature and starts aggregating a partition of some values, you might get the NaNs first or in the middle or something. Worst case they're all at the start and you don't end up compressing the final result at all. It would make sense to me if it happened to shuffle the values in order, and you'd encounter (..., NaN, ...values..., NaN, ...values..., NaN) as it would aggregate the values as they came out of flatMap in order. I'm not sure if there's a better way to affect what seqOp does "last" though, so maybe it's the best that can be done.
There is no guarantee in the API doc of aggregateByKey (or elsewhere) about the order in which seqOp encounters the values. I just dug into its impl and think it should work like this. So I agree that it is not a nice way to trigger compression.
I guess adding a new RDD operation like aggregateByKeyWithinPartitions may be a better choice.
I will remove the usage of NaN and confirm compression before merge and query.
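For what it's worth, one way to guarantee that compression runs at the end of each partition without NaN sentinels or a new RDD op (a sketch; vectors, numFeatures, and localRelativeError are the surrounding values from the PR, and this keeps numFeatures aggregators in memory per partition, like the original implementation):

```scala
import scala.collection.mutable
import org.apache.spark.sql.catalyst.util.QuantileSummaries

// Build the per-feature summaries of one partition in a local map, compress them once
// the partition's iterator is exhausted, then merge across partitions with reduceByKey.
val summaries = vectors.mapPartitions { iter =>
  val buffer = mutable.Map.empty[Int, QuantileSummaries]
  iter.foreach { vec =>
    var i = 0
    while (i < numFeatures) {
      val s = buffer.getOrElse(i,
        new QuantileSummaries(QuantileSummaries.defaultCompressThreshold, localRelativeError))
      buffer.update(i, s.insert(vec(i)))
      i += 1
    }
  }
  // the partition is fully consumed at this point, so compress is guaranteed to run last
  buffer.iterator.map { case (i, s) => (i, s.compress) }
}.reduceByKey((s1, s2) => s1.merge(s2))
```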
  val range = s.query(localUpper).get - s.query(localLower).get
  val median = s.query(0.5).get
  (range, median)
}.sortBy(_._1).values.collect().unzip
It probably won't matter, but do you want to sort these with Spark, or just sort locally? You collect all the values either way. It might be faster to not do it in Spark.
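A sketch of the local-sort alternative (byFeature is a stand-in name for the (featureIndex, (range, median)) RDD produced just before the collect):

```scala
import org.apache.spark.rdd.RDD

// Collect first, then sort the (small) per-feature array on the driver,
// instead of running an extra Spark sortBy job.
def collectSortedLocally(byFeature: RDD[(Int, (Double, Double))]): (Array[Double], Array[Double]) =
  byFeature.collect().sortBy(_._1).map(_._2).unzip
```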
val (ranges, medians) = vectors.mapPartitions { iter =>
  if (iter.hasNext) {
    iter.flatMap { vec => Iterator.range(0, numFeatures).map(i => (i, vec(i))) } ++
You'll effectively filter NaNs later anyway by not inserting them. Do you want to filter them explicitly up here to avoid extra compress calls? Would this change behavior if NaN is in the data? I'd guess you would get NaN or an error before, but would just ignore them now.
Good point!
The existing impl does not take NaN into account. A QuantileSummaries can absorb a NaN without throwing an exception; however, it cannot be used afterwards:
scala> import org.apache.spark.sql.catalyst.util.QuantileSummaries
import org.apache.spark.sql.catalyst.util.QuantileSummaries
scala> val s1 = new QuantileSummaries(QuantileSummaries.defaultCompressThreshold, 0.0001)
s1: org.apache.spark.sql.catalyst.util.QuantileSummaries = org.apache.spark.sql.catalyst.util.QuantileSummaries@577dac16
scala> var s1 = new QuantileSummaries(QuantileSummaries.defaultCompressThreshold, 0.0001)
s1: org.apache.spark.sql.catalyst.util.QuantileSummaries = org.apache.spark.sql.catalyst.util.QuantileSummaries@5006a697
scala> s1 = s1.insert(Double.NaN)
s1: org.apache.spark.sql.catalyst.util.QuantileSummaries = org.apache.spark.sql.catalyst.util.QuantileSummaries@5006a697
scala> s1 = s1.compress
s1: org.apache.spark.sql.catalyst.util.QuantileSummaries = org.apache.spark.sql.catalyst.util.QuantileSummaries@5680f009
scala> s1.query(0.5)
res1: Option[Double] = Some(NaN)
After absorbing a NaN value, it returns NaN from query.
Then I tested sklearn and it just ignores NaN, so I guess we can also skip NaN for now:
from sklearn.preprocessing import RobustScaler
import numpy as np
X = np.array([[0.0, 0.0], [np.nan, -0.5], [1.0, -1.0], [1.5, np.nan], [2.0, -2.0]])
transformer = RobustScaler().fit(X)
transformer.center_
Out[5]: array([ 1.25, -0.75])
transformer.scale_
Out[6]: array([0.875, 0.875])
transformer.transform(X)
Out[7]:
array([[-1.42857143, 0.85714286],
[ nan, 0.28571429],
[-0.28571429, -0.28571429],
[ 0.28571429, nan],
[ 0.85714286, -1.42857143]])
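A sketch of filtering NaNs up front when flattening the vectors, so they never reach a QuantileSummaries (mirroring the ignore-NaN behavior shown for sklearn; not necessarily the PR's final code):

```scala
// Emit (featureIndex, value) pairs but drop NaN entries before aggregation.
vectors.flatMap { vec =>
  Iterator.range(0, numFeatures)
    .map(i => (i, vec(i)))
    .filter { case (_, v) => !v.isNaN }
}
```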
OK, and this is new in 3.0 I suppose, so no need for a release note. It might be worth documenting the behavior in the scaladoc.
Test build #115536 has finished for PR 26803 at commit
Test build #115562 has finished for PR 26803 at commit
I think we can resolve it by adding new RDD ops, maybe my previous
Test build #115610 has finished for PR 26803 at commit
@srowen If you do not object, I will merge this in one or two days.
OK this is back to the original simpler implementation. But I think this has the performance issue again. What's the current perf hit for, say, n=2 features but significant amounts of data? I would not want to merge this and think about the rest later if it's significant.
Force-pushed from 03b9bd0 to 321a076
@srowen OK, I will revert the last commits and add a test suite for a high-dim dataset.
Test build #115686 has finished for PR 26803 at commit
retest this please
Test build #115724 has finished for PR 26803 at commit
OK, heh this is bouncing around, hard to keep track.
I think I'm OK with keeping both implementations to avoid a perf hit. 1000 seems like roughly the cutoff where you'd switch implementations, based on your findings.
As long as there are tests for both paths, OK
Merged to master, thanks @srowen for reviewing!
Closes apache#26803 from zhengruifeng/robust_high_dim.
Authored-by: zhengruifeng <[email protected]>
Signed-off-by: zhengruifeng <[email protected]>
What changes were proposed in this pull request?
Compute the medians/ranges in a more distributed way.
Why are the changes needed?
It is a bottleneck to collect the whole Array[QuantileSummaries] from the executors, since a QuantileSummaries is a large object which maintains arrays of large sizes 10k (defaultCompressThreshold) / 50k (defaultHeadSize).
In Spark-Shell with default params, I processed a dataset with numFeatures=69,200, and the existing impl failed due to OOM. After this PR, it successfully fits the model.
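For a rough sense of scale, a back-of-envelope estimate based on the numbers above (worst case, assuming each summary's head buffer of defaultHeadSize doubles is full; object overhead and the compressed sample array are ignored):

```scala
// Rough driver-side memory needed just to collect one QuantileSummaries per feature.
val numFeatures = 69200L        // the dataset mentioned above
val headSize = 50000L           // defaultHeadSize: raw doubles buffered before compression
val bytesPerDouble = 8L
val approxBytes = numFeatures * headSize * bytesPerDouble
// ≈ 27.7e9 bytes, i.e. tens of GB on the driver, which explains the OOM
```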
Does this PR introduce any user-facing change?
No
How was this patch tested?
Existing test suites.