Commit f4c21f3

Added BernoulliBounds
dorx committed Jul 15, 2014
1 parent a10e68d commit f4c21f3
Showing 5 changed files with 152 additions and 107 deletions.
41 changes: 38 additions & 3 deletions core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -131,16 +131,35 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])

   /**
    * Return a subset of this RDD sampled by key (via stratified sampling).
+   *
+   * Create a sample of this RDD using variable sampling rates for different keys as specified by
+   * `fractions`, a key to sampling rate map.
+   *
+   * If `exact` is set to false, create the sample via simple random sampling, with one pass
+   * over the RDD, to produce a sample of size that's approximately equal to the sum of
+   * math.ceil(numItems * samplingRate) over all key values; otherwise, use additional passes over
+   * the RDD to create a sample size that's exactly equal to the sum of
+   * math.ceil(numItems * samplingRate) over all key values.
    */
   def sampleByKey(withReplacement: Boolean,
       fractions: JMap[K, Double],
       exact: Boolean,
       seed: Long): JavaPairRDD[K, V] =
     new JavaPairRDD[K, V](rdd.sampleByKey(withReplacement, fractions, exact, seed))
 
   /**
    * Return a subset of this RDD sampled by key (via stratified sampling).
+   *
+   * Create a sample of this RDD using variable sampling rates for different keys as specified by
+   * `fractions`, a key to sampling rate map.
+   *
+   * If `exact` is set to false, create the sample via simple random sampling, with one pass
+   * over the RDD, to produce a sample of size that's approximately equal to the sum of
+   * math.ceil(numItems * samplingRate) over all key values; otherwise, use additional passes over
+   * the RDD to create a sample size that's exactly equal to the sum of
+   * math.ceil(numItems * samplingRate) over all key values.
+   *
    * Use Utils.random.nextLong as the default seed for the random number generator
    */
   def sampleByKey(withReplacement: Boolean,
       fractions: JMap[K, Double],
@@ -149,17 +168,33 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])

   /**
    * Return a subset of this RDD sampled by key (via stratified sampling).
+   *
+   * Create a sample of this RDD using variable sampling rates for different keys as specified by
+   * `fractions`, a key to sampling rate map.
+   *
+   * Produce a sample of size that's approximately equal to the sum of
+   * math.ceil(numItems * samplingRate) over all key values with one pass over the RDD via
+   * simple random sampling.
    */
   def sampleByKey(withReplacement: Boolean,
       fractions: JMap[K, Double],
       seed: Long): JavaPairRDD[K, V] =
-    sampleByKey(withReplacement, fractions, true, seed)
+    sampleByKey(withReplacement, fractions, false, seed)
 
   /**
    * Return a subset of this RDD sampled by key (via stratified sampling).
+   *
+   * Create a sample of this RDD using variable sampling rates for different keys as specified by
+   * `fractions`, a key to sampling rate map.
+   *
+   * Produce a sample of size that's approximately equal to the sum of
+   * math.ceil(numItems * samplingRate) over all key values with one pass over the RDD via
+   * simple random sampling.
+   *
+   * Use Utils.random.nextLong as the default seed for the random number generator
    */
   def sampleByKey(withReplacement: Boolean, fractions: JMap[K, Double]): JavaPairRDD[K, V] =
-    sampleByKey(withReplacement, fractions, true, Utils.random.nextLong)
+    sampleByKey(withReplacement, fractions, false, Utils.random.nextLong)
 
   /**
    * Return the union of this RDD and another one. Any identical elements will appear multiple
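For illustration (not part of this commit), a minimal driver-side sketch of the Java API above, assuming a running SparkContext `sc`; the keys and sampling rates are made up:

import java.util.{Map => JMap}

import scala.collection.JavaConverters._

import org.apache.spark.api.java.JavaPairRDD

// Hypothetical pair data; assumes a running SparkContext `sc`.
val pairs = JavaPairRDD.fromRDD(sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3))))

// Per-key sampling rates: ~50% of the "a" stratum, all of "b".
val fractions: JMap[String, Double] = Map("a" -> 0.5, "b" -> 1.0).asJava

// One pass, per-key sample sizes approximately math.ceil(numItems * samplingRate).
val approx = pairs.sampleByKey(false, fractions)

// Additional passes, per-key sample sizes exactly math.ceil(numItems * samplingRate),
// with a fixed seed.
val exactSample = pairs.sampleByKey(false, fractions, true, 42L)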
30 changes: 17 additions & 13 deletions core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -43,7 +43,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.partial.{BoundedDouble, PartialResult}
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.util.Utils
-import org.apache.spark.util.random.StratifiedSampler
+import org.apache.spark.util.random.StratifiedSamplingUtils

 /**
  * Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
@@ -195,32 +195,36 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
   /**
    * Return a subset of this RDD sampled by key (via stratified sampling).
    *
-   * If exact set to true, we guarantee, with high probability, a sample size =
-   * math.ceil(fraction * S_i), where S_i is the size of the ith stratum (collection of entries
-   * that share the same key). When sampling without replacement, we need one additional pass over
-   * the RDD to guarantee sample size with a 99.99% confidence; when sampling with replacement, we
-   * need two additional passes.
+   * Create a sample of this RDD using variable sampling rates for different keys as specified by
+   * `fractions`, a key to sampling rate map.
+   *
+   * If `exact` is set to false, create the sample via simple random sampling, with one pass
+   * over the RDD, to produce a sample of size that's approximately equal to the sum of
+   * math.ceil(numItems * samplingRate) over all key values; otherwise, use
+   * additional passes over the RDD to create a sample size that's exactly equal to the sum of
+   * math.ceil(numItems * samplingRate) over all key values with a 99.99% confidence. When sampling
+   * without replacement, we need one additional pass over the RDD to guarantee sample size;
+   * when sampling with replacement, we need two additional passes.
    *
    * @param withReplacement whether to sample with or without replacement
    * @param fractions map of specific keys to sampling rates
    * @param seed seed for the random number generator
-   * @param exact whether sample size needs to be exactly math.ceil(fraction * size) per stratum
+   * @param exact whether sample size needs to be exactly math.ceil(fraction * size) per key
    * @return RDD containing the sampled subset
    */
   def sampleByKey(withReplacement: Boolean,
       fractions: Map[K, Double],
-      exact: Boolean = true,
+      exact: Boolean = false,
       seed: Long = Utils.random.nextLong): RDD[(K, V)] = {
 
-    require(fractions.forall {case(k, v) => v >= 0.0}, "Invalid sampling rates.")
+    require(fractions.values.forall(v => v >= 0.0), "Negative sampling rates.")
 
     val samplingFunc = if (withReplacement) {
-      val counts = if (exact) Some(this.countByKey()) else None
-      StratifiedSampler.getPoissonSamplingFunction(self, fractions, exact, counts, seed)
+      StratifiedSamplingUtils.getPoissonSamplingFunction(self, fractions, exact, seed)
     } else {
-      StratifiedSampler.getBernoulliSamplingFunction(self, fractions, exact, seed)
+      StratifiedSamplingUtils.getBernoulliSamplingFunction(self, fractions, exact, seed)
     }
-    self.mapPartitionsWithIndex(samplingFunc, preservesPartitioning=true)
+    self.mapPartitionsWithIndex(samplingFunc, preservesPartitioning = true)
   }

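The same workflow against the Scala API above, again as an illustrative sketch assuming a SparkContext `sc` (data and rates are made up):

import org.apache.spark.SparkContext._  // rddToPairRDDFunctions brings in sampleByKey

// Hypothetical skewed data: 900 "small" records and 100 "large" ones.
val data = sc.parallelize(1 to 1000).map(x => (if (x <= 900) "small" else "large", x))

val fractions = Map("small" -> 0.1, "large" -> 0.5)

// Default path (exact = false): one pass, per-key sample sizes only approximate.
val approx = data.sampleByKey(false, fractions)

// exact = true: one extra pass (two with replacement) so that, with 99.99%
// confidence, "small" yields exactly ceil(900 * 0.1) = 90 samples and
// "large" exactly ceil(100 * 0.5) = 50.
val exact = data.sampleByKey(false, fractions, exact = true)
exact.countByKey().foreach(println)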
core/src/main/scala/org/apache/spark/util/random/SamplingUtils.scala

@@ -52,10 +52,7 @@ private[spark] object SamplingUtils {
       val numStDev = if (sampleSizeLowerBound < 12) 9 else 5
       math.max(1e-10, fraction + numStDev * math.sqrt(fraction / total))
     } else {
-      val delta = 1e-4
-      val gamma = - math.log(delta) / total
-      math.min(1,
-        math.max(1e-10, fraction + gamma + math.sqrt(gamma * gamma + 2 * gamma * fraction)))
+      BernoulliBounds.getLowerBound(1e-4, total, fraction)
     }
   }
 }
@@ -125,3 +122,21 @@ private[spark] object PoissonBounds {
     ub
   }
 }


+private[spark] object BernoulliBounds {
+
+  val minSamplingRate = 1e-10
+
+  def getUpperBound(delta: Double, n: Long, fraction: Double): Double = {
+    val gamma = - math.log(delta) / n * (2.0 / 3.0)
+    math.max(minSamplingRate,
+      fraction + gamma - math.sqrt(gamma * gamma + 3 * gamma * fraction))
+  }
+
+  def getLowerBound(delta: Double, n: Long, fraction: Double): Double = {
+    val gamma = - math.log(delta) / n
+    math.min(1,
+      math.max(minSamplingRate, fraction + gamma + math.sqrt(gamma * gamma + 2 * gamma * fraction)))
+  }
+}
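Reading the new object: with gamma = -log(delta) / n, getLowerBound solves the Chernoff-style lower-tail condition (q - p)^2 = 2 * gamma * q for the smallest rate q >= p at which a Bernoulli(q) sample of n items still undershoots the target math.ceil(n * p) with probability at most delta; getUpperBound is the analogous overshoot threshold, with the 2/3 factor folded into its gamma. A standalone numeric check, duplicating the formulas above with illustrative values:

// Standalone sketch; mirrors BernoulliBounds above so it runs without Spark.
// Interpretation inferred from the call site in computeFractionForSampleSize,
// not stated in the commit itself.
object BernoulliBoundsCheck extends App {
  val delta = 1e-4
  val n = 1000000L
  val p = 0.01

  def lower(delta: Double, n: Long, p: Double): Double = {
    val gamma = -math.log(delta) / n
    math.min(1, math.max(1e-10, p + gamma + math.sqrt(gamma * gamma + 2 * gamma * p)))
  }

  def upper(delta: Double, n: Long, p: Double): Double = {
    val gamma = -math.log(delta) / n * (2.0 / 3.0)
    math.max(1e-10, p + gamma - math.sqrt(gamma * gamma + 3 * gamma * p))
  }

  // Here gamma ~= 9.21e-6, so both rates land within about 4.4e-4 of p:
  // lower ~= 0.010439 (sample this high to rarely undershoot n * p items),
  // upper ~= 0.009577 (stay this low to rarely overshoot).
  println(lower(delta, n, p))
  println(upper(delta, n, p))
}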