diff --git a/algebird-core/src/main/scala/com/twitter/algebird/HyperLogLog.scala b/algebird-core/src/main/scala/com/twitter/algebird/HyperLogLog.scala index c6eafa326..fa57b9207 100644 --- a/algebird-core/src/main/scala/com/twitter/algebird/HyperLogLog.scala +++ b/algebird-core/src/main/scala/com/twitter/algebird/HyperLogLog.scala @@ -17,8 +17,10 @@ limitations under the License. package com.twitter.algebird import java.nio.ByteBuffer +import java.lang.Math /** A super lightweight (hopefully) version of BitSet */ +@deprecated("This is no longer used.", since = "0.12.3") case class BitSetLite(in: Array[Byte]) { def contains(x: Int): Boolean = { // Pretend 'in' is little endian so that the bitstring b0b1b2b3 is @@ -26,7 +28,7 @@ case class BitSetLite(in: Array[Byte]) { // 1 is in the bitset. val arrayIdx = x / 8 val remainder = x % 8 - ((in(arrayIdx) >> (7 - remainder)) & 1) == 1 + ((in(arrayIdx) >>> (7 - remainder)) & 1) == 1 } } @@ -77,36 +79,69 @@ object HyperLogLog { } @inline - def twopow(i: Int): Double = java.lang.Math.pow(2.0, i) + def twopow(i: Int): Double = Math.pow(2.0, i) + + @deprecated("This is no longer used. Use j(Array[Byte], Int) instead.", since = "0.12.3") + def j(bsl: BitSetLite, bits: Int): Int = + j(bsl.in, bits) /** - * the value 'j' is equal to - * TODO: We could read in a byte at a time. + * */ - def j(bsl: BitSetLite, bits: Int): Int = { - @annotation.tailrec - def loop(pos: Int, accum: Int): Int = { - if (pos >= bits) { - accum - } else if (bsl.contains(pos)) { - loop(pos + 1, accum + (1 << pos)) - } else { - loop(pos + 1, accum) + def j(bytes: Array[Byte], bits: Int): Int = { + var i = 0 + var sum = 0 + var need = bits + while (i < bytes.length && need > 0) { + val byte = bytes(i) & 0xff + val limit = Math.min(8, need) + var j = 0 + while (j < limit) { + sum += ((byte >>> (7 - j)) & 1) << (i * 8 + j) + j += 1 } + need -= 8 + i += 1 } - loop(0, 0) + sum } + @deprecated("This is no longer used. 
Use rhoW(Array[Byte], Int) instead.", since = "0.12.3") + def rhoW(bsl: BitSetLite, bits: Int): Byte = + rhoW(bsl.in, bits) + /** - * The value 'w' is equal to . The function rho counts the number of leading - * zeroes in 'w'. We can calculate rho(w) at once with the method rhoW. + * The value 'w' is represented as a bitset (encoded in + * `bytes`). This function counts the number of leading zeros in 'w'. + * + * Each byte is treated as a set of bits (most significant bit first). That is, + * the 128 bit represents the first value, then the 64 bit, then + * 32, and so on. + * + * We treat the leading `bits` bits as if they were instead a single + * zero bit. */ - def rhoW(bsl: BitSetLite, bits: Int): Byte = { - @annotation.tailrec - def loop(pos: Int, zeros: Int): Int = - if (bsl.contains(pos)) zeros - else loop(pos + 1, zeros + 1) - loop(bits, 1).toByte + def rhoW(bytes: Array[Byte], bits: Int): Byte = { + var i = bits / 8 // tracks the position in bytes + var j = 7 - (bits % 8) // tracks the bit position in bytes(i) + var zeros = 1 // start with a single zero + while (i < bytes.length) { + while (j >= 0) { + if (((bytes(i) >>> j) & 1) == 1) { + return zeros.toByte + } + zeros += 1 + j -= 1 + } + i += 1 + j = 7 + } + + // If the array has only zeros we say there are no leading + // zeros. Since different length byte arrays of all zeros should + // return the same value here, it's not clear what other answer is + // appropriate. We will almost never reach this code in practice. + 0.toByte } /** @@ -117,10 +152,8 @@ object HyperLogLog { * the value 'w' is equal to . The function rho counts the number of leading * zeroes in 'w'. We can calculate rho(w) at once with the method rhoW.
*/ - def jRhoW(in: Array[Byte], bits: Int): (Int, Byte) = { - val onBits = BitSetLite(in) - (j(onBits, bits), rhoW(onBits, bits)) - } + def jRhoW(in: Array[Byte], bits: Int): (Int, Byte) = + (j(in, bits), rhoW(in, bits)) def toBytes(h: HLL): Array[Byte] = { h match { @@ -231,6 +264,34 @@ object HyperLogLog { require(err >= 0.00003 && err < 1.0, s"Error must be in (0.00003, 1.0): $err") math.ceil(2.0 * math.log(1.04 / err) / math.log(2.0)).toInt } + + def initialEstimate(bits: Int, size: Int, zeroCnt: Int, z: Double): Double = { + val sizeDouble: Double = size.toDouble + val smallE = 5 * sizeDouble / 2.0 + val factor = alpha(bits) * sizeDouble * sizeDouble + + val e: Double = factor * z + + // There are large and small value corrections from the paper. We + // stopped using the large value corrections since when using + // Long there were pathologically bad results. + // + // See https://github.com/twitter/algebird/issues/284 + if (e > smallE || zeroCnt == 0) e + else size * scala.math.log(size.toDouble / zeroCnt) + } + + def asApprox(bits: Int, v: Double): Approximate[Long] = { + val stdev = 1.04 / scala.math.sqrt(twopow(bits)) + val lowerBound = Math.floor(math.max(v * (1.0 - 3 * stdev), 0.0)).toLong + val upperBound = Math.ceil(v * (1.0 + 3 * stdev)).toLong + // Lower bound.
see: http://en.wikipedia.org/wiki/68-95-99.7_rule + val prob3StdDev = 0.9972 + Approximate(lowerBound, v.toLong, upperBound, prob3StdDev) + } + + def approximateSize(bits: Int, size: Int, zeroCnt: Int, z: Double): Approximate[Long] = + asApprox(bits, initialEstimate(bits, size, zeroCnt, z)) } sealed abstract class HLL extends java.io.Serializable { @@ -243,43 +304,17 @@ sealed abstract class HLL extends java.io.Serializable { def +(other: HLL): HLL def toDenseHLL: DenseHLL - def approximateSize = asApprox(initialEstimate) + def approximateSize: Approximate[Long] = + asApprox(initialEstimate) def estimatedSize: Double = initialEstimate - private lazy val initialEstimate: Double = { + private lazy val initialEstimate: Double = + HyperLogLog.initialEstimate(bits, size, zeroCnt, z) - val sizeDouble: Double = size.toDouble - val smallE = 5 * sizeDouble / 2.0 - val factor = alpha(bits) * sizeDouble * sizeDouble + private def asApprox(v: Double): Approximate[Long] = + HyperLogLog.asApprox(bits, v) - val e: Double = factor * z - // There are large and small value corrections from the paper - // We stopped using the large value corrections since when using Long's - // there was pathologically bad results. See https://github.com/twitter/algebird/issues/284 - if (e <= smallE) { - smallEstimate(e) - } else { - e - } - } - - private def asApprox(v: Double): Approximate[Long] = { - val stdev = error(bits) - val lowerBound = math.floor(math.max(v * (1.0 - 3 * stdev), 0.0)).toLong - val upperBound = math.ceil(v * (1.0 + 3 * stdev)).toLong - // Lower bound. 
see: http://en.wikipedia.org/wiki/68-95-99.7_rule - val prob3StdDev = 0.9972 - Approximate(lowerBound, v.toLong, upperBound, prob3StdDev) - } - - private def smallEstimate(e: Double): Double = { - if (zeroCnt == 0) { - e - } else { - size * scala.math.log(size.toDouble / zeroCnt) - } - } /** * Set each item in the given buffer to the max of this and the buffer */ @@ -311,7 +346,7 @@ sealed abstract class HLL extends java.io.Serializable { // find the number of leading zeros // we use bitMask to force rhoW to stop after going through (bits - reducedBits) bits ByteBuffer.wrap(buf).putInt(Integer.reverse(currentJ | bitMask)) - HyperLogLog.rhoW(BitSetLite(buf), reducedBits) + HyperLogLog.rhoW(buf, reducedBits) } } @@ -563,10 +598,8 @@ class HyperLogLogMonoid(val bits: Int) extends Monoid[HLL] { createFromHashBytes(hashBytes) } - def createFromHashBytes(hashed: Array[Byte]): HLL = { - val (j, rhow) = jRhoW(hashed, bits) - SparseHLL(bits, Map(j -> Max(rhow))) - } + def createFromHashBytes(hashed: Array[Byte]): HLL = + SparseHLL(bits, Map(j(hashed, bits) -> Max(rhoW(hashed, bits)))) @deprecated("Use toHLL", since = "0.10.0 / 2015-05") def batchCreate[T <% Array[Byte]](instances: Iterable[T]): HLL = { diff --git a/algebird-core/src/main/scala/com/twitter/algebird/HyperLogLogSeries.scala b/algebird-core/src/main/scala/com/twitter/algebird/HyperLogLogSeries.scala index 927cd035a..305db67c1 100644 --- a/algebird-core/src/main/scala/com/twitter/algebird/HyperLogLogSeries.scala +++ b/algebird-core/src/main/scala/com/twitter/algebird/HyperLogLogSeries.scala @@ -31,24 +31,72 @@ package com.twitter.algebird * @return New HLLSeries */ case class HLLSeries(bits: Int, rows: Vector[Map[Int, Long]]) { + + def insert(data: Array[Byte], timestamp: Long): HLLSeries = { + import HyperLogLog._ + val hashed = hash(data) + val j0 = j(hashed, bits) + val rhow = rhoW(hashed, bits) - 1 + + val newRows = if (rhow < rows.size) { + val row = rows(rhow) + val t = row.get(j0) match { + case Some(t0) 
=> Math.max(t0, timestamp) + case None => timestamp + } + rows.updated(rhow, row.updated(j0, t)) + } else { + val padded = (rows.size until rhow).foldLeft(rows)((rs, _) => rs :+ Map.empty) + padded :+ Map(j0 -> timestamp) + } + HLLSeries(bits, newRows) + } + + def maxRhowStats(threshold: Long): (Int, Double) = { + val seen = scala.collection.mutable.Set.empty[Int] + var sum: Double = 0.0 + var i = rows.size - 1 + while (i >= 0) { + val it = rows(i).iterator + while (it.hasNext) { + val (k, t) = it.next + if (t >= threshold && seen.add(k)) { + sum += HyperLogLog.negativePowersOfTwo(i + 1) + } + } + i -= 1 + } + (seen.size, sum) + } + + def approximateSizeSince(threshold: Long): Approximate[Long] = { + val (maxRhowSize, maxRhowSum) = maxRhowStats(threshold) + val size = 1 << bits + val zeroCnt = size - maxRhowSize + val z = 1.0 / (zeroCnt + maxRhowSum) + HyperLogLog.approximateSize(bits, size, zeroCnt, z) + } + /** * @param since Timestamp from which to reconstruct the HLL * * @return New HLLSeries only including RhoWs for values seen at or after the given timestamp */ - def since(threshold: Long) = + def since(threshold: Long): HLLSeries = HLLSeries( bits, rows.map{ _.filter{ case (j, ts) => ts >= threshold } }) - def toHLL: HLL = - if (rows.isEmpty) - SparseHLL(bits, Map()) - else - rows.zipWithIndex.map{ + def toHLL: HLL = { + val monoid = new HyperLogLogMonoid(bits) + if (rows.isEmpty) monoid.zero + else { + monoid.sum(rows.iterator.zipWithIndex.map { case (map, i) => - SparseHLL(bits, map.mapValues{ ts => Max((i + 1).toByte) }): HLL - }.reduce{ _ + _ } + SparseHLL(bits, map.mapValues { ts => Max((i + 1).toByte) }) + }) + } + } } /** @@ -69,36 +117,44 @@ case class HLLSeries(bits: Int, rows: Vector[Map[Int, Long]]) { class HyperLogLogSeriesMonoid(val bits: Int) extends Monoid[HLLSeries] { import HyperLogLog._ - val zero = HLLSeries(bits, Vector()) + val zero: HLLSeries = HLLSeries(bits, Vector.empty) def create(example: Array[Byte], timestamp: Long): HLLSeries = 
{ val hashed = hash(example) - val (j, rhow) = jRhoW(hashed, bits) - - val vector = Vector.fill(rhow - 1){ Map[Int, Long]() } ++ Vector(Map(j -> timestamp)) - HLLSeries(bits, vector) + val rhow = rhoW(hashed, bits) + val e = Map.empty[Int, Long] + val bldr = Vector.newBuilder[Map[Int, Long]] + var i = 1 + while (i < rhow) { bldr += e; i += 1 } + bldr += Map(j(hashed, bits) -> timestamp) + HLLSeries(bits, bldr.result()) } def plus(left: HLLSeries, right: HLLSeries): HLLSeries = { - if (left.rows.size > right.rows.size) + val ln = left.rows.size + val rn = right.rows.size + if (ln > rn) { plus(right, left) - else { - val zipped = left.rows.zip(right.rows).map{ - case (l, r) => - combine(l, r) - } - HLLSeries( - bits, - zipped ++ right.rows.slice(left.rows.size, right.rows.size)) + } else { + val bldr = Vector.newBuilder[Map[Int, Long]] + val lit = left.rows.iterator + val rit = right.rows.iterator + while (lit.hasNext && rit.hasNext) bldr += combine(lit.next, rit.next) + val zipped = bldr.result() + HLLSeries(bits, zipped ++ right.rows.slice(ln, rn)) } } - private def combine(left: Map[Int, Long], right: Map[Int, Long]): Map[Int, Long] = { - if (left.size > right.size) + private def combine(left: Map[Int, Long], right: Map[Int, Long]): Map[Int, Long] = + if (left.size > right.size) { combine(right, left) - else { - right ++ - left.map{ case (k, v) => k -> (right.getOrElse(k, 0L).max(v)) } + } else { + left.foldLeft(right) { + case (m, (k, lv)) => + m.updated(k, m.get(k) match { + case None => lv + case Some(rv) => Math.max(lv, rv) + }) + } } - } } diff --git a/algebird-test/src/test/scala/com/twitter/algebird/HyperLogLogSeriesTest.scala b/algebird-test/src/test/scala/com/twitter/algebird/HyperLogLogSeriesTest.scala index 8a8b049a1..0e1703f0d 100644 --- a/algebird-test/src/test/scala/com/twitter/algebird/HyperLogLogSeriesTest.scala +++ b/algebird-test/src/test/scala/com/twitter/algebird/HyperLogLogSeriesTest.scala @@ -2,24 +2,75 @@ package com.twitter.algebird import 
org.scalatest._ -import org.scalacheck.{ Gen, Arbitrary, Properties } +import org.scalacheck.{ Gen, Arbitrary, Properties, Prop } +import Arbitrary.arbitrary import HyperLogLog.{ int2Bytes, long2Bytes } class HyperLogLogSeriesLaws extends CheckProperties { import BaseProperties._ - implicit val hllSeriesMonoid = new HyperLogLogSeriesMonoid(5) //5 bits + implicit val monoid = new HyperLogLogSeriesMonoid(8) - implicit val hllSeriesGen = Arbitrary { - for ( - v <- Gen.choose(0, 10000) - ) yield (hllSeriesMonoid.create(v, v)) + case class Timestamp(value: Long) + + object Timestamp { + implicit val arbitraryTimestamp: Arbitrary[Timestamp] = + Arbitrary(Gen.choose(0L, 1000000L).map(t => Timestamp(t))) } + def absorb(h0: HLLSeries, ts: List[Timestamp]): HLLSeries = + monoid.sum(h0 :: ts.map(t => monoid.create(long2Bytes(t.value), t.value))) + + def directAbsorb(h0: HLLSeries, ts: List[Timestamp]): HLLSeries = + ts.foldLeft(h0)((h, t) => h.insert(long2Bytes(t.value), t.value)) + + implicit val hllSeriesGen: Arbitrary[HLLSeries] = + Arbitrary(arbitrary[List[Timestamp]].map(ts => absorb(monoid.zero, ts))) + property("HyperLogLogSeries is a Monoid") { monoidLawsEq[HLLSeries]{ _.toHLL == _.toHLL } } + + property("HyperLogLogSeries is commutative") { + Prop.forAll { (h: HLLSeries, ts: List[Timestamp]) => + absorb(h, ts) == absorb(h, ts.reverse) + } + } + + property("series.approximateSizeSince(start) = h.since(t).toHLL.approximateSize") { + Prop.forAll { (h: HLLSeries, t: Timestamp) => + h.approximateSizeSince(t.value) == h.since(t.value).toHLL.approximateSize + } + } + + property("h.insert(bs, t) = m.plus(h, m.create(bs, t))") { + Prop.forAll { (h: HLLSeries, ts: List[Timestamp]) => + absorb(h, ts) == directAbsorb(h, ts) + } + } + + // this is a deterministic test to ensure that our rates are staying + // within the expected error bounds. 
+ property("verify error rate") { + + // Ensure that building an HLLSeries containing the given + // cardinality of items has an acceptable error rate. + def verify(cardinality: Int, errorPct: Double): Boolean = { + val it = (0 until cardinality).iterator + val h = monoid.sum(it.map(i => monoid.create(int2Bytes(i), i))) + val n = h.since(0L).toHLL.approximateSize.estimate + val delta = (cardinality * errorPct).toInt + (cardinality - delta) <= n && n <= (cardinality + delta) + } + + // We've verified that at 8-bits, the following cardinalities all + // have <= 10% error. This is intended to protect us against + // possible future regressions (where the error rate gets worse + // than expected). + val cardinalities = List(1024, 2048, 4096, 8192, 16384, 32768, 65536) + cardinalities.forall { n => verify(n, 0.1) } + } } class HLLSeriesSinceProperty extends ApproximateProperty { @@ -32,13 +83,13 @@ class HLLSeriesSinceProperty extends ApproximateProperty { type Result = Long val bits = 12 - val hllSeriesMonoid = new HyperLogLogSeriesMonoid(bits) + val monoid = new HyperLogLogSeriesMonoid(bits) val hll = new HyperLogLogMonoid(bits) def makeApproximate(timestampedData: Seq[(Long, Long)]) = { val hllSeries = timestampedData - .map { case (value, timestamp) => hllSeriesMonoid.create(value, timestamp) } - hllSeriesMonoid.sum(hllSeries) + .map { case (value, timestamp) => monoid.create(value, timestamp) } + monoid.sum(hllSeries) } def exactGenerator: Gen[Seq[(Long, Long)]] = for { diff --git a/algebird-test/src/test/scala/com/twitter/algebird/HyperLogLogTest.scala b/algebird-test/src/test/scala/com/twitter/algebird/HyperLogLogTest.scala index 4986e1ed7..1f40e0c22 100644 --- a/algebird-test/src/test/scala/com/twitter/algebird/HyperLogLogTest.scala +++ b/algebird-test/src/test/scala/com/twitter/algebird/HyperLogLogTest.scala @@ -29,8 +29,9 @@ object ReferenceHyperLogLog { def jRhoW(in: Array[Byte], bits: Int): (Int, Byte) = { val onBits = bytesToBitSet(in) - (onBits.filter { _ <
bits }.map { 1 << _ }.sum, - (onBits.filter { _ >= bits }.min - bits + 1).toByte) + val j = onBits.filter { _ < bits }.map { 1 << _ }.sum + val rhow = onBits.find { _ >= bits }.map { _ - bits + 1 }.getOrElse(0) + (j, rhow.toByte) } def hash(input: Array[Byte]): Array[Byte] = { @@ -49,13 +50,11 @@ class HyperLogLogLaws extends CheckProperties { import BaseProperties._ import HyperLogLog._ - implicit val hllMonoid = new HyperLogLogMonoid(5) //5 bits + val bits = 8 + implicit val hllMonoid = new HyperLogLogMonoid(bits) - implicit val hllGen = Arbitrary { - for ( - v <- Gen.choose(0, 10000) - ) yield (hllMonoid.create(v)) - } + implicit val hllGen: Arbitrary[HLL] = + Arbitrary(Gen.choose(0L, 1000000L).map(v => hllMonoid.create(long2Bytes(v)))) property("HyperLogLog is a Monoid") { monoidLawsEq[HLL]{ _.toDenseHLL == _.toDenseHLL } @@ -77,6 +76,15 @@ class HyperLogLogLaws extends CheckProperties { HyperLogLog.hash(a).toSeq == ReferenceHyperLogLog.hash(a).toSeq } } + + property("HyperLogLog.j and rhow match reference") { + Prop.forAll { (bytes: Array[Byte]) => + val (j0, rhow0) = ReferenceHyperLogLog.jRhoW(bytes, bits) + val j1 = HyperLogLog.j(bytes, bits) + val rhow1 = HyperLogLog.rhoW(bytes, bits) + j0 == j1 && rhow0 == rhow1 + } + } } /* Ensure jRhoW matches referenceJRhoW */ @@ -217,7 +225,7 @@ class HLLProperties extends ApproximateProperties("HyperLogLog") { implicit val intGen = Gen.chooseNum(Int.MinValue, Int.MaxValue) implicit val longGen = Gen.chooseNum(Long.MinValue, Long.MaxValue) - for (bits <- List(5, 6, 7, 10)) { + for (bits <- List(5, 6, 7, 8, 10)) { property(s"Count ints with $bits bits") = toProp(new HLLCountProperty[Int](bits), 100, 1, 0.01) @@ -249,14 +257,14 @@ class SetSizeAggregatorProperties extends ApproximateProperties("SetSizeAggregat implicit val intGen = Gen.chooseNum(Int.MinValue, Int.MaxValue) implicit val longGen = Gen.chooseNum(Long.MinValue, Long.MaxValue) - for (bits <- List(5, 7, 10)) { + for (bits <- List(5, 7, 8, 10)) { 
property(s"work as an Aggregator and return exact size when <= maxSetSize for $bits bits, using conversion to Array[Byte]") = toProp(new SmallBytesSetSizeAggregatorProperty[Int](bits), 100, 1, 0.01) property(s"work as an Aggregator and return exact size when <= maxSetSize for $bits bits, using Hash128") = toProp(new SmallSetSizeHashAggregatorProperty[Int](bits), 100, 1, 0.01) } - for (bits <- List(5, 7, 10)) { + for (bits <- List(5, 7, 8, 10)) { property(s"work as an Aggregator and return approximate size when > maxSetSize for $bits bits, using conversion to Array[Byte]") = toProp(new LargeBytesSetSizeAggregatorProperty[Int](bits), 100, 1, 0.01) property(s"work as an Aggregator and return approximate size when > maxSetSize for $bits bits, using Hash128") = @@ -349,7 +357,7 @@ class HyperLogLogTest extends WordSpec with Matchers { } "work as an Aggregator and return a HLL" in { - List(5, 7, 10).foreach(bits => { + List(5, 7, 8, 10).foreach(bits => { val aggregator = HyperLogLogAggregator(bits) val data = (0 to 10000).map { i => r.nextInt(1000) } val exact = exactCount(data).toDouble @@ -360,7 +368,7 @@ class HyperLogLogTest extends WordSpec with Matchers { } "work as an Aggregator and return size" in { - List(5, 7, 10).foreach(bits => { + List(5, 7, 8, 10).foreach(bits => { val aggregator = HyperLogLogAggregator.sizeAggregator(bits) val data = (0 to 10000).map { i => r.nextInt(1000) } val exact = exactCount(data).toDouble