Skip to content

Commit

Permalink
SPARK-1941: Update streamlib to 2.7.0 and use HyperLogLogPlus instead…
Browse files Browse the repository at this point in the history
… of HyperLogLog.

I also corrected some errors made in the previous HLL count approximate API, including relativeSD wasn't really a measure for error (and we used it to test error bounds in test results).

Author: Reynold Xin <[email protected]>

Closes #897 from rxin/hll and squashes the following commits:

4d83f41 [Reynold Xin] New error bound and non-randomness.
f154ea0 [Reynold Xin] Added a comment on the value bound for testing.
e367527 [Reynold Xin] One more round of code review.
41e649a [Reynold Xin] Update final mima list.
9e320c8 [Reynold Xin] Incorporate code review feedback.
e110d70 [Reynold Xin] Merge branch 'master' into hll
354deb8 [Reynold Xin] Added comment on the Mima exclude rules.
acaa524 [Reynold Xin] Added the right exclude rules in MimaExcludes.
6555bfe [Reynold Xin] Added a default method and re-arranged MimaExcludes.
1db1522 [Reynold Xin] Excluded util.SerializableHyperLogLog from MIMA check.
9221b27 [Reynold Xin] Merge branch 'master' into hll
88cfe77 [Reynold Xin] Updated documentation and restored the old incorrect API to maintain API compatibility.
1294be6 [Reynold Xin] Updated HLL+.
e7786cb [Reynold Xin] Merge branch 'master' into hll
c0ef0c2 [Reynold Xin] SPARK-1941: Update streamlib to 2.7.0 and use HyperLogLogPlus instead of HyperLogLog.
  • Loading branch information
rxin authored and mengxr committed Jun 4, 2014
1 parent 21e40ed commit 1faef14
Show file tree
Hide file tree
Showing 11 changed files with 189 additions and 135 deletions.
51 changes: 30 additions & 21 deletions core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -672,38 +672,47 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])

/**
* Return approximate number of distinct values for each key in this RDD.
* The accuracy of approximation can be controlled through the relative standard deviation
* (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
* more accurate counts but increase the memory footprint and vise versa. Uses the provided
* Partitioner to partition the output RDD.
*
* The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
* Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
* <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
*
* @param relativeSD Relative accuracy. Smaller values create counters that require more space.
* It must be greater than 0.000017.
* @param partitioner partitioner of the resulting RDD.
*/
def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): JavaRDD[(K, Long)] = {
rdd.countApproxDistinctByKey(relativeSD, partitioner)
def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): JavaPairRDD[K, Long] =
{
fromRDD(rdd.countApproxDistinctByKey(relativeSD, partitioner))
}

