[SPARK-2082] stratified sampling in PairRDDFunctions that guarantees exact sample size

Implemented stratified sampling that guarantees exact sample size using ScaSRS, with two passes over the RDD for sampling without replacement and three passes for sampling with replacement.
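
A minimal usage sketch of the new API (hypothetical data and variable names; assumes a SparkContext `sc` and the pair-RDD implicits):

    val rdd = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3), ("b", 4), ("b", 5)))
    val fractions = Map("a" -> 0.5, "b" -> 0.4)
    // One pass; per-key sample sizes only approximate the targets.
    val approxSample = rdd.sampleByKey(false, fractions, exact = false, seed = 11L)
    // Extra pass(es); per-key sample sizes hit the targets exactly:
    // math.ceil(2 * 0.5) = 1 record for "a", math.ceil(3 * 0.4) = 2 records for "b".
    val exactSample = rdd.sampleByKey(false, fractions, exact = true, seed = 11L)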

Author: Doris Xin <[email protected]>
Author: Xiangrui Meng <[email protected]>

Closes apache#1025 from dorx/stratified and squashes the following commits:

245439e [Doris Xin] moved minSamplingRate to getUpperBound
eaf5771 [Doris Xin] bug fixes.
17a381b [Doris Xin] fixed a merge issue and a failed unit
ea7d27f [Doris Xin] merge master
b223529 [Xiangrui Meng] use approx bounds for poisson fix poisson mean for waitlisting add unit tests for Java
b3013a4 [Xiangrui Meng] move math3 back to test scope
eecee5f [Doris Xin] Merge branch 'master' into stratified
f4c21f3 [Doris Xin] Reviewer comments
a10e68d [Doris Xin] style fix
a2bf756 [Doris Xin] Merge branch 'master' into stratified
680b677 [Doris Xin] use mapPartitionWithIndex instead
9884a9f [Doris Xin] style fix
bbfb8c9 [Doris Xin] Merge branch 'master' into stratified
ee9d260 [Doris Xin] addressed reviewer comments
6b5b10b [Doris Xin] Merge branch 'master' into stratified
254e03c [Doris Xin] minor fixes and Java API.
4ad516b [Doris Xin] remove unused imports from PairRDDFunctions
bd9dc6e [Doris Xin] unit bug and style violation fixed
1fe1cff [Doris Xin] Changed fractionByKey to a map to enable arg check
944a10c [Doris Xin] [SPARK-2145] Add lower bound on sampling rate
0214a76 [Doris Xin] cleanUp
90d94c0 [Doris Xin] merge master
9e74ab5 [Doris Xin] Separated out most of the logic in sampleByKey
7327611 [Doris Xin] merge master
50581fc [Doris Xin] added a TODO for logging in python
46f6c8c [Doris Xin] fixed the NPE caused by closures being cleaned before being passed into the aggregate function
7e1a481 [Doris Xin] changed the permission on SamplingUtil
1d413ce [Doris Xin] fixed checkstyle issues
9ee94ee [Doris Xin] [SPARK-2082] stratified sampling in PairRDDFunctions that guarantees exact sample size
e3fd6a6 [Doris Xin] Merge branch 'master' into takeSample
7cab53a [Doris Xin] fixed import bug in rdd.py
ffea61a [Doris Xin] SPARK-1939: Refactor takeSample method in RDD
1441977 [Doris Xin] SPARK-1939 Refactor takeSample method in RDD to use ScaSRS
dorx authored and mengxr committed Jul 29, 2014
1 parent f0d880e commit dc96536
Showing 7 changed files with 656 additions and 16 deletions.
69 changes: 68 additions & 1 deletion core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -17,7 +17,7 @@

package org.apache.spark.api.java

import java.util.{Comparator, List => JList}
import java.util.{Comparator, List => JList, Map => JMap}
import java.lang.{Iterable => JIterable}

import scala.collection.JavaConversions._
@@ -129,6 +129,73 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
def sample(withReplacement: Boolean, fraction: Double, seed: Long): JavaPairRDD[K, V] =
new JavaPairRDD[K, V](rdd.sample(withReplacement, fraction, seed))

/**
* Return a subset of this RDD sampled by key (via stratified sampling).
*
* Create a sample of this RDD using variable sampling rates for different keys as specified by
* `fractions`, a map from keys to sampling rates.
*
* If `exact` is set to false, create the sample via simple random sampling, with one pass
* over the RDD, to produce a sample whose size is approximately equal to the sum of
* math.ceil(numItems * samplingRate) over all key values; otherwise, use additional passes over
* the RDD to create a sample whose size is exactly equal to the sum of
* math.ceil(numItems * samplingRate) over all key values.
*/
def sampleByKey(withReplacement: Boolean,
fractions: JMap[K, Double],
exact: Boolean,
seed: Long): JavaPairRDD[K, V] =
new JavaPairRDD[K, V](rdd.sampleByKey(withReplacement, fractions, exact, seed))

/**
* Return a subset of this RDD sampled by key (via stratified sampling).
*
* Create a sample of this RDD using variable sampling rates for different keys as specified by
* `fractions`, a map from keys to sampling rates.
*
* If `exact` is set to false, create the sample via simple random sampling, with one pass
* over the RDD, to produce a sample whose size is approximately equal to the sum of
* math.ceil(numItems * samplingRate) over all key values; otherwise, use additional passes over
* the RDD to create a sample whose size is exactly equal to the sum of
* math.ceil(numItems * samplingRate) over all key values.
*
* Use Utils.random.nextLong as the default seed for the random number generator
*/
def sampleByKey(withReplacement: Boolean,
fractions: JMap[K, Double],
exact: Boolean): JavaPairRDD[K, V] =
sampleByKey(withReplacement, fractions, exact, Utils.random.nextLong)

/**
* Return a subset of this RDD sampled by key (via stratified sampling).
*
* Create a sample of this RDD using variable sampling rates for different keys as specified by
* `fractions`, a map from keys to sampling rates.
*
* Produce a sample whose size is approximately equal to the sum of
* math.ceil(numItems * samplingRate) over all key values, using one pass over the RDD via
* simple random sampling.
*/
def sampleByKey(withReplacement: Boolean,
fractions: JMap[K, Double],
seed: Long): JavaPairRDD[K, V] =
sampleByKey(withReplacement, fractions, false, seed)

/**
* Return a subset of this RDD sampled by key (via stratified sampling).
*
* Create a sample of this RDD using variable sampling rates for different keys as specified by
* `fractions`, a map from keys to sampling rates.
*
* Produce a sample whose size is approximately equal to the sum of
* math.ceil(numItems * samplingRate) over all key values, using one pass over the RDD via
* simple random sampling.
*
* Use Utils.random.nextLong as the default seed for the random number generator
*/
def sampleByKey(withReplacement: Boolean, fractions: JMap[K, Double]): JavaPairRDD[K, V] =
sampleByKey(withReplacement, fractions, false, Utils.random.nextLong)
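
// A hedged usage note (not part of this diff): from Java, the overloads without an
// `exact` argument default to approximate one-pass sampling, and the overloads without a
// `seed` argument draw one from Utils.random. For example, given a JavaPairRDD `pairs`
// and a java.util.Map `fractions` (hypothetical names):
//   pairs.sampleByKey(false, fractions);            // approximate, random seed
//   pairs.sampleByKey(false, fractions, true, 11L); // exact, fixed seed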

/**
* Return the union of this RDD and another one. Any identical elements will appear multiple
* times (use `.distinct()` to eliminate them).
54 changes: 45 additions & 9 deletions core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -19,12 +19,10 @@ package org.apache.spark.rdd

import java.nio.ByteBuffer
import java.text.SimpleDateFormat
import java.util.Date
import java.util.{HashMap => JHashMap}
import java.util.{Date, HashMap => JHashMap}

import scala.collection.{Map, mutable}
import scala.collection.JavaConversions._
import scala.collection.Map
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

@@ -34,19 +32,19 @@ import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf, OutputFormat}
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, Job => NewAPIHadoopJob,
import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob, OutputFormat => NewOutputFormat,
RecordWriter => NewRecordWriter, SparkHadoopMapReduceUtil}
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}

import org.apache.spark._
import org.apache.spark.annotation.Experimental
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.SparkHadoopWriter
import org.apache.spark.Partitioner.defaultPartitioner
import org.apache.spark.SparkContext._
import org.apache.spark.annotation.Experimental
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.partial.{BoundedDouble, PartialResult}
import org.apache.spark.serializer.Serializer
import org.apache.spark.util.Utils
import org.apache.spark.util.collection.CompactBuffer
import org.apache.spark.util.random.StratifiedSamplingUtils

/**
* Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
@@ -195,6 +193,41 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
foldByKey(zeroValue, defaultPartitioner(self))(func)
}

/**
* Return a subset of this RDD sampled by key (via stratified sampling).
*
* Create a sample of this RDD using variable sampling rates for different keys as specified by
* `fractions`, a map from keys to sampling rates.
*
* If `exact` is set to false, create the sample via simple random sampling, with one pass
* over the RDD, to produce a sample whose size is approximately equal to the sum of
* math.ceil(numItems * samplingRate) over all key values; otherwise, use
* additional passes over the RDD to create a sample whose size is exactly equal to the sum of
* math.ceil(numItems * samplingRate) over all key values with 99.99% confidence. When sampling
* without replacement, we need one additional pass over the RDD to guarantee the sample size;
* when sampling with replacement, we need two additional passes.
*
* @param withReplacement whether to sample with or without replacement
* @param fractions map of specific keys to sampling rates
* @param seed seed for the random number generator
* @param exact whether sample size needs to be exactly math.ceil(fraction * size) per key
* @return RDD containing the sampled subset
*/
def sampleByKey(withReplacement: Boolean,
fractions: Map[K, Double],
exact: Boolean = false,
seed: Long = Utils.random.nextLong): RDD[(K, V)] = {

require(fractions.values.forall(v => v >= 0.0), "Negative sampling rates.")

val samplingFunc = if (withReplacement) {
StratifiedSamplingUtils.getPoissonSamplingFunction(self, fractions, exact, seed)
} else {
StratifiedSamplingUtils.getBernoulliSamplingFunction(self, fractions, exact, seed)
}
self.mapPartitionsWithIndex(samplingFunc, preservesPartitioning = true)
}
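
// A hedged illustration (not part of this diff): for an RDD with 100 records under key "a"
// and fractions = Map("a" -> 0.1), exact = true yields exactly math.ceil(100 * 0.1) = 10
// records for "a" (with 99.99% confidence), while exact = false yields approximately 10.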

/**
* Merge the values for each key using an associative reduce function. This will also perform
* the merging locally on each mapper before sending results to a reducer, similarly to a
@@ -531,6 +564,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])

/**
* Return the key-value pairs in this RDD to the master as a Map.
*
* Warning: this doesn't return a multimap (so if you have multiple values for the same key,
* only one value per key is preserved in the returned map).
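* For example, collecting an RDD of the two pairs (1, "a") and (1, "b") yields a map with
* a single entry for key 1.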
*/
def collectAsMap(): Map[K, V] = {
val data = self.collect()
core/src/main/scala/org/apache/spark/util/random/SamplingUtils.scala
@@ -81,21 +81,83 @@ private[spark] object SamplingUtils {
* ~ Binomial(total, fraction) and our choice of q guarantees 1-delta, or 0.9999 success
* rate, where success rate is defined the same as in sampling with replacement.
*
* The smallest sampling rate supported is 1e-10 (in order to avoid running into the limit of the
* RNG's resolution).
*
* @param sampleSizeLowerBound sample size
* @param total size of RDD
* @param withReplacement whether sampling with replacement
* @return a sampling rate that guarantees sufficient sample size with 99.99% success rate
*/
def computeFractionForSampleSize(sampleSizeLowerBound: Int, total: Long,
withReplacement: Boolean): Double = {
val fraction = sampleSizeLowerBound.toDouble / total
if (withReplacement) {
val numStDev = if (sampleSizeLowerBound < 12) 9 else 5
fraction + numStDev * math.sqrt(fraction / total)
PoissonBounds.getUpperBound(sampleSizeLowerBound) / total
} else {
val delta = 1e-4
val gamma = - math.log(delta) / total
math.min(1, fraction + gamma + math.sqrt(gamma * gamma + 2 * gamma * fraction))
val fraction = sampleSizeLowerBound.toDouble / total
BinomialBounds.getUpperBound(1e-4, total, fraction)
}
}
}
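
// A worked illustration (hypothetical numbers, not part of this diff): without replacement,
// sampleSizeLowerBound = 1000 and total = 1000000 give fraction = 0.001, and
// BinomialBounds.getUpperBound(1e-4, 1000000, 0.001) returns roughly 0.00115, i.e. the
// rate is inflated by about 15% so that the pre-sample is very unlikely to contain fewer
// than 1000 records.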

/**
* Utility functions that help us determine bounds on adjusted sampling rate to guarantee exact
* sample sizes with high confidence when sampling with replacement.
*/
private[spark] object PoissonBounds {

/**
* Returns a lambda such that Pr[X > s] is very small, where X ~ Pois(lambda).
*/
def getLowerBound(s: Double): Double = {
math.max(s - numStd(s) * math.sqrt(s), 1e-15)
}

/**
* Returns a lambda such that Pr[X < s] is very small, where X ~ Pois(lambda).
*
* @param s sample size
*/
def getUpperBound(s: Double): Double = {
math.max(s + numStd(s) * math.sqrt(s), 1e-10)
}

private def numStd(s: Double): Double = {
// TODO: Make it tighter.
if (s < 6.0) {
12.0
} else if (s < 16.0) {
9.0
} else {
6.0
}
}
}
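
// A hedged illustration (not part of this diff): for s = 100, numStd(100) = 6.0, so
// getUpperBound(100) = 100 + 6 * math.sqrt(100) = 160 and getLowerBound(100) = 40; a draw
// from Pois(160) is very unlikely to fall below 100, and a draw from Pois(40) is very
// unlikely to exceed 100.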

/**
* Utility functions that help us determine bounds on adjusted sampling rate to guarantee exact
* sample size with high confidence when sampling without replacement.
*/
private[spark] object BinomialBounds {

val minSamplingRate = 1e-10

/**
* Returns a threshold `p` such that if we conduct n Bernoulli trials with success rate = `p`,
* it is very unlikely to have more than `fraction * n` successes.
*/
def getLowerBound(delta: Double, n: Long, fraction: Double): Double = {
val gamma = - math.log(delta) / n * (2.0 / 3.0)
fraction + gamma - math.sqrt(gamma * gamma + 3 * gamma * fraction)
}

/**
* Returns a threshold `p` such that if we conduct n Bernoulli trials with success rate = `p`,
* it is very unlikely to have less than `fraction * n` successes.
*/
def getUpperBound(delta: Double, n: Long, fraction: Double): Double = {
val gamma = - math.log(delta) / n
math.min(1,
math.max(minSamplingRate, fraction + gamma + math.sqrt(gamma * gamma + 2 * gamma * fraction)))
}
}
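
// A worked illustration (hypothetical numbers, not part of this diff): with delta = 1e-4,
// n = 1000000, and fraction = 0.01, gamma = -math.log(1e-4) / n ≈ 9.21e-6, so getUpperBound
// ≈ 0.01 + 9.21e-6 + math.sqrt(gamma * gamma + 2 * gamma * 0.01) ≈ 0.01044; accepting each
// element with probability 0.01044 makes fewer than 0.01 * n successes very unlikely.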

(The diffs for the remaining four changed files are not shown here.)
