-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SPARK-1941: Update streamlib to 2.7.0 and use HyperLogLogPlus instead of HyperLogLog. #897
Changes from 3 commits
c0ef0c2
e7786cb
1294be6
88cfe77
9221b27
1db1522
6555bfe
acaa524
354deb8
e110d70
9e320c8
41e649a
e367527
f154ea0
4d83f41
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -672,38 +672,102 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) | |
|
||
/** | ||
* Return approximate number of distinct values for each key in this RDD. | ||
* | ||
* The accuracy of approximation can be controlled through the relative standard deviation | ||
* (relativeSD) parameter, which also controls the amount of memory used. Lower values result in | ||
* more accurate counts but increase the memory footprint and vice versa. Uses the provided | ||
* Partitioner to partition the output RDD. | ||
* | ||
* @param p The precision value for the normal set. | ||
* <code>p</code> must be a value between 4 and <code>sp</code> (32 max). | ||
* @param sp The precision value for the sparse set, between 0 and 32. | ||
* If <code>sp</code> equals 0, the sparse representation is skipped. | ||
* @param partitioner Partitioner to use for the resulting RDD. | ||
*/ | ||
def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): JavaRDD[(K, Long)] = { | ||
rdd.countApproxDistinctByKey(relativeSD, partitioner) | ||
def countApproxDistinctByKey(p: Int, sp: Int, partitioner: Partitioner): JavaPairRDD[K, Long] = { | ||
fromRDD(rdd.countApproxDistinctByKey(p, sp, partitioner)) | ||
} | ||
|
||
/** | ||
* Return approximate number of distinct values for each key this RDD. | ||
* Return approximate number of distinct values for each key in this RDD. | ||
* | ||
* The accuracy of approximation can be controlled through the relative standard deviation | ||
* (relativeSD) parameter, which also controls the amount of memory used. Lower values result in | ||
* more accurate counts but increase the memory footprint and vice versa. The default value of | ||
* relativeSD is 0.05. Hash-partitions the output RDD using the existing partitioner/parallelism | ||
* level. | ||
* more accurate counts but increase the memory footprint and vice versa. Uses the provided | ||
* Partitioner to partition the output RDD. | ||
* | ||
* @param p The precision value for the normal set. | ||
* <code>p</code> must be a value between 4 and <code>sp</code> (32 max). | ||
* @param sp The precision value for the sparse set, between 0 and 32. | ||
* If <code>sp</code> equals 0, the sparse representation is skipped. | ||
* @param numPartitions The number of partitions in the resulting RDD. | ||
*/ | ||
def countApproxDistinctByKey(relativeSD: Double = 0.05): JavaRDD[(K, Long)] = { | ||
rdd.countApproxDistinctByKey(relativeSD) | ||
def countApproxDistinctByKey(p: Int, sp: Int, numPartitions: Int): JavaPairRDD[K, Long] = { | ||
fromRDD(rdd.countApproxDistinctByKey(p, sp, numPartitions)) | ||
} | ||
|
||
|
||
/** | ||
* Return approximate number of distinct values for each key in this RDD. | ||
* | ||
* The accuracy of approximation can be controlled through the relative standard deviation | ||
* (relativeSD) parameter, which also controls the amount of memory used. Lower values result in | ||
* more accurate counts but increase the memory footprint and vice versa. Uses the provided | ||
* Partitioner to partition the output RDD. | ||
* | ||
* @param p The precision value for the normal set. | ||
* <code>p</code> must be a value between 4 and <code>sp</code> (32 max). | ||
* @param sp The precision value for the sparse set, between 0 and 32. | ||
* If <code>sp</code> equals 0, the sparse representation is skipped. | ||
*/ | ||
def countApproxDistinctByKey(p: Int, sp: Int): JavaPairRDD[K, Long] = { | ||
fromRDD(rdd.countApproxDistinctByKey(p, sp)) | ||
} | ||
|
||
/** | ||
* Return approximate number of distinct values for each key in this RDD. This is deprecated. | ||
* Use the variant with <code>p</code> and <code>sp</code> parameters instead. | ||
* | ||
* The accuracy of approximation can be controlled through the relative standard deviation | ||
* (relativeSD) parameter, which also controls the amount of memory used. Lower values result in | ||
* more accurate counts but increase the memory footprint and vice versa. HashPartitions the | ||
* output RDD into numPartitions. | ||
* more accurate counts but increase the memory footprint and vice versa. Uses the provided | ||
* Partitioner to partition the output RDD. | ||
*/ | ||
@Deprecated | ||
def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): JavaPairRDD[K, Long] = | ||
{ | ||
fromRDD(rdd.countApproxDistinctByKey(relativeSD, partitioner)) | ||
} | ||
|
||
/** | ||
* Return approximate number of distinct values for each key in this RDD. This is deprecated. | ||
* Use the variant with <code>p</code> and <code>sp</code> parameters instead. | ||
* | ||
* The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: | ||
* Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available at | ||
* [[http://research.google.com/pubs/pub40671.html]]. | ||
* | ||
* @param relativeSD The relative standard deviation for the counter. | ||
* Smaller values create counters that require more space. | ||
*/ | ||
@Deprecated | ||
def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaPairRDD[K, Long] = { | ||
fromRDD(rdd.countApproxDistinctByKey(relativeSD, numPartitions)) | ||
} | ||
|
||
/** | ||
* Return approximate number of distinct values for each key in this RDD. This is deprecated. | ||
* Use the variant with <code>p</code> and <code>sp</code> parameters instead. | ||
* | ||
* The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: | ||
* Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available at | ||
* [[http://research.google.com/pubs/pub40671.html]]. | ||
* | ||
* @param relativeSD The relative standard deviation for the counter. | ||
* Smaller values create counters that require more space. | ||
*/ | ||
def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaRDD[(K, Long)] = { | ||
rdd.countApproxDistinctByKey(relativeSD, numPartitions) | ||
@Deprecated | ||
def countApproxDistinctByKey(relativeSD: Double): JavaPairRDD[K, Long] = { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Note that I changed the return type from JavaRDD[(K, Long)] to JavaPairRDD[K, Long], because that is what it should've been. However, in order to maintain complete API stability, I can change it back and just deprecate the old methods. The new methods certainly should return JavaPairRDD. |
||
fromRDD(rdd.countApproxDistinctByKey(relativeSD)) | ||
} | ||
|
||
/** Assign a name to this RDD */ | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -28,7 +28,7 @@ import scala.collection.mutable | |
import scala.collection.mutable.ArrayBuffer | ||
import scala.reflect.ClassTag | ||
|
||
import com.clearspring.analytics.stream.cardinality.HyperLogLog | ||
import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus | ||
import org.apache.hadoop.conf.{Configurable, Configuration} | ||
import org.apache.hadoop.fs.FileSystem | ||
import org.apache.hadoop.io.SequenceFile.CompressionType | ||
|
@@ -46,7 +46,6 @@ import org.apache.spark.Partitioner.defaultPartitioner | |
import org.apache.spark.SparkContext._ | ||
import org.apache.spark.partial.{BoundedDouble, PartialResult} | ||
import org.apache.spark.serializer.Serializer | ||
import org.apache.spark.util.SerializableHyperLogLog | ||
|
||
/** | ||
* Extra functions available on RDDs of (key, value) pairs through an implicit conversion. | ||
|
@@ -214,18 +213,94 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) | |
} | ||
|
||
/** | ||
* :: Experimental :: | ||
* | ||
* Return approximate number of distinct values for each key in this RDD. | ||
* The accuracy of approximation can be controlled through the relative standard deviation | ||
* (relativeSD) parameter, which also controls the amount of memory used. Lower values result in | ||
* more accurate counts but increase the memory footprint and vice versa. Uses the provided | ||
* Partitioner to partition the output RDD. | ||
* | ||
* The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: | ||
* Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available at | ||
* [[http://research.google.com/pubs/pub40671.html]]. | ||
* | ||
* @param p The precision value for the normal set. | ||
* <code>p</code> must be a value between 4 and <code>sp</code> (32 max). | ||
* @param sp The precision value for the sparse set, between 0 and 32. | ||
* If <code>sp</code> equals 0, the sparse representation is skipped. | ||
* @param partitioner Partitioner to use for the resulting RDD. | ||
*/ | ||
def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = { | ||
val createHLL = (v: V) => new SerializableHyperLogLog(new HyperLogLog(relativeSD)).add(v) | ||
val mergeValueHLL = (hll: SerializableHyperLogLog, v: V) => hll.add(v) | ||
val mergeHLL = (h1: SerializableHyperLogLog, h2: SerializableHyperLogLog) => h1.merge(h2) | ||
@Experimental | ||
def countApproxDistinctByKey(p: Int, sp: Int, partitioner: Partitioner): RDD[(K, Long)] = { | ||
val createHLL = (v: V) => { | ||
val hll = new HyperLogLogPlus(p, sp) | ||
hll.offer(v) | ||
hll | ||
} | ||
val mergeValueHLL = (hll: HyperLogLogPlus, v: V) => { | ||
hll.offer(v) | ||
hll | ||
} | ||
val mergeHLL = (h1: HyperLogLogPlus, h2: HyperLogLogPlus) => { | ||
h1.addAll(h2) | ||
h1 | ||
} | ||
|
||
combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner).mapValues(_.cardinality()) | ||
} | ||
|
||
/** | ||
* :: Experimental :: | ||
* | ||
* Return approximate number of distinct values for each key in this RDD. | ||
* | ||
* The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: | ||
* Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available at | ||
* [[http://research.google.com/pubs/pub40671.html]]. | ||
* | ||
* @param p The precision value for the normal set. | ||
* <code>p</code> must be a value between 4 and <code>sp</code> (32 max). | ||
* @param sp The precision value for the sparse set, between 0 and 32. | ||
* If <code>sp</code> equals 0, the sparse representation is skipped. | ||
* @param numPartitions Number of partitions in the resulting RDD. | ||
*/ | ||
@Experimental | ||
def countApproxDistinctByKey(p: Int, sp: Int, numPartitions: Int): RDD[(K, Long)] = { | ||
countApproxDistinctByKey(p, sp, new HashPartitioner(numPartitions)) | ||
} | ||
|
||
/** | ||
* :: Experimental :: | ||
* | ||
* Return approximate number of distinct values for each key in this RDD. | ||
* | ||
* The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: | ||
* Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available at | ||
* [[http://research.google.com/pubs/pub40671.html]]. | ||
* | ||
* @param p The precision value for the normal set. | ||
* <code>p</code> must be a value between 4 and <code>sp</code> (32 max). | ||
* @param sp The precision value for the sparse set, between 0 and 32. | ||
* If <code>sp</code> equals 0, the sparse representation is skipped. | ||
*/ | ||
@Experimental | ||
def countApproxDistinctByKey(p: Int, sp: Int): RDD[(K, Long)] = { | ||
countApproxDistinctByKey(p, sp, defaultPartitioner(self)) | ||
} | ||
|
||
combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner).mapValues(_.value.cardinality()) | ||
/** | ||
* Return approximate number of distinct values for each key in this RDD. | ||
* | ||
* The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: | ||
* Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available at | ||
* [[http://research.google.com/pubs/pub40671.html]]. | ||
* | ||
* @param relativeSD The relative standard deviation for the counter. | ||
* Smaller values create counters that require more space. | ||
* @param partitioner Partitioner to use for the resulting RDD | ||
*/ | ||
@deprecated("Use countApproxDistinctByKey with parameter p and sp", "1.0.1") | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Shall we put the mapping I mentioned below in the deprecation message? |
||
def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = { | ||
// See stream-lib's HyperLogLog implementation on the conversion from relativeSD to p. | ||
val p = (math.log((1.106 / relativeSD) * (1.106 / relativeSD)) / math.log(2)).toInt | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I checked the paper. I think the correct mapping should be
|
||
countApproxDistinctByKey(p, 0, partitioner) | ||
} | ||
|
||
/** | ||
|
@@ -235,7 +310,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) | |
* more accurate counts but increase the memory footprint and vice versa. HashPartitions the | ||
* output RDD into numPartitions. | ||
* | ||
* The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: | ||
* Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available at | ||
* [[http://research.google.com/pubs/pub40671.html]]. | ||
*/ | ||
@deprecated("Use countApproxDistinctByKey with parameter p and sp", "1.0.1") | ||
def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): RDD[(K, Long)] = { | ||
countApproxDistinctByKey(relativeSD, new HashPartitioner(numPartitions)) | ||
} | ||
|
@@ -247,7 +326,12 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) | |
* more accurate counts but increase the memory footprint and vice versa. The default value of | ||
* relativeSD is 0.05. Hash-partitions the output RDD using the existing partitioner/parallelism | ||
* level. | ||
* | ||
* The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: | ||
* Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available at | ||
* [[http://research.google.com/pubs/pub40671.html]]. | ||
*/ | ||
@deprecated("Use countApproxDistinctByKey with parameter p and sp", "1.0.1") | ||
def countApproxDistinctByKey(relativeSD: Double = 0.05): RDD[(K, Long)] = { | ||
countApproxDistinctByKey(relativeSD, defaultPartitioner(self)) | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,7 +24,7 @@ import scala.collection.mutable | |
import scala.collection.mutable.ArrayBuffer | ||
import scala.reflect.{classTag, ClassTag} | ||
|
||
import com.clearspring.analytics.stream.cardinality.HyperLogLog | ||
import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus | ||
import org.apache.hadoop.io.BytesWritable | ||
import org.apache.hadoop.io.compress.CompressionCodec | ||
import org.apache.hadoop.io.NullWritable | ||
|
@@ -41,7 +41,7 @@ import org.apache.spark.partial.CountEvaluator | |
import org.apache.spark.partial.GroupedCountEvaluator | ||
import org.apache.spark.partial.PartialResult | ||
import org.apache.spark.storage.StorageLevel | ||
import org.apache.spark.util.{BoundedPriorityQueue, SerializableHyperLogLog, Utils} | ||
import org.apache.spark.util.{BoundedPriorityQueue, Utils} | ||
import org.apache.spark.util.collection.OpenHashMap | ||
import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler} | ||
|
||
|
@@ -921,15 +921,44 @@ abstract class RDD[T: ClassTag]( | |
* :: Experimental :: | ||
* Return approximate number of distinct elements in the RDD. | ||
* | ||
* The accuracy of approximation can be controlled through the relative standard deviation | ||
* (relativeSD) parameter, which also controls the amount of memory used. Lower values result in | ||
* more accurate counts but increase the memory footprint and vice versa. The default value of | ||
* relativeSD is 0.05. | ||
* The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: | ||
* Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available at | ||
* [[http://research.google.com/pubs/pub40671.html]]. | ||
* | ||
* @param p The precision value for the normal set. | ||
* <code>p</code> must be a value between 4 and <code>sp</code>. | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Let us put the following into the doc. The relative standard error is approximately |
||
* @param sp The precision value for the sparse set, between 0 and 32. | ||
* If <code>sp</code> equals 0, the sparse representation is skipped. | ||
*/ | ||
@Experimental | ||
def countApproxDistinct(p: Int, sp: Int): Long = { | ||
require(p >= 4, s"p ($p) must be greater than 0") | ||
require(sp <= 32, s"sp ($sp) cannot be greater than 32") | ||
require(sp == 0 || p <= sp, s"p ($p) cannot be greater than sp ($sp)") | ||
val zeroCounter = new HyperLogLogPlus(p, sp) | ||
aggregate(zeroCounter)( | ||
(hll: HyperLogLogPlus, v: T) => { | ||
hll.offer(v) | ||
hll | ||
}, | ||
(h1: HyperLogLogPlus, h2: HyperLogLogPlus) => { | ||
h1.addAll(h2) | ||
h2 | ||
}).cardinality() | ||
} | ||
|
||
/** | ||
* Return approximate number of distinct elements in the RDD. | ||
* | ||
* The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: | ||
* Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available at | ||
* [[http://research.google.com/pubs/pub40671.html]]. | ||
*/ | ||
@deprecated("Use countApproxDistinct with parameter p and sp", "1.0.1") | ||
def countApproxDistinct(relativeSD: Double = 0.05): Long = { | ||
val zeroCounter = new SerializableHyperLogLog(new HyperLogLog(relativeSD)) | ||
aggregate(zeroCounter)(_.add(_), _.merge(_)).value.cardinality() | ||
// See stream-lib's HyperLogLog implementation on the conversion from relativeSD to p. | ||
val p = (math.log((1.106 / relativeSD) * (1.106 / relativeSD)) / math.log(2)).toInt | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. See the formula I mentioned above. |
||
countApproxDistinct(p, 0) | ||
} | ||
|
||
/** | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to provide some migration tips. Is there a mapping from
`relativeSD` to precision numbers? Actually, `relativeSD`
is much easier for users to understand.