From 9e320c84f512ef106bcd5544fab2ea782a521c66 Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Tue, 3 Jun 2014 16:35:31 -0700
Subject: [PATCH] Incorporate code review feedback.

---
 .../apache/spark/api/java/JavaPairRDD.scala   | 89 ++++---------
 .../apache/spark/api/java/JavaRDDLike.scala   | 29 +----
 .../apache/spark/rdd/PairRDDFunctions.scala   | 75 ++++-------
 .../main/scala/org/apache/spark/rdd/RDD.scala | 14 +--
 .../java/org/apache/spark/JavaAPISuite.java   |  2 +-
 .../spark/rdd/PairRDDFunctionsSuite.scala     |  2 +-
 pom.xml                                       |  2 +-
 project/SparkBuild.scala                      |  2 +-
 8 files changed, 60 insertions(+), 155 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index b53602c484129..9419c3f5989be 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -675,16 +675,15 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
 *
 * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
 * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
- * here.
+ * here.
 *
- * @param p The precision value for the normal set.
- * `p` must be a value between 4 and `sp` (32 max).
- * @param sp The precision value for the sparse set, between 0 and 32.
- * If `sp` equals 0, the sparse representation is skipped.
- * @param partitioner Partitioner to use for the resulting RDD.
+ * @param relativeSD Relative accuracy. Smaller values create counters that require more space.
+ * It should be greater than 0.000017.
+ * @param partitioner partitioner of the resulting RDD.
 */
- def countApproxDistinctByKey(p: Int, sp: Int, partitioner: Partitioner): JavaPairRDD[K, Long] = {
- fromRDD(rdd.countApproxDistinctByKey(p, sp, partitioner))
+ def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): JavaPairRDD[K, Long] =
+ {
+ fromRDD(rdd.countApproxDistinctByKey(relativeSD, partitioner))
 }

 /**
@@ -692,16 +691,14 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
 *
 * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
 * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
- * here.
+ * here.
 *
- * @param p The precision value for the normal set.
- * `p` must be a value between 4 and `sp` (32 max).
- * @param sp The precision value for the sparse set, between 0 and 32.
- * If `sp` equals 0, the sparse representation is skipped.
- * @param numPartitions The number of partitions in the resulting RDD.
+ * @param relativeSD Relative accuracy. Smaller values create counters that require more space.
+ * It should be greater than 0.000017.
+ * @param numPartitions number of partitions of the resulting RDD.
 */
- def countApproxDistinctByKey(p: Int, sp: Int, numPartitions: Int): JavaPairRDD[K, Long] = {
- fromRDD(rdd.countApproxDistinctByKey(p, sp, numPartitions))
+ def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaPairRDD[K, Long] = {
+ fromRDD(rdd.countApproxDistinctByKey(relativeSD, numPartitions))
 }

 /**
@@ -709,63 +706,13 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
 *
 * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
 * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
- * here.
+ * here.
 *
- * @param p The precision value for the normal set.
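Note on the JavaPairRDD change above: the three `(p, sp)` overloads and the three deprecated `relativeSD` overloads collapse into a single `relativeSD` family that now returns `JavaPairRDD[K, Long]` rather than `JavaRDD[(K, Long)]`. A minimal usage sketch of the surviving overload shapes, written against the Scala pair-RDD API that backs these wrappers (assumes an existing `SparkContext` named `sc`; the data is hypothetical, not from this patch):

    // Hypothetical sketch: the three surviving overload shapes.
    import org.apache.spark.{HashPartitioner, SparkContext}
    import org.apache.spark.SparkContext._  // enables countApproxDistinctByKey on pair RDDs

    val pairs = sc.parallelize(Seq((1, "a"), (1, "b"), (2, "c")))
    pairs.countApproxDistinctByKey(0.05)                          // default partitioner
    pairs.countApproxDistinctByKey(0.05, 4)                       // explicit partition count
    pairs.countApproxDistinctByKey(0.05, new HashPartitioner(4))  // explicit partitioner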
- * `p` must be a value between 4 and `sp` (32 max).
- * @param sp The precision value for the sparse set, between 0 and 32.
- * If `sp` equals 0, the sparse representation is skipped.
+ * @param relativeSD Relative accuracy. Smaller values create counters that require more space.
+ * It should be greater than 0.000017.
 */
- def countApproxDistinctByKey(p: Int, sp: Int): JavaPairRDD[K, Long] = {
- fromRDD(rdd.countApproxDistinctByKey(p, sp))
- }
-
- /**
- * Return approximate number of distinct values for each key in this RDD. This is deprecated.
- * Use the variant with `p` and `sp` parameters instead.
- *
- * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
- * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
- * here.
- *
- * @param relativeSD The relative standard deviation for the counter.
- * Smaller values create counters that require more space.
- */
- @Deprecated
- def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): JavaRDD[(K, Long)] = {
- rdd.countApproxDistinctByKey(relativeSD, partitioner)
- }
-
- /**
- * Return approximate number of distinct values for each key in this RDD. This is deprecated.
- * Use the variant with `p` and `sp` parameters instead.
- *
- * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
- * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
- * here.
- *
- * @param relativeSD The relative standard deviation for the counter.
- * Smaller values create counters that require more space.
- */
- @Deprecated
- def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaRDD[(K, Long)] = {
- rdd.countApproxDistinctByKey(relativeSD, numPartitions)
- }
-
- /**
- * Return approximate number of distinct values for each key in this RDD. This is deprecated.
- * Use the variant with p and sp parameters instead.
- *
- * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
- * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
- * here.
- *
- * @param relativeSD The relative standard deviation for the counter.
- * Smaller values create counters that require more space.
- */
- @Deprecated
- def countApproxDistinctByKey(relativeSD: Double): JavaRDD[(K, Long)] = {
- rdd.countApproxDistinctByKey(relativeSD)
+ def countApproxDistinctByKey(relativeSD: Double): JavaPairRDD[K, Long] = {
+ fromRDD(rdd.countApproxDistinctByKey(relativeSD))
 }

 /** Assign a name to this RDD */
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 28c3e05f3e15c..2741532732c27 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -562,34 +562,11 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
 *
 * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
 * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
- * here.
+ * here.
 *
- * @param p The precision value for the normal set.
- * `p` must be a value between 4 and `sp` (32 max).
- * @param sp The precision value for the sparse set, between 0 and 32.
- * If `sp` equals 0, the sparse representation is skipped.
+ * @param relativeSD Relative accuracy. Smaller values create counters that require more space.
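The element-level API in JavaRDDLike mirrors the pair-RDD cleanup: the `(p, sp)` and no-arg variants are removed, and the `relativeSD` variant loses its `@Deprecated` tag. A sketch of the remaining call, under the same assumptions as the example above (an existing `sc`, hypothetical data):

    // Hypothetical sketch: ~1000 distinct values, estimated within ~5% relative error.
    val rdd = sc.parallelize(1 to 100000).map(_ % 1000)
    val estimate = rdd.countApproxDistinct(0.05)  // expect a value close to 1000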
+ * It should be greater than 0.000017.
 */
- def countApproxDistinct(p: Int, sp: Int): Long = rdd.countApproxDistinct(p, sp)
-
- /**
- * Return approximate number of distinct elements in the RDD. This method uses 20 as p value
- * and 0 as sp value (i.e. skipping sparse representation).
- *
- * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
- * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
- * here.
- */
- def countApproxDistinct(): Long = rdd.countApproxDistinct(20, 0)
-
- /**
- * Return approximate number of distinct elements in the RDD. This is deprecated. Use the
- * variant with `p` and `sp` parameters instead.
- *
- * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
- * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
- * here.
- */
- @Deprecated
 def countApproxDistinct(relativeSD: Double): Long = rdd.countApproxDistinct(relativeSD)

 def name(): String = rdd.name
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index e9c10ae44ac71..d459815ae7cbe 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -219,9 +219,13 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
 *
 * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
 * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
- * here.
+ * here.
 *
- * @param p The precision value for the normal set.
+ * The relative accuracy is approximately `1.054 / sqrt(2^p)`. Setting a nonzero `sp > p`
+ * would trigger sparse representation of registers, which may reduce the memory consumption
+ * and increase accuracy when the cardinality is small.
+ *
+ * @param p The precision value for the normal set.
 * `p` must be a value between 4 and `sp` (32 max).
 * @param sp The precision value for the sparse set, between 0 and 32.
 * If `sp` equals 0, the sparse representation is skipped.
 * @param partitioner Partitioner to use for the resulting RDD.
@@ -229,6 +233,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
 */
 @Experimental
 def countApproxDistinctByKey(p: Int, sp: Int, partitioner: Partitioner): RDD[(K, Long)] = {
+ require(p >= 4, s"p ($p) should be >= 4")
+ require(sp <= 32, s"sp ($sp) should be <= 32")
+ require(sp == 0 || p <= sp, s"p ($p) cannot be greater than sp ($sp)")
 val createHLL = (v: V) => {
 val hll = new HyperLogLogPlus(p, sp)
 hll.offer(v)
 hll
 }
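The new doc paragraph ties precision to accuracy: with `2^p` registers, the expected relative error is about `1.054 / sqrt(2^p)`. A worked illustration of that relationship (illustration only, not patch code):

    // Illustration only: expected relative error for a given precision p.
    def relativeAccuracy(p: Int): Double = 1.054 / math.sqrt(math.pow(2, p))

    relativeAccuracy(4)   // ≈ 0.2635 (the coarsest precision the require above allows)
    relativeAccuracy(14)  // ≈ 0.0082 (2^14 registers, under 1% expected error)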
@@ -247,56 +254,21 @@
 }

 /**
- * :: Experimental ::
- *
 * Return approximate number of distinct values for each key in this RDD.
 *
 * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
 * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
- * here.
- *
- * @param p The precision value for the normal set.
- * `p` must be a value between 4 and `sp` (32 max).
- * @param sp The precision value for the sparse set, between 0 and 32.
- * If `sp` equals 0, the sparse representation is skipped.
- * @param numPartitions Number of partitions in the resulting RDD.
- */
- @Experimental
- def countApproxDistinctByKey(p: Int, sp: Int, numPartitions: Int): RDD[(K, Long)] = {
- countApproxDistinctByKey(p, sp, new HashPartitioner(numPartitions))
- }
-
- /**
- * :: Experimental ::
- *
- * Return approximate number of distinct values for each key in this RDD.
- *
- * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
- * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
- * here.
- *
- * @param p The precision value for the normal set.
- * `p` must be a value between 4 and `sp` (32 max).
- * @param sp The precision value for the sparse set, between 0 and 32.
- * If `sp` equals 0, the sparse representation is skipped.
- */
- @Experimental
- def countApproxDistinctByKey(p: Int, sp: Int): RDD[(K, Long)] = {
- countApproxDistinctByKey(p, sp, defaultPartitioner(self))
- }
-
- /**
- * Return approximate number of distinct values for each key in this RDD.
+ * here.
 *
- * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
- * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
- * here.
+ * @param relativeSD Relative accuracy. Smaller values create counters that require more space.
+ * It should be greater than 0.000017.
+ * @param partitioner partitioner of the resulting RDD
 */
- @deprecated("Use countApproxDistinctByKey with parameter p and sp", "1.0.1")
 def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = {
- // See stream-lib's HyperLogLog implementation on the conversion from relativeSD to p.
- val p = (math.log((1.106 / relativeSD) * (1.106 / relativeSD)) / math.log(2)).toInt
- countApproxDistinctByKey(p, 0, partitioner)
+ require(relativeSD > 0.000017, s"accuracy ($relativeSD) should be greater than 0.000017")
+ val p = math.ceil(2.0 * math.log(1.054 / relativeSD) / math.log(2)).toInt
+ assert(p <= 32)
+ countApproxDistinctByKey(if (p < 4) 4 else p, 0, partitioner)
 }

 /**
@@ -304,9 +276,12 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
 *
 * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
 * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
- * here.
+ * here.
+ *
+ * @param relativeSD Relative accuracy. Smaller values create counters that require more space.
+ * It should be greater than 0.000017.
+ * @param numPartitions number of partitions of the resulting RDD
 */
- @deprecated("Use countApproxDistinctByKey with parameter p and sp", "1.0.1")
 def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): RDD[(K, Long)] = {
 countApproxDistinctByKey(relativeSD, new HashPartitioner(numPartitions))
 }

 /**
@@ -316,9 +291,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
 *
 * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
 * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
- * here.
+ * here.
+ *
+ * @param relativeSD Relative accuracy. Smaller values create counters that require more space.
+ * It should be greater than 0.000017.
 */
- @deprecated("Use countApproxDistinctByKey with parameter p and sp", "1.0.1")
 def countApproxDistinctByKey(relativeSD: Double = 0.05): RDD[(K, Long)] = {
 countApproxDistinctByKey(relativeSD, defaultPartitioner(self))
 }
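The rewritten `relativeSD` overload inverts the accuracy formula to pick the smallest `p` meeting the requested accuracy, then clamps it into the valid `[4, 32]` range. The `relativeSD > 0.000017` guard follows from the cap: `1.054 / sqrt(2^32) ≈ 0.0000161`, so any tighter request would need more than 32 register-index bits. A worked check of the conversion used in the code above (illustration only):

    // Illustration only: the p chosen for a requested accuracy, per the patch's formula.
    def pFor(relativeSD: Double): Int =
      math.ceil(2.0 * math.log(1.054 / relativeSD) / math.log(2)).toInt

    pFor(0.05)      // 9:  1.054 / math.sqrt(math.pow(2, 9)) ≈ 0.047, within the request
    pFor(0.000017)  // 32: the documented lower bound lands exactly on the cap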
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index ab15052f519f2..58375b9b07c32 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -922,7 +922,11 @@ abstract class RDD[T: ClassTag](
 *
 * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
 * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
- * here.
+ * here.
+ *
+ * The relative accuracy is approximately `1.054 / sqrt(2^p)`. Setting a nonzero `sp > p`
+ * would trigger sparse representation of registers, which may reduce the memory consumption
+ * and increase accuracy when the cardinality is small.
 *
 * @param p The precision value for the normal set.
 * p must be a value between 4 and sp.
@@ -951,12 +955,12 @@ abstract class RDD[T: ClassTag](
 *
 * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
 * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
- * here.
+ * here.
+ *
+ * @param relativeSD Relative accuracy. Smaller values create counters that require more space.
 */
- @deprecated("Use countApproxDistinct with parameter p and sp", "1.0.1")
 def countApproxDistinct(relativeSD: Double = 0.05): Long = {
- // See stream-lib's HyperLogLog implementation on the conversion from relativeSD to p.
- val p = (math.log((1.106 / relativeSD) * (1.106 / relativeSD)) / math.log(2)).toInt
+ val p = math.ceil(2.0 * math.log(1.054 / relativeSD) / math.log(2)).toInt
 countApproxDistinct(p, 0)
 }

diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index 68cf183c91c48..b78309f81cb8c 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -1028,7 +1028,7 @@ public void countApproxDistinct() {
 arrayData.add(i % size);
 }
 JavaRDD<Integer> simpleRdd = sc.parallelize(arrayData, 10);
- Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(8, 0) - size) / (size * 1.0)) <= 0.1);
+ Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.05) - size) / (size * 1.0)) <= 0.1);
 }

 @Test
diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
index 937ddfcc2b6b1..461ffeddbff2a 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -138,7 +138,7 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
 (1 to num).map(j => (num, j))
 }
 val rdd2 = sc.parallelize(randStacked)
- val counted2 = rdd2.countApproxDistinctByKey(p, sp, 4).collect()
+ val counted2 = rdd2.countApproxDistinctByKey(relativeSD).collect()
 counted2.foreach { case(k, count) => assert(error(count, k) < relativeSD) }
 }
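The Scala test now drives `countApproxDistinctByKey` through the `relativeSD` entry point. Its `error` helper is defined elsewhere in the suite and not shown in this hunk; a plausible shape consistent with how it is called here (an assumption, not patch code) is:

    // Assumed reconstruction of the suite's helper: relative error of an estimate.
    def error(est: Long, exact: Long): Double = math.abs(est - exact).toDouble / exact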
diff --git a/pom.xml b/pom.xml
index 63701a1e40f49..fcd6f66b4414a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -302,7 +302,7 @@
 stream
 2.7.0
-
+
 it.unimi.dsi
 fastutil
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index e136461d380d5..efb0b9319be13 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -362,7 +362,7 @@ object SparkBuild extends Build {
 "com.twitter" %% "chill" % chillVersion excludeAll(excludeAsm),
 "com.twitter" % "chill-java" % chillVersion excludeAll(excludeAsm),
 "org.tachyonproject" % "tachyon" % "0.4.1-thrift" excludeAll(excludeHadoop, excludeCurator, excludeEclipseJetty, excludePowermock),
- "com.clearspring.analytics" % "stream" % "2.7.0" excludeAll(excludeFastutil),
+ "com.clearspring.analytics" % "stream" % "2.7.0" excludeAll(excludeFastutil), // Only HyperLogLogPlus is used, which does not depend on fastutil.
 "org.spark-project" % "pyrolite" % "2.0.1",
 "net.sf.py4j" % "py4j" % "0.8.1"
 ),
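For context on the sbt change: `excludeFastutil` is an `ExclusionRule` value used by SparkBuild, and the new trailing comment records why the exclusion is safe. A sketch of the shape such a rule takes in sbt of this era (assumed; the actual definition lives elsewhere in SparkBuild.scala and is not part of this hunk):

    // Assumed sketch of the exclusion rule referenced above (sbt 0.13-era syntax).
    import sbt._
    val excludeFastutil = ExclusionRule(organization = "it.unimi.dsi")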