SPARK-1941: Update streamlib to 2.7.0 and use HyperLogLogPlus instead of HyperLogLog.
rxin committed May 28, 2014
1 parent 3b0baba commit c0ef0c2
Showing 6 changed files with 42 additions and 71 deletions.
28 changes: 21 additions & 7 deletions core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -28,7 +28,7 @@ import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

-import com.clearspring.analytics.stream.cardinality.HyperLogLog
+import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.io.SequenceFile.CompressionType
@@ -46,7 +46,6 @@ import org.apache.spark.Partitioner.defaultPartitioner
import org.apache.spark.SparkContext._
import org.apache.spark.partial.{BoundedDouble, PartialResult}
import org.apache.spark.serializer.Serializer
-import org.apache.spark.util.SerializableHyperLogLog

/**
* Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
@@ -218,14 +217,29 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
   * The accuracy of approximation can be controlled through the relative standard deviation
   * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
   * more accurate counts but increase the memory footprint and vice versa. Uses the provided
-   * Partitioner to partition the output RDD.
+   * [[Partitioner]] to partition the output RDD.
+   *
+   * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
+   * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available at
+   * [[http://research.google.com/pubs/pub40671.html]].
   */
  def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = {
-    val createHLL = (v: V) => new SerializableHyperLogLog(new HyperLogLog(relativeSD)).add(v)
-    val mergeValueHLL = (hll: SerializableHyperLogLog, v: V) => hll.add(v)
-    val mergeHLL = (h1: SerializableHyperLogLog, h2: SerializableHyperLogLog) => h1.merge(h2)
+    val precision = (math.log((1.106 / relativeSD) * (1.106 / relativeSD)) / math.log(2)).toInt
+    val createHLL = (v: V) => {
+      val hll = new HyperLogLogPlus(precision)
+      hll.offer(v)
+      hll
+    }
+    val mergeValueHLL = (hll: HyperLogLogPlus, v: V) => {
+      hll.offer(v)
+      hll
+    }
+    val mergeHLL = (h1: HyperLogLogPlus, h2: HyperLogLogPlus) => {
+      h1.addAll(h2)
+      h1
+    }

-    combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner).mapValues(_.value.cardinality())
+    combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner).mapValues(_.cardinality())
  }

/**
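For a sense of scale: with relativeSD = 0.05 the precision expression above evaluates to log2((1.106 / 0.05)^2) ≈ 8.9, which truncates to 8, i.e. 2^8 = 256 registers per HyperLogLogPlus sketch. Below is a minimal usage sketch of the per-key API after this change; the SparkContext `sc` and the sample data are illustrative assumptions, not part of the commit.

// Hypothetical example: ten keys, each with exactly 1000 distinct values.
val pairs = sc.parallelize(1 to 10000).map(i => (i % 10, i))

// relativeSD = 0.05 maps to precision 8 (256 registers per key); 4 output partitions.
val approxCounts = pairs.countApproxDistinctByKey(0.05, 4).collect()

approxCounts.foreach { case (k, c) =>
  println(s"key $k -> approximately $c distinct values")  // expect ~1000, within roughly 5%
}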
21 changes: 17 additions & 4 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -24,7 +24,7 @@ import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.reflect.{classTag, ClassTag}

-import com.clearspring.analytics.stream.cardinality.HyperLogLog
+import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
import org.apache.hadoop.io.BytesWritable
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.io.NullWritable
@@ -41,7 +41,7 @@ import org.apache.spark.partial.CountEvaluator
import org.apache.spark.partial.GroupedCountEvaluator
import org.apache.spark.partial.PartialResult
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.{BoundedPriorityQueue, SerializableHyperLogLog, Utils}
+import org.apache.spark.util.{BoundedPriorityQueue, Utils}
import org.apache.spark.util.collection.OpenHashMap
import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler}

@@ -925,11 +925,24 @@ abstract class RDD[T: ClassTag](
   * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
   * more accurate counts but increase the memory footprint and vice versa. The default value of
   * relativeSD is 0.05.
+   *
+   * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
+   * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available at
+   * [[http://research.google.com/pubs/pub40671.html]].
   */
  @Experimental
  def countApproxDistinct(relativeSD: Double = 0.05): Long = {
-    val zeroCounter = new SerializableHyperLogLog(new HyperLogLog(relativeSD))
-    aggregate(zeroCounter)(_.add(_), _.merge(_)).value.cardinality()
+    val precision = (math.log((1.106 / relativeSD) * (1.106 / relativeSD)) / math.log(2)).toInt
+    val zeroCounter = new HyperLogLogPlus(precision)
+    aggregate(zeroCounter)(
+      (hll: HyperLogLogPlus, v: T) => {
+        hll.offer(v)
+        hll
+      },
+      (h1: HyperLogLogPlus, h2: HyperLogLogPlus) => {
+        h1.addAll(h2)
+        h1
+      }).cardinality()
  }

/**
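A corresponding sketch for the non-keyed method, comparing the estimate against an exact count; again this assumes an existing SparkContext `sc`, and the data and partition count are illustrative.

// Hypothetical example: 100000 values folded onto 20000 distinct ones.
val ids = sc.parallelize(1 to 100000, 8).map(_ % 20000)

val approx = ids.countApproxDistinct()   // default relativeSD = 0.05
val exact = ids.distinct().count()       // exact, but requires a full shuffle

println(s"approx = $approx, exact = $exact")  // approx should land within a few percent of 20000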

52 changes: 0 additions & 52 deletions core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala
This file was deleted

8 changes: 2 additions & 6 deletions core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -126,9 +126,7 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
val stacked = (1 to 100).flatMap(i => (1 to i).map(j => (i, j)))
val rdd1 = sc.parallelize(stacked)
val counted1 = rdd1.countApproxDistinctByKey(relativeSD).collect()
-    counted1.foreach{
-      case(k, count) => assert(error(count, k) < relativeSD)
-    }
+    counted1.foreach { case (k, count) => assert(error(count, k) < relativeSD) }

val rnd = new Random()

@@ -139,9 +137,7 @@
}
val rdd2 = sc.parallelize(randStacked)
val counted2 = rdd2.countApproxDistinctByKey(relativeSD, 4).collect()
-    counted2.foreach{
-      case(k, count) => assert(error(count, k) < relativeSD)
-    }
+    counted2.foreach { case(k, count) => assert(error(count, k) < relativeSD) }
}

test("join") {
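The suite's `error` helper is not shown in this excerpt; given how the assertions above use it, a plausible definition is the relative error of the estimate against the exact per-key count. This is an assumption for illustration, not code from the commit.

// Assumed shape of the suite's helper: relative error of an approximate count.
def error(est: Long, exact: Long): Double = math.abs(est - exact).toDouble / exact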
2 changes: 1 addition & 1 deletion pom.xml
@@ -284,7 +284,7 @@
<dependency>
<groupId>com.clearspring.analytics</groupId>
<artifactId>stream</artifactId>
-      <version>2.5.1</version>
+      <version>2.7.0</version>
<exclusions>
<!-- Only HyperLogLog is used, which doesn't depend on fastutil -->
<exclusion>
2 changes: 1 addition & 1 deletion project/SparkBuild.scala
@@ -358,7 +358,7 @@ object SparkBuild extends Build {
"com.twitter" %% "chill" % chillVersion excludeAll(excludeAsm),
"com.twitter" % "chill-java" % chillVersion excludeAll(excludeAsm),
"org.tachyonproject" % "tachyon" % "0.4.1-thrift" excludeAll(excludeHadoop, excludeCurator, excludeEclipseJetty, excludePowermock),
"com.clearspring.analytics" % "stream" % "2.5.1" excludeAll(excludeFastutil),
"com.clearspring.analytics" % "stream" % "2.7.0" excludeAll(excludeFastutil),
"org.spark-project" % "pyrolite" % "2.0.1",
"net.sf.py4j" % "py4j" % "0.8.1"
),
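Both the Maven and sbt builds now resolve streamlib 2.7.0. What this commit actually needs from the library is the small surface used above; a self-contained sketch of those calls follows, where the precision value and the data are illustrative assumptions.

import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus

// Build two sketches and merge them the same way the RDD code does.
val a = new HyperLogLogPlus(8)   // precision 8: 2^8 registers, relativeSD ≈ 1.106 / 16 ≈ 0.07
val b = new HyperLogLogPlus(8)
(1 to 5000).foreach(i => a.offer(i))
(2501 to 7500).foreach(i => b.offer(i))

a.addAll(b)                      // fold b's registers into a
println(a.cardinality())         // estimate of the union: roughly 7500 distinct values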
