-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SPARK-1939 Refactor takeSample method in RDD to use ScaSRS #916
Closed
Closed
Changes from all commits
Commits
Show all changes
21 commits
Select commit
Hold shift + click to select a range
1441977
SPARK-1939 Refactor takeSample method in RDD to use ScaSRS
dorx ffea61a
SPARK-1939: Refactor takeSample method in RDD
dorx 7cab53a
fixed import bug in rdd.py
dorx e3fd6a6
Merge branch 'master' into takeSample
dorx 9bdd36e
Check sample size and move computeFraction
dorx 065ebcd
Merge branch 'master' into takeSample
dorx ae3ad04
fixed edge cases to prevent overflow
dorx f80f270
Merge branch 'master' into takeSample
dorx 0a9b3e3
"reviewer comment addressed"
dorx ecab508
"fixed checkstyle violation
dorx eff89e2
addressed reviewer comments.
dorx 55518ed
added TODO for logging in rdd.py
dorx 64e445b
logwarnning as soon as it enters the while loop
dorx dc699f3
give back imports removed by accident in rdd.py
dorx 1481b01
washing test tubes and making coffee
dorx fb1452f
allowing num to be greater than count in all cases
dorx 48d954d
remove unused imports from RDDSuite
dorx 82dde31
update pyspark's takeSample
mengxr 3de882b
Merge pull request #2 from mengxr/SPARK-1939
dorx 444e750
edge cases
dorx 5b061ae
merge master
dorx File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
55 changes: 55 additions & 0 deletions
55
core/src/main/scala/org/apache/spark/util/random/SamplingUtils.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.util.random | ||
|
||
private[spark] object SamplingUtils { | ||
|
||
/** | ||
* Returns a sampling rate that guarantees a sample of size >= sampleSizeLowerBound 99.99% of | ||
* the time. | ||
* | ||
* How the sampling rate is determined: | ||
* Let p = num / total, where num is the sample size and total is the total number of | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The first sentence should be a brief description. |
||
* datapoints in the RDD. We're trying to compute q > p such that | ||
* - when sampling with replacement, we're drawing each datapoint with prob_i ~ Pois(q), | ||
* where we want to guarantee Pr[s < num] < 0.0001 for s = sum(prob_i for i from 0 to total), | ||
* i.e. the failure rate of not having a sufficiently large sample < 0.0001. | ||
* Setting q = p + 5 * sqrt(p/total) is sufficient to guarantee 0.9999 success rate for | ||
* num > 12, but we need a slightly larger q (9 empirically determined). | ||
* - when sampling without replacement, we're drawing each datapoint with prob_i | ||
* ~ Binomial(total, fraction) and our choice of q guarantees 1-delta, or 0.9999 success | ||
* rate, where success rate is defined the same as in sampling with replacement. | ||
* | ||
* @param sampleSizeLowerBound sample size | ||
* @param total size of RDD | ||
* @param withReplacement whether sampling with replacement | ||
* @return a sampling rate that guarantees sufficient sample size with 99.99% success rate | ||
*/ | ||
def computeFractionForSampleSize(sampleSizeLowerBound: Int, total: Long, | ||
withReplacement: Boolean): Double = { | ||
val fraction = sampleSizeLowerBound.toDouble / total | ||
if (withReplacement) { | ||
val numStDev = if (sampleSizeLowerBound < 12) 9 else 5 | ||
fraction + numStDev * math.sqrt(fraction / total) | ||
} else { | ||
val delta = 1e-4 | ||
val gamma = - math.log(delta) / total | ||
math.min(1, fraction + gamma + math.sqrt(gamma * gamma + 2 * gamma * fraction)) | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
46 changes: 46 additions & 0 deletions
46
core/src/test/scala/org/apache/spark/util/random/SamplingUtilsSuite.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.util.random | ||
|
||
import org.apache.commons.math3.distribution.{BinomialDistribution, PoissonDistribution} | ||
import org.scalatest.FunSuite | ||
|
||
class SamplingUtilsSuite extends FunSuite { | ||
|
||
test("computeFraction") { | ||
// test that the computed fraction guarantees enough data points | ||
// in the sample with a failure rate <= 0.0001 | ||
val n = 100000 | ||
|
||
for (s <- 1 to 15) { | ||
val frac = SamplingUtils.computeFractionForSampleSize(s, n, true) | ||
val poisson = new PoissonDistribution(frac * n) | ||
assert(poisson.inverseCumulativeProbability(0.0001) >= s, "Computed fraction is too low") | ||
} | ||
for (s <- List(20, 100, 1000)) { | ||
val frac = SamplingUtils.computeFractionForSampleSize(s, n, true) | ||
val poisson = new PoissonDistribution(frac * n) | ||
assert(poisson.inverseCumulativeProbability(0.0001) >= s, "Computed fraction is too low") | ||
} | ||
for (s <- List(1, 10, 100, 1000)) { | ||
val frac = SamplingUtils.computeFractionForSampleSize(s, n, false) | ||
val binomial = new BinomialDistribution(n, frac) | ||
assert(binomial.inverseCumulativeProbability(0.0001)*n >= s, "Computed fraction is too low") | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this be
<scope>test</scope>
if it's a test-only dependency?