/**
* Return approximate number of distinct values for each key this RDD.
* The accuracy of approximation can be controlled through the relative standard deviation
* (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
* more accurate counts but increase the memory footprint and vise versa. The default value of
* relativeSD is 0.05. Hash-partitions the output RDD using the existing partitioner/parallelism
* level.
* Return approximate number of distinct values for each key in this RDD.
*
* The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
* Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
* <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
*
* @param relativeSD Relative accuracy. Smaller values create counters that require more space.
* It must be greater than 0.000017.
* @param numPartitions number of partitions of the resulting RDD.
*/
def countApproxDistinctByKey(relativeSD: Double = 0.05): JavaRDD[(K, Long)] = {
rdd.countApproxDistinctByKey(relativeSD)
def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaPairRDD[K, Long] = {
fromRDD(rdd.countApproxDistinctByKey(relativeSD, numPartitions))
}


/**
* Return approximate number of distinct values for each key in this RDD.
* The accuracy of approximation can be controlled through the relative standard deviation
* (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
* more accurate counts but increase the memory footprint and vise versa. HashPartitions the
* output RDD into numPartitions.
*
* The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
* Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
* <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
*
* @param relativeSD Relative accuracy. Smaller values create counters that require more space.
* It must be greater than 0.000017.
*/
def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaRDD[(K, Long)] = {
rdd.countApproxDistinctByKey(relativeSD, numPartitions)
def countApproxDistinctByKey(relativeSD: Double): JavaPairRDD[K, Long] = {
fromRDD(rdd.countApproxDistinctByKey(relativeSD))
}

/** Assign a name to this RDD */
Expand Down
12 changes: 7 additions & 5 deletions core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
Original file line number Diff line number Diff line change
Expand Up @@ -560,12 +560,14 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
* Return approximate number of distinct elements in the RDD.
*
* The accuracy of approximation can be controlled through the relative standard deviation
* (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
* more accurate counts but increase the memory footprint and vise versa. The default value of
* relativeSD is 0.05.
* The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
* Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
* <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
*
* @param relativeSD Relative accuracy. Smaller values create counters that require more space.
* It must be greater than 0.000017.
*/
def countApproxDistinct(relativeSD: Double = 0.05): Long = rdd.countApproxDistinct(relativeSD)
def countApproxDistinct(relativeSD: Double): Long = rdd.countApproxDistinct(relativeSD)

def name(): String = rdd.name

Expand Down
90 changes: 69 additions & 21 deletions core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

import com.clearspring.analytics.stream.cardinality.HyperLogLog
import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.io.SequenceFile.CompressionType
Expand All @@ -46,7 +46,6 @@ import org.apache.spark.Partitioner.defaultPartitioner
import org.apache.spark.SparkContext._
import org.apache.spark.partial.{BoundedDouble, PartialResult}
import org.apache.spark.serializer.Serializer
import org.apache.spark.util.SerializableHyperLogLog

/**
* Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
Expand Down Expand Up @@ -214,39 +213,88 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
}

/**
* :: Experimental ::
*
* Return approximate number of distinct values for each key in this RDD.
* The accuracy of approximation can be controlled through the relative standard deviation
* (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
* more accurate counts but increase the memory footprint and vice versa. Uses the provided
* Partitioner to partition the output RDD.
*
* The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
* Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
* <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
*
* The relative accuracy is approximately `1.054 / sqrt(2^p)`. Setting a nonzero `sp > p`
* would trigger sparse representation of registers, which may reduce the memory consumption
* and increase accuracy when the cardinality is small.
*
* @param p The precision value for the normal set.
* `p` must be a value between 4 and `sp` if `sp` is not zero (32 max).
* @param sp The precision value for the sparse set, between 0 and 32.
* If `sp` equals 0, the sparse representation is skipped.
* @param partitioner Partitioner to use for the resulting RDD.
*/
def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = {
val createHLL = (v: V) => new SerializableHyperLogLog(new HyperLogLog(relativeSD)).add(v)
val mergeValueHLL = (hll: SerializableHyperLogLog, v: V) => hll.add(v)
val mergeHLL = (h1: SerializableHyperLogLog, h2: SerializableHyperLogLog) => h1.merge(h2)
@Experimental
def countApproxDistinctByKey(p: Int, sp: Int, partitioner: Partitioner): RDD[(K, Long)] = {
require(p >= 4, s"p ($p) must be >= 4")
require(sp <= 32, s"sp ($sp) must be <= 32")
require(sp == 0 || p <= sp, s"p ($p) cannot be greater than sp ($sp)")
val createHLL = (v: V) => {
val hll = new HyperLogLogPlus(p, sp)
hll.offer(v)
hll
}
val mergeValueHLL = (hll: HyperLogLogPlus, v: V) => {
hll.offer(v)
hll
}
val mergeHLL = (h1: HyperLogLogPlus, h2: HyperLogLogPlus) => {
h1.addAll(h2)
h1
}

combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner).mapValues(_.cardinality())
}

combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner).mapValues(_.value.cardinality())
/**
* Return approximate number of distinct values for each key in this RDD.
*
* The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
* Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
* <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
*
* @param relativeSD Relative accuracy. Smaller values create counters that require more space.
* It must be greater than 0.000017.
* @param partitioner partitioner of the resulting RDD
*/
def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = {
require(relativeSD > 0.000017, s"accuracy ($relativeSD) must be greater than 0.000017")
val p = math.ceil(2.0 * math.log(1.054 / relativeSD) / math.log(2)).toInt
assert(p <= 32)
countApproxDistinctByKey(if (p < 4) 4 else p, 0, partitioner)
}

/**
* Return approximate number of distinct values for each key in this RDD.
* The accuracy of approximation can be controlled through the relative standard deviation
* (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
* more accurate counts but increase the memory footprint and vice versa. HashPartitions the
* output RDD into numPartitions.
*
* The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
* Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
* <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
*
* @param relativeSD Relative accuracy. Smaller values create counters that require more space.
* It must be greater than 0.000017.
* @param numPartitions number of partitions of the resulting RDD
*/
def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): RDD[(K, Long)] = {
countApproxDistinctByKey(relativeSD, new HashPartitioner(numPartitions))
}

/**
* Return approximate number of distinct values for each key this RDD.
* The accuracy of approximation can be controlled through the relative standard deviation
* (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
* more accurate counts but increase the memory footprint and vice versa. The default value of
* relativeSD is 0.05. Hash-partitions the output RDD using the existing partitioner/parallelism
* level.
* Return approximate number of distinct values for each key in this RDD.
*
* The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
* Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
* <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
*
* @param relativeSD Relative accuracy. Smaller values create counters that require more space.
* It must be greater than 0.000017.
*/
def countApproxDistinctByKey(relativeSD: Double = 0.05): RDD[(K, Long)] = {
countApproxDistinctByKey(relativeSD, defaultPartitioner(self))
Expand Down
53 changes: 43 additions & 10 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,11 @@ package org.apache.spark.rdd

import java.util.Random

import scala.collection.Map
import scala.collection.mutable
import scala.collection.{mutable, Map}
import scala.collection.mutable.ArrayBuffer
import scala.reflect.{classTag, ClassTag}

import com.clearspring.analytics.stream.cardinality.HyperLogLog
import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
import org.apache.hadoop.io.BytesWritable
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.io.NullWritable
Expand All @@ -41,7 +40,7 @@ import org.apache.spark.partial.CountEvaluator
import org.apache.spark.partial.GroupedCountEvaluator
import org.apache.spark.partial.PartialResult
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.{BoundedPriorityQueue, SerializableHyperLogLog, Utils}
import org.apache.spark.util.{BoundedPriorityQueue, Utils}
import org.apache.spark.util.collection.OpenHashMap
import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler}

Expand Down Expand Up @@ -921,15 +920,49 @@ abstract class RDD[T: ClassTag](
* :: Experimental ::
* Return approximate number of distinct elements in the RDD.
*
* The accuracy of approximation can be controlled through the relative standard deviation
* (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
* more accurate counts but increase the memory footprint and vise versa. The default value of
* relativeSD is 0.05.
* The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
* Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
* <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
*
* The relative accuracy is approximately `1.054 / sqrt(2^p)`. Setting a nonzero `sp > p`
* would trigger sparse representation of registers, which may reduce the memory consumption
* and increase accuracy when the cardinality is small.
*
* @param p The precision value for the normal set.
* `p` must be a value between 4 and `sp` if `sp` is not zero (32 max).
* @param sp The precision value for the sparse set, between 0 and 32.
* If `sp` equals 0, the sparse representation is skipped.
*/
@Experimental
def countApproxDistinct(p: Int, sp: Int): Long = {
require(p >= 4, s"p ($p) must be greater than 0")
require(sp <= 32, s"sp ($sp) cannot be greater than 32")
require(sp == 0 || p <= sp, s"p ($p) cannot be greater than sp ($sp)")
val zeroCounter = new HyperLogLogPlus(p, sp)
aggregate(zeroCounter)(
(hll: HyperLogLogPlus, v: T) => {
hll.offer(v)
hll
},
(h1: HyperLogLogPlus, h2: HyperLogLogPlus) => {
h1.addAll(h2)
h2
}).cardinality()
}

/**
* Return approximate number of distinct elements in the RDD.
*
* The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
* Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
* <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
*
* @param relativeSD Relative accuracy. Smaller values create counters that require more space.
* It must be greater than 0.000017.
*/
def countApproxDistinct(relativeSD: Double = 0.05): Long = {
val zeroCounter = new SerializableHyperLogLog(new HyperLogLog(relativeSD))
aggregate(zeroCounter)(_.add(_), _.merge(_)).value.cardinality()
val p = math.ceil(2.0 * math.log(1.054 / relativeSD) / math.log(2)).toInt
countApproxDistinct(p, 0)
}

/**
Expand Down

This file was deleted.

10 changes: 3 additions & 7 deletions core/src/test/java/org/apache/spark/JavaAPISuite.java
Original file line number Diff line number Diff line change
Expand Up @@ -1028,27 +1028,23 @@ public void countApproxDistinct() {
arrayData.add(i % size);
}
JavaRDD<Integer> simpleRdd = sc.parallelize(arrayData, 10);
Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.2) - size) / (size * 1.0)) < 0.2);
Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.05) - size) / (size * 1.0)) <= 0.05);
Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.01) - size) / (size * 1.0)) <= 0.01);
Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.05) - size) / (size * 1.0)) <= 0.1);
}

@Test
public void countApproxDistinctByKey() {
double relativeSD = 0.001;

List<Tuple2<Integer, Integer>> arrayData = new ArrayList<Tuple2<Integer, Integer>>();
for (int i = 10; i < 100; i++)
for (int j = 0; j < i; j++)
arrayData.add(new Tuple2<Integer, Integer>(i, j));

JavaPairRDD<Integer, Integer> pairRdd = sc.parallelizePairs(arrayData);
List<Tuple2<Integer, Object>> res = pairRdd.countApproxDistinctByKey(relativeSD).collect();
List<Tuple2<Integer, Object>> res = pairRdd.countApproxDistinctByKey(8, 0).collect();
for (Tuple2<Integer, Object> resItem : res) {
double count = (double)resItem._1();
Long resCount = (Long)resItem._2();
Double error = Math.abs((resCount - count) / count);
Assert.assertTrue(error < relativeSD);
Assert.assertTrue(error < 0.1);
}

}
Expand Down
Loading

0 comments on commit 1faef14

Please sign in to comment.