Skip to content

Commit

Permalink
Incorporate code review feedback.
Browse files Browse the repository at this point in the history
  • Loading branch information
rxin committed Jun 3, 2014
1 parent e110d70 commit 9e320c8
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 155 deletions.
89 changes: 18 additions & 71 deletions core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -675,97 +675,44 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
*
* The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
* Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
* <a href="http://research.google.com/pubs/pub40671.html">here</a>.
* <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
*
* @param p The precision value for the normal set.
* `p` must be a value between 4 and `sp` (32 max).
* @param sp The precision value for the sparse set, between 0 and 32.
* If `sp` equals 0, the sparse representation is skipped.
* @param partitioner Partitioner to use for the resulting RDD.
* @param relativeSD Relative accuracy. Smaller values create counters that require more space.
* It should be greater than 0.000017.
* @param partitioner partitioner of the resulting RDD.
*/
def countApproxDistinctByKey(p: Int, sp: Int, partitioner: Partitioner): JavaPairRDD[K, Long] = {
fromRDD(rdd.countApproxDistinctByKey(p, sp, partitioner))
def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): JavaPairRDD[K, Long] =
{
fromRDD(rdd.countApproxDistinctByKey(relativeSD, partitioner))
}

/**
* Return approximate number of distinct values for each key in this RDD.
*
* The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
* Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
* <a href="http://research.google.com/pubs/pub40671.html">here</a>.
* <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
*
* @param p The precision value for the normal set.
* `p` must be a value between 4 and `sp` (32 max).
* @param sp The precision value for the sparse set, between 0 and 32.
* If `sp` equals 0, the sparse representation is skipped.
* @param numPartitions The number of partitions in the resulting RDD.
* @param relativeSD Relative accuracy. Smaller values create counters that require more space.
* It should be greater than 0.000017.
* @param numPartitions number of partitions of the resulting RDD.
*/
def countApproxDistinctByKey(p: Int, sp: Int, numPartitions: Int): JavaPairRDD[K, Long] = {
fromRDD(rdd.countApproxDistinctByKey(p, sp, numPartitions))
def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaPairRDD[K, Long] = {
fromRDD(rdd.countApproxDistinctByKey(relativeSD, numPartitions))
}

/**
* Return approximate number of distinct values for each key in this RDD.
*
* The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
* Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
* <a href="http://research.google.com/pubs/pub40671.html">here</a>.
* <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
*
* @param p The precision value for the normal set.
* `p` must be a value between 4 and `sp` (32 max).
* @param sp The precision value for the sparse set, between 0 and 32.
* If `sp` equals 0, the sparse representation is skipped.
* @param relativeSD Relative accuracy. Smaller values create counters that require more space.
* It should be greater than 0.000017.
*/
def countApproxDistinctByKey(p: Int, sp: Int): JavaPairRDD[K, Long] = {
fromRDD(rdd.countApproxDistinctByKey(p, sp))
}

/**
* Return approximate number of distinct values for each key in this RDD. This is deprecated.
* Use the variant with `p` and `sp` parameters instead.
*
* The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
* Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
* <a href="http://research.google.com/pubs/pub40671.html">here</a>.
*
* @param relativeSD The relative standard deviation for the counter.
* Smaller values create counters that require more space.
*/
@Deprecated
def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): JavaRDD[(K, Long)] = {
rdd.countApproxDistinctByKey(relativeSD, partitioner)
}

/**
* Return approximate number of distinct values for each key in this RDD. This is deprecated.
* Use the variant with `p` and `sp` parameters instead.
*
* The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
* Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
* <a href="http://research.google.com/pubs/pub40671.html">here</a>.
*
* @param relativeSD The relative standard deviation for the counter.
* Smaller values create counters that require more space.
*/
@Deprecated
def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaRDD[(K, Long)] = {
rdd.countApproxDistinctByKey(relativeSD, numPartitions)
}

