diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index 4c8f9ed6fbc02..b1058c3c35f5f 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -672,38 +672,102 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
   /**
    * Return approximate number of distinct values for each key in this RDD.
+   *
    * The accuracy of approximation can be controlled through the relative standard deviation
    * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
    * more accurate counts but increase the memory footprint and vise versa. Uses the provided
    * Partitioner to partition the output RDD.
+   *
+   * @param p The precision value for the normal set.
+   *          p must be a value between 4 and sp (32 max).
+   * @param sp The precision value for the sparse set, between 0 and 32.
+   *           If sp equals 0, the sparse representation is skipped.
+   * @param partitioner Partitioner to use for the resulting RDD.
    */
-  def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): JavaRDD[(K, Long)] = {
-    rdd.countApproxDistinctByKey(relativeSD, partitioner)
+  def countApproxDistinctByKey(p: Int, sp: Int, partitioner: Partitioner): JavaPairRDD[K, Long] = {
+    fromRDD(rdd.countApproxDistinctByKey(p, sp, partitioner))
   }
 
   /**
-   * Return approximate number of distinct values for each key this RDD.
+   * Return approximate number of distinct values for each key in this RDD.
+   *
    * The accuracy of approximation can be controlled through the relative standard deviation
    * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
-   * more accurate counts but increase the memory footprint and vise versa. The default value of
-   * relativeSD is 0.05. Hash-partitions the output RDD using the existing partitioner/parallelism
-   * level.
+   * more accurate counts but increase the memory footprint and vice versa. HashPartitions the
+   * output RDD into numPartitions partitions.
+   *
+   * @param p The precision value for the normal set.
+   *          p must be a value between 4 and sp (32 max).
+   * @param sp The precision value for the sparse set, between 0 and 32.
+   *           If sp equals 0, the sparse representation is skipped.
+   * @param numPartitions The number of partitions in the resulting RDD.
    */
-  def countApproxDistinctByKey(relativeSD: Double = 0.05): JavaRDD[(K, Long)] = {
-    rdd.countApproxDistinctByKey(relativeSD)
+  def countApproxDistinctByKey(p: Int, sp: Int, numPartitions: Int): JavaPairRDD[K, Long] = {
+    fromRDD(rdd.countApproxDistinctByKey(p, sp, numPartitions))
   }
-
   /**
    * Return approximate number of distinct values for each key in this RDD.
+   *
+   * The accuracy of approximation can be controlled through the relative standard deviation
+   * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
+   * more accurate counts but increase the memory footprint and vice versa. Hash-partitions the
+   * output RDD using the existing partitioner/parallelism level.
+   *
+   * @param p The precision value for the normal set.
+   *          p must be a value between 4 and sp (32 max).
+   * @param sp The precision value for the sparse set, between 0 and 32.
+   *           If sp equals 0, the sparse representation is skipped.
+   */
+  def countApproxDistinctByKey(p: Int, sp: Int): JavaPairRDD[K, Long] = {
+    fromRDD(rdd.countApproxDistinctByKey(p, sp))
+  }
+
+  /**
+   * Return approximate number of distinct values for each key in this RDD. This is deprecated.
+   * Use the variant with p and sp parameters instead.
+   *
    * The accuracy of approximation can be controlled through the relative standard deviation
    * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
-   * more accurate counts but increase the memory footprint and vise versa. HashPartitions the
-   * output RDD into numPartitions.
+   * more accurate counts but increase the memory footprint and vice versa. Uses the provided
+   * Partitioner to partition the output RDD.
+   */
+  @Deprecated
+  def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): JavaPairRDD[K, Long] =
+  {
+    fromRDD(rdd.countApproxDistinctByKey(relativeSD, partitioner))
+  }
+
+  /**
+   * Return approximate number of distinct values for each key in this RDD. This is deprecated.
+   * Use the variant with p and sp parameters instead.
+   *
+   * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
+   * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available at
+   * [[http://research.google.com/pubs/pub40671.html]].
+   *
+   * @param relativeSD The relative standard deviation for the counter.
+   *                   Smaller values create counters that require more space.
+   */
+  @Deprecated
+  def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaPairRDD[K, Long] = {
+    fromRDD(rdd.countApproxDistinctByKey(relativeSD, numPartitions))
+  }
+
+  /**
+   * Return approximate number of distinct values for each key in this RDD. This is deprecated.
+   * Use the variant with p and sp parameters instead.
+   *
+   * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
+   * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available at
+   * [[http://research.google.com/pubs/pub40671.html]].
    *
+   * @param relativeSD The relative standard deviation for the counter.
+   *                   Smaller values create counters that require more space.
    */
-  def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaRDD[(K, Long)] = {
-    rdd.countApproxDistinctByKey(relativeSD, numPartitions)
+  @Deprecated
+  def countApproxDistinctByKey(relativeSD: Double): JavaPairRDD[K, Long] = {
+    fromRDD(rdd.countApproxDistinctByKey(relativeSD))
   }
 
   /** Assign a name to this RDD */
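A note on the new precision parameters documented above: HyperLogLogPlus keeps on the order of 2^p registers, so memory grows with 2^p while the expected relative error shrinks roughly as 1/sqrt(2^p). The sketch below is illustrative only; the object name is made up, and the 1.106 constant is simply the one used by the deprecated relativeSD-to-p conversion further down in this patch, so treat the printed figures as ballpark numbers rather than guarantees.

```scala
// Illustrative sketch: rough relationship between p, register count, and expected error.
object PrecisionIntuition {
  // Inverse of the conversion used by the deprecated overloads in this patch:
  //   p = log2((1.106 / relativeSD)^2)  =>  relativeSD = 1.106 / sqrt(2^p)
  def expectedRelativeSD(p: Int): Double = 1.106 / math.sqrt(math.pow(2, p))

  def main(args: Array[String]): Unit = {
    for (p <- Seq(4, 8, 12, 16, 20)) {
      val registers = 1 << p  // memory use grows linearly with the number of registers
      println(f"p = $p%2d  registers = $registers%9d  expected relativeSD ~ ${expectedRelativeSD(p)}%.4f")
    }
  }
}
```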
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 619bfd75be8eb..080bb6ea677a4 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -564,8 +564,33 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
    * more accurate counts but increase the memory footprint and vise versa. The default value of
    * relativeSD is 0.05.
+   *
+   * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
+   * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available at
+   * [[http://research.google.com/pubs/pub40671.html]].
+   *
+   * @param p The precision value for the normal set.
+   *          p must be a value between 4 and sp (32 max).
+   * @param sp The precision value for the sparse set, between 0 and 32.
+   *           If sp equals 0, the sparse representation is skipped.
+   */
+  def countApproxDistinct(p: Int, sp: Int): Long = rdd.countApproxDistinct(p, sp)
+
+  /**
+   * Return approximate number of distinct elements in the RDD. This is deprecated. Use the
+   * variant with p and sp parameters instead.
+   *
+   * The accuracy of approximation can be controlled through the relative standard deviation
+   * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
+   * more accurate counts but increase the memory footprint and vice versa. The default value of
+   * relativeSD is 0.05.
+   *
+   * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
+   * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available at
+   * [[http://research.google.com/pubs/pub40671.html]].
    */
-  def countApproxDistinct(relativeSD: Double = 0.05): Long = rdd.countApproxDistinct(relativeSD)
+  @Deprecated
+  def countApproxDistinct(relativeSD: Double): Long = rdd.countApproxDistinct(relativeSD)
 
   def name(): String = rdd.name
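For orientation, here is a minimal spark-shell style sketch of the element-level API that the Java wrapper above delegates to. It assumes an existing SparkContext named `sc` (as in the shell); the parameter values are arbitrary examples, not recommendations.

```scala
// Exactly 1000 distinct values (0 to 999), so both estimates should land near 1000.
val data = sc.parallelize(1 to 100000).map(_ % 1000)

// New form: explicit HyperLogLogPlus precision values.
val estimate = data.countApproxDistinct(p = 12, sp = 0)

// Deprecated form: still accepted, converted internally to a p value.
val legacyEstimate = data.countApproxDistinct(relativeSD = 0.05)

println(s"estimate = $estimate, legacy estimate = $legacyEstimate (exact = 1000)")
```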
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 0f3eedb0a8aac..1d95d5bdd45b4 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -213,20 +213,24 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
   }
 
   /**
+   * :: Experimental ::
+   *
    * Return approximate number of distinct values for each key in this RDD.
-   * The accuracy of approximation can be controlled through the relative standard deviation
-   * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
-   * more accurate counts but increase the memory footprint and vice versa. Uses the provided
-   * [[Partitioner]] to partition the output RDD.
    *
    * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
    * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available at
    * [[http://research.google.com/pubs/pub40671.html]].
+   *
+   * @param p The precision value for the normal set.
+   *          p must be a value between 4 and sp (32 max).
+   * @param sp The precision value for the sparse set, between 0 and 32.
+   *           If sp equals 0, the sparse representation is skipped.
+   * @param partitioner Partitioner to use for the resulting RDD.
    */
-  def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = {
-    val precision = (math.log((1.106 / relativeSD) * (1.106 / relativeSD)) / math.log(2)).toInt
+  @Experimental
+  def countApproxDistinctByKey(p: Int, sp: Int, partitioner: Partitioner): RDD[(K, Long)] = {
     val createHLL = (v: V) => {
-      val hll = new HyperLogLogPlus(precision)
+      val hll = new HyperLogLogPlus(p, sp)
       hll.offer(v)
       hll
     }
@@ -242,6 +246,63 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner).mapValues(_.cardinality())
   }
 
+  /**
+   * :: Experimental ::
+   *
+   * Return approximate number of distinct values for each key in this RDD.
+   *
+   * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
+   * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available at
+   * [[http://research.google.com/pubs/pub40671.html]].
+   *
+   * @param p The precision value for the normal set.
+   *          p must be a value between 4 and sp (32 max).
+   * @param sp The precision value for the sparse set, between 0 and 32.
+   *           If sp equals 0, the sparse representation is skipped.
+   * @param numPartitions Number of partitions in the resulting RDD.
+   */
+  @Experimental
+  def countApproxDistinctByKey(p: Int, sp: Int, numPartitions: Int): RDD[(K, Long)] = {
+    countApproxDistinctByKey(p, sp, new HashPartitioner(numPartitions))
+  }
+
+  /**
+   * :: Experimental ::
+   *
+   * Return approximate number of distinct values for each key in this RDD.
+   *
+   * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
+   * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available at
+   * [[http://research.google.com/pubs/pub40671.html]].
+   *
+   * @param p The precision value for the normal set.
+   *          p must be a value between 4 and sp (32 max).
+   * @param sp The precision value for the sparse set, between 0 and 32.
+   *           If sp equals 0, the sparse representation is skipped.
+   */
+  @Experimental
+  def countApproxDistinctByKey(p: Int, sp: Int): RDD[(K, Long)] = {
+    countApproxDistinctByKey(p, sp, defaultPartitioner(self))
+  }
+
+  /**
+   * Return approximate number of distinct values for each key in this RDD.
+   *
+   * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
+   * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available at
+   * [[http://research.google.com/pubs/pub40671.html]].
+   *
+   * @param relativeSD The relative standard deviation for the counter.
+   *                   Smaller values create counters that require more space.
+   * @param partitioner Partitioner to use for the resulting RDD.
+   */
+  @deprecated("Use countApproxDistinctByKey with parameters p and sp", "1.0.1")
+  def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = {
+    // See stream-lib's HyperLogLog implementation on the conversion from relativeSD to p.
+    val p = (math.log((1.106 / relativeSD) * (1.106 / relativeSD)) / math.log(2)).toInt
+    countApproxDistinctByKey(p, 0, partitioner)
+  }
+
   /**
    * Return approximate number of distinct values for each key in this RDD.
    * The accuracy of approximation can be controlled through the relative standard deviation
@@ -249,7 +310,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * more accurate counts but increase the memory footprint and vice versa. HashPartitions the
    * output RDD into numPartitions.
    *
+   * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
+   * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available at
+   * [[http://research.google.com/pubs/pub40671.html]].
    */
+  @deprecated("Use countApproxDistinctByKey with parameters p and sp", "1.0.1")
   def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): RDD[(K, Long)] = {
     countApproxDistinctByKey(relativeSD, new HashPartitioner(numPartitions))
   }
@@ -261,7 +326,12 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * more accurate counts but increase the memory footprint and vice versa. The default value of
    * relativeSD is 0.05. Hash-partitions the output RDD using the existing partitioner/parallelism
    * level.
+   *
+   * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
+   * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available at
+   * [[http://research.google.com/pubs/pub40671.html]].
    */
+  @deprecated("Use countApproxDistinctByKey with parameters p and sp", "1.0.1")
   def countApproxDistinctByKey(relativeSD: Double = 0.05): RDD[(K, Long)] = {
     countApproxDistinctByKey(relativeSD, defaultPartitioner(self))
   }
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 94307aad14acd..f418c80db08d4 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -921,19 +921,21 @@ abstract class RDD[T: ClassTag](
    * :: Experimental ::
    * Return approximate number of distinct elements in the RDD.
    *
-   * The accuracy of approximation can be controlled through the relative standard deviation
-   * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
-   * more accurate counts but increase the memory footprint and vise versa. The default value of
-   * relativeSD is 0.05.
-   *
    * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
    * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available at
    * [[http://research.google.com/pubs/pub40671.html]].
+   *
+   * @param p The precision value for the normal set.
+   *          p must be a value between 4 and sp (32 max).
+   * @param sp The precision value for the sparse set, between 0 and 32.
+   *           If sp equals 0, the sparse representation is skipped.
    */
   @Experimental
-  def countApproxDistinct(relativeSD: Double = 0.05): Long = {
-    val precision = (math.log((1.106 / relativeSD) * (1.106 / relativeSD)) / math.log(2)).toInt
-    val zeroCounter = new HyperLogLogPlus(precision)
+  def countApproxDistinct(p: Int, sp: Int): Long = {
+    require(p >= 4, s"p ($p) must be at least 4")
+    require(sp <= 32, s"sp ($sp) cannot be greater than 32")
+    require(sp == 0 || p <= sp, s"p ($p) cannot be greater than sp ($sp)")
+    val zeroCounter = new HyperLogLogPlus(p, sp)
     aggregate(zeroCounter)(
       (hll: HyperLogLogPlus, v: T) => {
         hll.offer(v)
@@ -945,6 +947,20 @@ abstract class RDD[T: ClassTag](
       }).cardinality()
   }
 
+  /**
+   * Return approximate number of distinct elements in the RDD.
+   *
+   * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
+   * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available at
+   * [[http://research.google.com/pubs/pub40671.html]].
+   */
+  @deprecated("Use countApproxDistinct with parameters p and sp", "1.0.1")
+  def countApproxDistinct(relativeSD: Double = 0.05): Long = {
+    // See stream-lib's HyperLogLog implementation on the conversion from relativeSD to p.
+    val p = (math.log((1.106 / relativeSD) * (1.106 / relativeSD)) / math.log(2)).toInt
+    countApproxDistinct(p, 0)
+  }
+
   /**
    * Zips this RDD with its element indices. The ordering is first based on the partition index
    * and then the ordering of items within each partition. So the first item in the first
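The relativeSD-to-p conversion that the deprecated wrappers above rely on is worth seeing with concrete numbers, since it explains the parameter choices in the tests that follow (relativeSD = 0.05 maps to p = 8, and relativeSD = 0.001 maps to p = 20). A small self-contained illustration; the object and method names are invented for the example:

```scala
object RelativeSDToP {
  // Same expression as in the deprecated countApproxDistinct/countApproxDistinctByKey wrappers.
  def toP(relativeSD: Double): Int =
    (math.log((1.106 / relativeSD) * (1.106 / relativeSD)) / math.log(2)).toInt

  def main(args: Array[String]): Unit = {
    // Prints: 0.2 -> 4, 0.05 -> 8, 0.01 -> 13, 0.001 -> 20
    Seq(0.2, 0.05, 0.01, 0.001).foreach { sd =>
      println(s"relativeSD = $sd -> p = ${toP(sd)}")
    }
  }
}
```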
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index 3dd79243ab5bd..21ba8ed5eb3de 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -1031,27 +1031,23 @@ public void countApproxDistinct() {
       arrayData.add(i % size);
     }
     JavaRDD<Integer> simpleRdd = sc.parallelize(arrayData, 10);
-    Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.2) - size) / (size * 1.0)) < 0.2);
-    Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.05) - size) / (size * 1.0)) <= 0.05);
-    Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.01) - size) / (size * 1.0)) <= 0.01);
+    Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(8, 0) - size) / (size * 1.0)) <= 0.1);
   }
 
   @Test
   public void countApproxDistinctByKey() {
-    double relativeSD = 0.001;
-
     List<Tuple2<Integer, Integer>> arrayData = new ArrayList<Tuple2<Integer, Integer>>();
     for (int i = 10; i < 100; i++)
       for (int j = 0; j < i; j++)
         arrayData.add(new Tuple2<Integer, Integer>(i, j));
 
     JavaPairRDD<Integer, Integer> pairRdd = sc.parallelizePairs(arrayData);
-    List<Tuple2<Integer, Object>> res = pairRdd.countApproxDistinctByKey(relativeSD).collect();
+    List<Tuple2<Integer, Object>> res = pairRdd.countApproxDistinctByKey(8, 0).collect();
    for (Tuple2<Integer, Object> resItem : res) {
       double count = (double)resItem._1();
       Long resCount = (Long)resItem._2();
       Double error = Math.abs((resCount - count) / count);
-      Assert.assertTrue(error < relativeSD);
+      Assert.assertTrue(error < 0.1);
     }
   }
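Both the Java test above and the Scala suites below ultimately exercise the same property: HyperLogLogPlus sketches built on different slices of the data can be merged, and the merged sketch estimates the cardinality of the union. That is what lets countApproxDistinctByKey use one HLL+ per key as a combineByKey combiner. Below is a standalone, hedged sketch of that merge behaviour, independent of Spark; the object and value names are made up, and it assumes stream-lib is on the classpath.

```scala
import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus

object HllMergeSketch {
  def main(args: Array[String]): Unit = {
    val p = 8
    val sp = 0
    val left = new HyperLogLogPlus(p, sp)
    val right = new HyperLogLogPlus(p, sp)

    // offer takes a java.lang.Object, so box the Ints explicitly.
    (1 to 500).foreach(i => left.offer(Int.box(i)))      // values 1..500
    (400 to 1000).foreach(i => right.offer(Int.box(i)))  // values 400..1000, overlapping 400..500

    left.addAll(right)  // merge the two sketches in place
    println(s"estimated distinct values: ${left.cardinality()} (exact = 1000)")
  }
}
```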
diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
index 010ee04109225..937ddfcc2b6b1 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -119,13 +119,15 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
      * relatively tight error bounds to check correctness of functionality rather than checking
      * whether the approximation conforms with the requested bound.
      */
-    val relativeSD = 0.001
+    val p = 20
+    val sp = 0
+    val relativeSD = 0.01
 
     // For each value i, there are i tuples with first element equal to i.
     // Therefore, the expected count for key i would be i.
     val stacked = (1 to 100).flatMap(i => (1 to i).map(j => (i, j)))
     val rdd1 = sc.parallelize(stacked)
-    val counted1 = rdd1.countApproxDistinctByKey(relativeSD).collect()
+    val counted1 = rdd1.countApproxDistinctByKey(p, sp).collect()
     counted1.foreach { case (k, count) => assert(error(count, k) < relativeSD) }
 
     val rnd = new Random()
@@ -136,7 +138,7 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
       (1 to num).map(j => (num, j))
     }
     val rdd2 = sc.parallelize(randStacked)
-    val counted2 = rdd2.countApproxDistinctByKey(relativeSD, 4).collect()
+    val counted2 = rdd2.countApproxDistinctByKey(p, sp, 4).collect()
     counted2.foreach { case(k, count) => assert(error(count, k) < relativeSD) }
   }
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index e686068f7a99a..bbd0c14178368 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -73,10 +73,8 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     val size = 100
     val uniformDistro = for (i <- 1 to 100000) yield i % size
     val simpleRdd = sc.makeRDD(uniformDistro)
-    assert(error(simpleRdd.countApproxDistinct(0.2), size) < 0.2)
-    assert(error(simpleRdd.countApproxDistinct(0.05), size) < 0.05)
-    assert(error(simpleRdd.countApproxDistinct(0.01), size) < 0.01)
-    assert(error(simpleRdd.countApproxDistinct(0.001), size) < 0.001)
+    assert(error(simpleRdd.countApproxDistinct(4, 0), size) < 0.4)
+    assert(error(simpleRdd.countApproxDistinct(8, 0), size) < 0.1)
   }
 
   test("SparkContext.union") {
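Taken together, the suites above assert that for key i (which has exactly i distinct values) the estimate stays within a small relative error of i. Here is a spark-shell style sketch of the same check, assuming an existing SparkContext `sc`; the `relativeError` helper is defined here only for the example and loosely mirrors the suites' `error` helper, whose definition lies outside this excerpt.

```scala
import org.apache.spark.SparkContext._

def relativeError(estimate: Long, exact: Long): Double =
  math.abs(estimate - exact).toDouble / exact

// For key i there are exactly i distinct values: (i, 1), (i, 2), ..., (i, i).
val stacked = sc.parallelize((1 to 100).flatMap(i => (1 to i).map(j => (i, j))))

val estimates = stacked.countApproxDistinctByKey(p = 20, sp = 0).collect()
estimates.foreach { case (k, est) =>
  assert(relativeError(est, k) < 0.01, s"estimate for key $k is off by more than 1%")
}
```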