Updated HLL+.
rxin committed May 31, 2014
1 parent e7786cb commit 1294be6
Showing 7 changed files with 214 additions and 43 deletions.
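
At a glance: the commit replaces the relativeSD-based approximate-counting API with explicit HyperLogLog++ precision parameters (p for the normal set, sp for the sparse set) and deprecates the old signatures. A minimal before/after sketch, assuming a live SparkContext sc (the data here is illustrative, not from the commit):

val rdd = sc.parallelize(1 to 100000).map(_ % 100)  // exactly 100 distinct values

// New API: explicit precisions. p = 14 gives roughly 1% expected relative error;
// sp = 0 skips the sparse representation entirely.
val approx = rdd.countApproxDistinct(14, 0)

// Old API, deprecated by this commit: accuracy as a relative standard deviation.
val approxOld = rdd.countApproxDistinct(0.01)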
90 changes: 77 additions & 13 deletions core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -672,38 +672,102 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])

/**
* Return approximate number of distinct values for each key in this RDD.
*
* The accuracy of approximation can be controlled through the relative standard deviation
* (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
* more accurate counts but increase the memory footprint and vice versa. Uses the provided
* Partitioner to partition the output RDD.
*
* @param p The precision value for the normal set.
* <code>p</code> must be a value between 4 and <code>sp</code> (32 max).
* @param sp The precision value for the sparse set, between 0 and 32.
* If <code>sp</code> equals 0, the sparse representation is skipped.
* @param partitioner Partitioner to use for the resulting RDD.
*/
def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): JavaRDD[(K, Long)] = {
rdd.countApproxDistinctByKey(relativeSD, partitioner)
def countApproxDistinctByKey(p: Int, sp: Int, partitioner: Partitioner): JavaPairRDD[K, Long] = {
fromRDD(rdd.countApproxDistinctByKey(p, sp, partitioner))
}

/**
* Return approximate number of distinct values for each key this RDD.
* Return approximate number of distinct values for each key in this RDD.
*
* The accuracy of approximation can be controlled through the relative standard deviation
* (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
* more accurate counts but increase the memory footprint and vice versa. The default value of
* relativeSD is 0.05. Hash-partitions the output RDD using the existing partitioner/parallelism
* level.
* more accurate counts but increase the memory footprint and vice versa. Hash-partitions the
* output RDD into numPartitions.
*
* @param p The precision value for the normal set.
* <code>p</code> must be a value between 4 and <code>sp</code> (32 max).
* @param sp The precision value for the sparse set, between 0 and 32.
* If <code>sp</code> equals 0, the sparse representation is skipped.
* @param numPartitions The number of partitions in the resulting RDD.
*/
def countApproxDistinctByKey(relativeSD: Double = 0.05): JavaRDD[(K, Long)] = {
rdd.countApproxDistinctByKey(relativeSD)
def countApproxDistinctByKey(p: Int, sp: Int, numPartitions: Int): JavaPairRDD[K, Long] = {
fromRDD(rdd.countApproxDistinctByKey(p, sp, numPartitions))
}


/**
* Return approximate number of distinct values for each key in this RDD.
*
* The accuracy of approximation can be controlled through the relative standard deviation
* (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
* more accurate counts but increase the memory footprint and vice versa. Hash-partitions the
* output RDD using the existing partitioner/parallelism level.
*
* @param p The precision value for the normal set.
* <code>p</code> must be a value between 4 and <code>sp</code> (32 max).
* @param sp The precision value for the sparse set, between 0 and 32.
* If <code>sp</code> equals 0, the sparse representation is skipped.
*/
def countApproxDistinctByKey(p: Int, sp: Int): JavaPairRDD[K, Long] = {
fromRDD(rdd.countApproxDistinctByKey(p, sp))
}

/**
* Return approximate number of distinct values for each key in this RDD. This is deprecated.
* Use the variant with <code>p</code> and <code>sp</code> parameters instead.
*
* The accuracy of approximation can be controlled through the relative standard deviation
* (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
* more accurate counts but increase the memory footprint and vice versa. HashPartitions the
* output RDD into numPartitions.
* more accurate counts but increase the memory footprint and vice versa. Uses the provided
* Partitioner to partition the output RDD.
*/
@Deprecated
def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): JavaPairRDD[K, Long] =
{
fromRDD(rdd.countApproxDistinctByKey(relativeSD, partitioner))
}

/**
* Return approximate number of distinct values for each key in this RDD. This is deprecated.
* Use the variant with <code>p</code> and <code>sp</code> parameters instead.
*
* The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
* Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available at
* [[http://research.google.com/pubs/pub40671.html]].
*
* @param relativeSD The relative standard deviation for the counter.
* Smaller values create counters that require more space.
*/
@Deprecated
def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaPairRDD[K, Long] = {
fromRDD(rdd.countApproxDistinctByKey(relativeSD, numPartitions))
}

/**
* Return approximate number of distinct values for each key in this RDD. This is deprecated.
* Use the variant with <code>p</code> and <code>sp</code> parameters instead.
*
* The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
* Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available at
* [[http://research.google.com/pubs/pub40671.html]].
*
* @param relativeSD The relative standard deviation for the counter.
* Smaller values create counters that require more space.
*/
def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaRDD[(K, Long)] = {
rdd.countApproxDistinctByKey(relativeSD, numPartitions)
@Deprecated
def countApproxDistinctByKey(relativeSD: Double): JavaPairRDD[K, Long] = {
fromRDD(rdd.countApproxDistinctByKey(relativeSD))
}

/** Assign a name to this RDD */
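For the pair-RDD variants above, a minimal per-key sketch against the Scala API (assuming a live SparkContext sc; key i carries exactly i distinct values, so the exact count for key i is i):

import org.apache.spark.SparkContext._  // Spark 1.0-era implicits for pair-RDD functions

val pairs = sc.parallelize(for (i <- 1 to 100; j <- 1 to i) yield (i, j))

// p = 8, sp = 0: about 7% expected relative error, no sparse representation.
val counts: Array[(Int, Long)] = pairs.countApproxDistinctByKey(8, 0).collect()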
27 changes: 26 additions & 1 deletion core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -564,8 +564,33 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
* more accurate counts but increase the memory footprint and vice versa. The default value of
* relativeSD is 0.05.
*
* The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
* Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available at
* [[http://research.google.com/pubs/pub40671.html]].
*
* @param p The precision value for the normal set.
* <code>p</code> must be a value between 4 and <code>sp</code>.
* @param sp The precision value for the sparse set, between 0 and 32.
* If <code>sp</code> equals 0, the sparse representation is skipped.
*/
def countApproxDistinct(p: Int, sp: Int): Long = rdd.countApproxDistinct(p, sp)

/**
* Return approximate number of distinct elements in the RDD. This is deprecated. Use the
* variant with <code>p</code> and <code>sp</code> parameters instead.
*
* The accuracy of approximation can be controlled through the relative standard deviation
* (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
* more accurate counts but increase the memory footprint and vice versa. The default value of
* relativeSD is 0.05.
*
* The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
* Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available at
* [[http://research.google.com/pubs/pub40671.html]].
*/
def countApproxDistinct(relativeSD: Double = 0.05): Long = rdd.countApproxDistinct(relativeSD)
@Deprecated
def countApproxDistinct(relativeSD: Double): Long = rdd.countApproxDistinct(relativeSD)

def name(): String = rdd.name

84 changes: 77 additions & 7 deletions core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -213,20 +213,24 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
}

/**
* :: Experimental ::
*
* Return approximate number of distinct values for each key in this RDD.
* The accuracy of approximation can be controlled through the relative standard deviation
* (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
* more accurate counts but increase the memory footprint and vice versa. Uses the provided
* [[Partitioner]] to partition the output RDD.
*
* The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
* Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available at
* [[http://research.google.com/pubs/pub40671.html]].
*
* @param p The precision value for the normal set.
* <code>p</code> must be a value between 4 and <code>sp</code> (32 max).
* @param sp The precision value for the sparse set, between 0 and 32.
* If <code>sp</code> equals 0, the sparse representation is skipped.
* @param partitioner Partitioner to use for the resulting RDD.
*/
def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = {
val precision = (math.log((1.106 / relativeSD) * (1.106 / relativeSD)) / math.log(2)).toInt
@Experimental
def countApproxDistinctByKey(p: Int, sp: Int, partitioner: Partitioner): RDD[(K, Long)] = {
val createHLL = (v: V) => {
val hll = new HyperLogLogPlus(precision)
val hll = new HyperLogLogPlus(p, sp)
hll.offer(v)
hll
}
@@ -242,14 +246,75 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner).mapValues(_.cardinality())
}

/**
* :: Experimental ::
*
* Return approximate number of distinct values for each key in this RDD.
*
* The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
* Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available at
* [[http://research.google.com/pubs/pub40671.html]].
*
* @param p The precision value for the normal set.
* <code>p</code> must be a value between 4 and <code>sp</code> (32 max).
* @param sp The precision value for the sparse set, between 0 and 32.
* If <code>sp</code> equals 0, the sparse representation is skipped.
* @param numPartitions Number of partitions in the resulting RDD.
*/
@Experimental
def countApproxDistinctByKey(p: Int, sp: Int, numPartitions: Int): RDD[(K, Long)] = {
countApproxDistinctByKey(p, sp, new HashPartitioner(numPartitions))
}

/**
* :: Experimental ::
*
* Return approximate number of distinct values for each key in this RDD.
*
* The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
* Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available at
* [[http://research.google.com/pubs/pub40671.html]].
*
* @param p The precision value for the normal set.
* <code>p</code> must be a value between 4 and <code>sp</code> (32 max).
* @param sp The precision value for the sparse set, between 0 and 32.
* If <code>sp</code> equals 0, the sparse representation is skipped.
*/
@Experimental
def countApproxDistinctByKey(p: Int, sp: Int): RDD[(K, Long)] = {
countApproxDistinctByKey(p, sp, defaultPartitioner(self))
}

/**
* Return approximate number of distinct values for each key in this RDD.
*
* The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
* Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available at
* [[http://research.google.com/pubs/pub40671.html]].
*
* @param relativeSD The relative standard deviation for the counter.
* Smaller values create counters that require more space.
* @param partitioner Partitioner to use for the resulting RDD
*/
@deprecated("Use countApproxDistinctByKey with parameter p and sp", "1.0.1")
def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = {
// See stream-lib's HyperLogLog implementation on the conversion from relativeSD to p.
val p = (math.log((1.106 / relativeSD) * (1.106 / relativeSD)) / math.log(2)).toInt
countApproxDistinctByKey(p, 0, partitioner)
}
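
This conversion is the whole migration story for relativeSD callers. A standalone sketch of the same arithmetic (the 1.106 constant is copied from this diff, which attributes it to stream-lib's HyperLogLog implementation); note the toInt truncation, which means the realized error can be somewhat worse than the requested relativeSD:

// p = floor(log2((1.106 / relativeSD)^2)), mirroring the deprecated wrapper above.
def relativeSDToP(relativeSD: Double): Int =
  (math.log((1.106 / relativeSD) * (1.106 / relativeSD)) / math.log(2)).toInt

relativeSDToP(0.05)  // = 8
relativeSDToP(0.01)  // = 13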

/**
* Return approximate number of distinct values for each key in this RDD.
* The accuracy of approximation can be controlled through the relative standard deviation
* (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
* more accurate counts but increase the memory footprint and vice versa. HashPartitions the
* output RDD into numPartitions.
*
* The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
* Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available at
* [[http://research.google.com/pubs/pub40671.html]].
*/
@deprecated("Use countApproxDistinctByKey with parameter p and sp", "1.0.1")
def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): RDD[(K, Long)] = {
countApproxDistinctByKey(relativeSD, new HashPartitioner(numPartitions))
}
@@ -261,7 +326,12 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* more accurate counts but increase the memory footprint and vice versa. The default value of
* relativeSD is 0.05. Hash-partitions the output RDD using the existing partitioner/parallelism
* level.
*
* The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
* Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available at
* [[http://research.google.com/pubs/pub40671.html]].
*/
@deprecated("Use countApproxDistinctByKey with parameter p and sp", "1.0.1")
def countApproxDistinctByKey(relativeSD: Double = 0.05): RDD[(K, Long)] = {
countApproxDistinctByKey(relativeSD, defaultPartitioner(self))
}
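The per-key implementation folds every value into a stream-lib HyperLogLogPlus through combineByKey. A self-contained sketch of the three combiners (the class lives in com.clearspring.analytics.stream.cardinality; the addAll merge is my reading of the stream-lib API, not shown in this diff):

import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus

val p = 8
val sp = 0

// Create a counter from the first value seen for a key.
val createHLL = (v: Any) => {
  val hll = new HyperLogLogPlus(p, sp)
  hll.offer(v)
  hll
}

// Fold another value into an existing counter.
val mergeValueHLL = (hll: HyperLogLogPlus, v: Any) => {
  hll.offer(v)
  hll
}

// Merge two counters built on different partitions.
val mergeHLL = (h1: HyperLogLogPlus, h2: HyperLogLogPlus) => {
  h1.addAll(h2)
  h1
}

// pairs.combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner)
//      .mapValues(_.cardinality())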
32 changes: 24 additions & 8 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -921,19 +921,21 @@ abstract class RDD[T: ClassTag](
* :: Experimental ::
* Return approximate number of distinct elements in the RDD.
*
* The accuracy of approximation can be controlled through the relative standard deviation
* (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
* more accurate counts but increase the memory footprint and vice versa. The default value of
* relativeSD is 0.05.
*
* The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
* Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available at
* [[http://research.google.com/pubs/pub40671.html]].
*
* @param p The precision value for the normal set.
* <code>p</code> must be a value between 4 and <code>sp</code>.
* @param sp The precision value for the sparse set, between 0 and 32.
* If <code>sp</code> equals 0, the sparse representation is skipped.
*/
@Experimental
def countApproxDistinct(relativeSD: Double = 0.05): Long = {
val precision = (math.log((1.106 / relativeSD) * (1.106 / relativeSD)) / math.log(2)).toInt
val zeroCounter = new HyperLogLogPlus(precision)
def countApproxDistinct(p: Int, sp: Int): Long = {
require(p >= 4, s"p ($p) must be at least 4")
require(sp <= 32, s"sp ($sp) cannot be greater than 32")
require(sp == 0 || p <= sp, s"p ($p) cannot be greater than sp ($sp)")
val zeroCounter = new HyperLogLogPlus(p, sp)
aggregate(zeroCounter)(
(hll: HyperLogLogPlus, v: T) => {
hll.offer(v)
@@ -945,6 +947,20 @@ abstract class RDD[T: ClassTag](
}).cardinality()
}

/**
* Return approximate number of distinct elements in the RDD.
*
* The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
* Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available at
* [[http://research.google.com/pubs/pub40671.html]].
*/
@deprecated("Use countApproxDistinct with parameter p and sp", "1.0.1")
def countApproxDistinct(relativeSD: Double = 0.05): Long = {
// See stream-lib's HyperLogLog implementation on the conversion from relativeSD to p.
val p = (math.log((1.106 / relativeSD) * (1.106 / relativeSD)) / math.log(2)).toInt
countApproxDistinct(p, 0)
}
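
For picking p directly, the same constant implies the error model error(p) ≈ 1.106 / sqrt(2^p), which appears to be what the loosened test bounds below budget against:

def expectedError(p: Int): Double = 1.106 / math.sqrt(math.pow(2, p))

expectedError(4)   // ~0.277: RDDSuite asserts error < 0.4 for p = 4
expectedError(8)   // ~0.069: RDDSuite and JavaAPISuite use a 0.1 bound for p = 8
expectedError(20)  // ~0.0011: PairRDDFunctionsSuite asserts error < 0.01 for p = 20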

/**
* Zips this RDD with its element indices. The ordering is first based on the partition index
* and then the ordering of items within each partition. So the first item in the first
10 changes: 3 additions & 7 deletions core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -1031,27 +1031,23 @@ public void countApproxDistinct() {
arrayData.add(i % size);
}
JavaRDD<Integer> simpleRdd = sc.parallelize(arrayData, 10);
Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.2) - size) / (size * 1.0)) < 0.2);
Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.05) - size) / (size * 1.0)) <= 0.05);
Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.01) - size) / (size * 1.0)) <= 0.01);
Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(8, 0) - size) / (size * 1.0)) <= 0.1);
}

@Test
public void countApproxDistinctByKey() {
double relativeSD = 0.001;

List<Tuple2<Integer, Integer>> arrayData = new ArrayList<Tuple2<Integer, Integer>>();
for (int i = 10; i < 100; i++)
for (int j = 0; j < i; j++)
arrayData.add(new Tuple2<Integer, Integer>(i, j));

JavaPairRDD<Integer, Integer> pairRdd = sc.parallelizePairs(arrayData);
List<Tuple2<Integer, Object>> res = pairRdd.countApproxDistinctByKey(relativeSD).collect();
List<Tuple2<Integer, Object>> res = pairRdd.countApproxDistinctByKey(8, 0).collect();
for (Tuple2<Integer, Object> resItem : res) {
double count = (double)resItem._1();
Long resCount = (Long)resItem._2();
Double error = Math.abs((resCount - count) / count);
Assert.assertTrue(error < relativeSD);
Assert.assertTrue(error < 0.1);
}

}
core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -119,13 +119,15 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
* relatively tight error bounds to check correctness of functionality rather than checking
* whether the approximation conforms with the requested bound.
*/
val relativeSD = 0.001
val p = 20
val sp = 0
val relativeSD = 0.01

// For each value i, there are i tuples with first element equal to i.
// Therefore, the expected count for key i would be i.
val stacked = (1 to 100).flatMap(i => (1 to i).map(j => (i, j)))
val rdd1 = sc.parallelize(stacked)
val counted1 = rdd1.countApproxDistinctByKey(relativeSD).collect()
val counted1 = rdd1.countApproxDistinctByKey(p, sp).collect()
counted1.foreach { case (k, count) => assert(error(count, k) < relativeSD) }

val rnd = new Random()
@@ -136,7 +138,7 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
(1 to num).map(j => (num, j))
}
val rdd2 = sc.parallelize(randStacked)
val counted2 = rdd2.countApproxDistinctByKey(relativeSD, 4).collect()
val counted2 = rdd2.countApproxDistinctByKey(p, sp, 4).collect()
counted2.foreach { case(k, count) => assert(error(count, k) < relativeSD) }
}

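Both this suite and RDDSuite below call an error(...) helper defined outside the shown hunks. Its shape is presumably the relative deviation of the estimate from the exact count, along the lines of:

// Assumed shape of the suites' error() helper (not part of this diff).
def error(est: Long, exact: Long): Double = math.abs(est - exact) / exact.toDouble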
6 changes: 2 additions & 4 deletions core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -73,10 +73,8 @@ class RDDSuite extends FunSuite with SharedSparkContext {
val size = 100
val uniformDistro = for (i <- 1 to 100000) yield i % size
val simpleRdd = sc.makeRDD(uniformDistro)
assert(error(simpleRdd.countApproxDistinct(0.2), size) < 0.2)
assert(error(simpleRdd.countApproxDistinct(0.05), size) < 0.05)
assert(error(simpleRdd.countApproxDistinct(0.01), size) < 0.01)
assert(error(simpleRdd.countApproxDistinct(0.001), size) < 0.001)
assert(error(simpleRdd.countApproxDistinct(4, 0), size) < 0.4)
assert(error(simpleRdd.countApproxDistinct(8, 0), size) < 0.1)
}

test("SparkContext.union") {