/**
* Return approximate number of distinct values for each key in this RDD. This is deprecated.
* Use the variant with <code>p</code> and <code>sp</code> parameters instead.
*
* The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
* Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
* <a href="http://research.google.com/pubs/pub40671.html">here</a>.
*
* @param relativeSD The relative standard deviation for the counter.
* Smaller values create counters that require more space.
*/
@Deprecated
def countApproxDistinctByKey(relativeSD: Double): JavaRDD[(K, Long)] = {
rdd.countApproxDistinctByKey(relativeSD)
def countApproxDistinctByKey(relativeSD: Double): JavaPairRDD[K, Long] = {
fromRDD(rdd.countApproxDistinctByKey(relativeSD))
}

/** Assign a name to this RDD */
Expand Down
29 changes: 3 additions & 26 deletions core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
Original file line number Diff line number Diff line change
Expand Up @@ -562,34 +562,11 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
*
* The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
* Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
* <a href="http://research.google.com/pubs/pub40671.html">here</a>.
* <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
*
* @param p The precision value for the normal set.
* `p` must be a value between 4 and `sp` (32 max).
* @param sp The precision value for the sparse set, between 0 and 32.
* If `sp` equals 0, the sparse representation is skipped.
* @param relativeSD Relative accuracy. Smaller values create counters that require more space.
* It should be greater than 0.000017.
*/
def countApproxDistinct(p: Int, sp: Int): Long = rdd.countApproxDistinct(p, sp)

/**
* Return approximate number of distinct elements in the RDD. This method uses 20 as p value
* and 0 as sp value (i.e. skipping sparse representation).
*
* The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
* Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
* <a href="http://research.google.com/pubs/pub40671.html">here</a>.
*/
def countApproxDistinct(): Long = rdd.countApproxDistinct(20, 0)

/**
* Return approximate number of distinct elements in the RDD. This is deprecated. Use the
* variant with `p` and `sp` parameters instead.
*
* The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
* Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
* <a href="http://research.google.com/pubs/pub40671.html">here</a>.
*/
@Deprecated
def countApproxDistinct(relativeSD: Double): Long = rdd.countApproxDistinct(relativeSD)

def name(): String = rdd.name
Expand Down
75 changes: 26 additions & 49 deletions core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -219,16 +219,23 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
*
* The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
* Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
* <a href="http://research.google.com/pubs/pub40671.html">here</a>.
* <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
*
* @param p The precision value for the normal set.
* The relative accuracy is approximately `1.054 / sqrt(2^p)`. Setting a nonzero `sp > p`
* would trigger sparse representation of registers, which may reduce the memory consumption
* and increase accuracy when the cardinality is small.
*
*@param p The precision value for the normal set.
* `p` must be a value between 4 and `sp` (32 max).
* @param sp The precision value for the sparse set, between 0 and 32.
* If `sp` equals 0, the sparse representation is skipped.
* @param partitioner Partitioner to use for the resulting RDD.
*/
@Experimental
def countApproxDistinctByKey(p: Int, sp: Int, partitioner: Partitioner): RDD[(K, Long)] = {
require(p >= 4, s"p ($p) should be >= 4")
require(sp <= 32, s"sp ($sp) should be <= 32")
require(sp == 0 || p <= sp, s"p ($p) cannot be greater than sp ($sp)")
val createHLL = (v: V) => {
val hll = new HyperLogLogPlus(p, sp)
hll.offer(v)
Expand All @@ -247,66 +254,34 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
}

/**
* :: Experimental ::
*
* Return approximate number of distinct values for each key in this RDD.
*
* The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
* Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
* <a href="http://research.google.com/pubs/pub40671.html">here</a>.
*
* @param p The precision value for the normal set.
* `p` must be a value between 4 and `sp` (32 max).
* @param sp The precision value for the sparse set, between 0 and 32.
* If `sp` equals 0, the sparse representation is skipped.
* @param numPartitions Number of partitions in the resulting RDD.
*/
@Experimental
def countApproxDistinctByKey(p: Int, sp: Int, numPartitions: Int): RDD[(K, Long)] = {
countApproxDistinctByKey(p, sp, new HashPartitioner(numPartitions))
}

/**
* :: Experimental ::
*
* Return approximate number of distinct values for each key in this RDD.
*
* The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
* Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
* <a href="http://research.google.com/pubs/pub40671.html">here</a>.
*
* @param p The precision value for the normal set.
* `p` must be a value between 4 and `sp` (32 max).
* @param sp The precision value for the sparse set, between 0 and 32.
* If `sp` equals 0, the sparse representation is skipped.
*/
@Experimental
def countApproxDistinctByKey(p: Int, sp: Int): RDD[(K, Long)] = {
countApproxDistinctByKey(p, sp, defaultPartitioner(self))
}

/**
* Return approximate number of distinct values for each key in this RDD.
* <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
*
* The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
* Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
* <a href="http://research.google.com/pubs/pub40671.html">here</a>.
* @param relativeSD Relative accuracy. Smaller values create counters that require more space.
* It should be greater than 0.000017.
* @param partitioner partitioner of the resulting RDD
*/
@deprecated("Use countApproxDistinctByKey with parameter p and sp", "1.0.1")
def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = {
// See stream-lib's HyperLogLog implementation on the conversion from relativeSD to p.
val p = (math.log((1.106 / relativeSD) * (1.106 / relativeSD)) / math.log(2)).toInt
countApproxDistinctByKey(p, 0, partitioner)
require(relativeSD > 0.000017, s"accuracy ($relativeSD) should be greater than 0.000017")
val p = math.ceil(2.0 * math.log(1.054 / relativeSD) / math.log(2)).toInt
assert(p <= 32)
countApproxDistinctByKey(if (p < 4) 4 else p, 0, partitioner)
}

/**
* Return approximate number of distinct values for each key in this RDD.
*
* The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
* Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
* <a href="http://research.google.com/pubs/pub40671.html">here</a>.
* <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
*
* @param relativeSD Relative accuracy. Smaller values create counters that require more space.
* It should be greater than 0.000017.
* @param numPartitions number of partitions of the resulting RDD
*/
@deprecated("Use countApproxDistinctByKey with parameter p and sp", "1.0.1")
def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): RDD[(K, Long)] = {
countApproxDistinctByKey(relativeSD, new HashPartitioner(numPartitions))
}
Expand All @@ -316,9 +291,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
*
* The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
* Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
* <a href="http://research.google.com/pubs/pub40671.html">here</a>.
* <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
*
* @param relativeSD Relative accuracy. Smaller values create counters that require more space.
* It should be greater than 0.000017.
*/
@deprecated("Use countApproxDistinctByKey with parameter p and sp", "1.0.1")
def countApproxDistinctByKey(relativeSD: Double = 0.05): RDD[(K, Long)] = {
countApproxDistinctByKey(relativeSD, defaultPartitioner(self))
}
Expand Down
14 changes: 9 additions & 5 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -922,7 +922,11 @@ abstract class RDD[T: ClassTag](
*
* The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
* Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
* <a href="http://research.google.com/pubs/pub40671.html">here</a>.
* <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
*
* The relative accuracy is approximately `1.054 / sqrt(2^p)`. Setting a nonzero `sp > p`
* would trigger sparse representation of registers, which may reduce the memory consumption
* and increase accuracy when the cardinality is small.
*
* @param p The precision value for the normal set.
* <code>p</code> must be a value between 4 and <code>sp</code>.
Expand Down Expand Up @@ -951,12 +955,12 @@ abstract class RDD[T: ClassTag](
*
* The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
* Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
* <a href="http://research.google.com/pubs/pub40671.html">here</a>.
* <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
*
* @param relativeSD Relative accuracy. Smaller values create counters that require more space.
*/
@deprecated("Use countApproxDistinct with parameter p and sp", "1.0.1")
def countApproxDistinct(relativeSD: Double = 0.05): Long = {
// See stream-lib's HyperLogLog implementation on the conversion from relativeSD to p.
val p = (math.log((1.106 / relativeSD) * (1.106 / relativeSD)) / math.log(2)).toInt
val p = math.ceil(2.0 * math.log(1.054 / relativeSD) / math.log(2)).toInt
countApproxDistinct(p, 0)
}

Expand Down
2 changes: 1 addition & 1 deletion core/src/test/java/org/apache/spark/JavaAPISuite.java
Original file line number Diff line number Diff line change
Expand Up @@ -1028,7 +1028,7 @@ public void countApproxDistinct() {
arrayData.add(i % size);
}
JavaRDD<Integer> simpleRdd = sc.parallelize(arrayData, 10);
Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(8, 0) - size) / (size * 1.0)) <= 0.1);
Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.05) - size) / (size * 1.0)) <= 0.1);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
(1 to num).map(j => (num, j))
}
val rdd2 = sc.parallelize(randStacked)
val counted2 = rdd2.countApproxDistinctByKey(p, sp, 4).collect()
val counted2 = rdd2.countApproxDistinctByKey(relativeSD).collect()
counted2.foreach { case(k, count) => assert(error(count, k) < relativeSD) }
}

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@
<artifactId>stream</artifactId>
<version>2.7.0</version>
<exclusions>
<!-- Only HyperLogLog is used, which doesn't depend on fastutil -->
<!-- Only HyperLogLogPlus is used, which doesn't depend on fastutil -->
<exclusion>
<groupId>it.unimi.dsi</groupId>
<artifactId>fastutil</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion project/SparkBuild.scala
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ object SparkBuild extends Build {
"com.twitter" %% "chill" % chillVersion excludeAll(excludeAsm),
"com.twitter" % "chill-java" % chillVersion excludeAll(excludeAsm),
"org.tachyonproject" % "tachyon" % "0.4.1-thrift" excludeAll(excludeHadoop, excludeCurator, excludeEclipseJetty, excludePowermock),
"com.clearspring.analytics" % "stream" % "2.7.0" excludeAll(excludeFastutil),
"com.clearspring.analytics" % "stream" % "2.7.0" excludeAll(excludeFastutil), // Only HyperLogLogPlus is used, which does not depend on fastutil.
"org.spark-project" % "pyrolite" % "2.0.1",
"net.sf.py4j" % "py4j" % "0.8.1"
),
Expand Down

0 comments on commit 9e320c8

Please sign in to comment.