Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve HLLSeries performance. #575

Merged
merged 7 commits into from
Nov 17, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 96 additions & 63 deletions algebird-core/src/main/scala/com/twitter/algebird/HyperLogLog.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,18 @@ limitations under the License.
package com.twitter.algebird

import java.nio.ByteBuffer
import java.lang.Math

/** A super lightweight (hopefully) version of BitSet */
@deprecated("This is no longer used.", since = "0.12.3")
case class BitSetLite(in: Array[Byte]) {
def contains(x: Int): Boolean = {
// Pretend 'in' is little endian so that the bitstring b0b1b2b3 is
// such that if b0 == 1, then 0 is in the bitset, if b1 == 1, then
// 1 is in the bitset.
val arrayIdx = x / 8
val remainder = x % 8
((in(arrayIdx) >> (7 - remainder)) & 1) == 1
((in(arrayIdx) >>> (7 - remainder)) & 1) == 1
}
}

Expand Down Expand Up @@ -77,36 +79,69 @@ object HyperLogLog {
}

@inline
def twopow(i: Int): Double = java.lang.Math.pow(2.0, i)
def twopow(i: Int): Double = Math.pow(2.0, i)

@deprecated("This is no longer used. Use j(Array[Byte], Int) instead.", since = "0.12.3")
def j(bsl: BitSetLite, bits: Int): Int =
j(bsl.in, bits)

/**
* the value 'j' is equal to <w_0, w_1 ... w_(bits-1)>
* TODO: We could read in a byte at a time.
*
*/
def j(bsl: BitSetLite, bits: Int): Int = {
@annotation.tailrec
def loop(pos: Int, accum: Int): Int = {
if (pos >= bits) {
accum
} else if (bsl.contains(pos)) {
loop(pos + 1, accum + (1 << pos))
} else {
loop(pos + 1, accum)
def j(bytes: Array[Byte], bits: Int): Int = {
var i = 0
var sum = 0
var need = bits
while (i < bytes.length && need > 0) {
val byte = bytes(i) & 0xff
val limit = Math.min(8, need)
var j = 0
while (j < limit) {
sum += ((byte >>> (7 - j)) & 1) << (i * 8 + j)
j += 1
}
need -= 8
i += 1
}
loop(0, 0)
sum
}

@deprecated("This is no longer used. Use rhoW(Array[Byte], Int) instead.", since = "0.12.3")
def rhoW(bsl: BitSetLite, bits: Int): Byte =
rhoW(bsl.in, bits)

/**
* The value 'w' is equal to <w_bits ... w_n>. The function rho counts the number of leading
* zeroes in 'w'. We can calculate rho(w) at once with the method rhoW.
* The value 'w' is represented as a bitset (encoding in
* `bytes`). This function counds the number of leading zeros in 'w'.
*
* Each byte is treated as a set of bits (little-endian). That is,
* the one bit represents the first value, then the two bit, then
* four, and so on.
*
* We treat the leading `bits` bits as if they were instead a single
* zero bit.
*/
def rhoW(bsl: BitSetLite, bits: Int): Byte = {
@annotation.tailrec
def loop(pos: Int, zeros: Int): Int =
if (bsl.contains(pos)) zeros
else loop(pos + 1, zeros + 1)
loop(bits, 1).toByte
def rhoW(bytes: Array[Byte], bits: Int): Byte = {
var i = bits / 8 // tracks the position in bytes
var j = 7 - (bits % 8) // tracks the bit position in bytes(i)
var zeros = 1 // start with a single zero
while (i < bytes.length) {
while (j >= 0) {
if (((bytes(i) >>> j) & 1) == 1) {
return zeros.toByte
}
zeros += 1
j -= 1
}
i += 1
j = 7
}

// If the array has only zeros we say there are no leading
// zeros. Since different length byte arrays of all zeros should
// return the same value here, it's not clear what other answer is
// appropriate. We will almost never reach this code in practice.
0.toByte
}

/**
Expand All @@ -117,10 +152,8 @@ object HyperLogLog {
* the value 'w' is equal to <w_bits ... w_n>. The function rho counts the number of leading
* zeroes in 'w'. We can calculate rho(w) at once with the method rhoW.
*/
def jRhoW(in: Array[Byte], bits: Int): (Int, Byte) = {
val onBits = BitSetLite(in)
(j(onBits, bits), rhoW(onBits, bits))
}
def jRhoW(in: Array[Byte], bits: Int): (Int, Byte) =
(j(in, bits), rhoW(in, bits))

def toBytes(h: HLL): Array[Byte] = {
h match {
Expand Down Expand Up @@ -231,6 +264,34 @@ object HyperLogLog {
require(err >= 0.00003 && err < 1.0, s"Error must be in (0.00003, 1.0): $err")
math.ceil(2.0 * math.log(1.04 / err) / math.log(2.0)).toInt
}

def initialEstimate(bits: Int, size: Int, zeroCnt: Int, z: Double): Double = {
val sizeDouble: Double = size.toDouble
val smallE = 5 * sizeDouble / 2.0
val factor = alpha(bits) * sizeDouble * sizeDouble

val e: Double = factor * z

// There are large and small value corrections from the paper We
// stopped using the large value corrections since when using
// Long there were pathologically bad results.
//
// See https://github.com/twitter/algebird/issues/284
if (e > smallE || zeroCnt == 0) e
else size * scala.math.log(size.toDouble / zeroCnt)
}

def asApprox(bits: Int, v: Double): Approximate[Long] = {
val stdev = 1.04 / scala.math.sqrt(twopow(bits))
val lowerBound = Math.floor(math.max(v * (1.0 - 3 * stdev), 0.0)).toLong
val upperBound = Math.ceil(v * (1.0 + 3 * stdev)).toLong
// Lower bound. see: http://en.wikipedia.org/wiki/68-95-99.7_rule
val prob3StdDev = 0.9972
Approximate(lowerBound, v.toLong, upperBound, prob3StdDev)
}

def approximateSize(bits: Int, size: Int, zeroCnt: Int, z: Double): Approximate[Long] =
asApprox(bits, initialEstimate(bits, size, zeroCnt, z))
}

sealed abstract class HLL extends java.io.Serializable {
Expand All @@ -243,43 +304,17 @@ sealed abstract class HLL extends java.io.Serializable {
def +(other: HLL): HLL
def toDenseHLL: DenseHLL

def approximateSize = asApprox(initialEstimate)
def approximateSize: Approximate[Long] =
asApprox(initialEstimate)

def estimatedSize: Double = initialEstimate

private lazy val initialEstimate: Double = {
private lazy val initialEstimate: Double =
HyperLogLog.initialEstimate(bits, size, zeroCnt, z)

val sizeDouble: Double = size.toDouble
val smallE = 5 * sizeDouble / 2.0
val factor = alpha(bits) * sizeDouble * sizeDouble
private def asApprox(v: Double): Approximate[Long] =
HyperLogLog.asApprox(bits, v)

val e: Double = factor * z
// There are large and small value corrections from the paper
// We stopped using the large value corrections since when using Long's
// there was pathologically bad results. See https://github.com/twitter/algebird/issues/284
if (e <= smallE) {
smallEstimate(e)
} else {
e
}
}

private def asApprox(v: Double): Approximate[Long] = {
val stdev = error(bits)
val lowerBound = math.floor(math.max(v * (1.0 - 3 * stdev), 0.0)).toLong
val upperBound = math.ceil(v * (1.0 + 3 * stdev)).toLong
// Lower bound. see: http://en.wikipedia.org/wiki/68-95-99.7_rule
val prob3StdDev = 0.9972
Approximate(lowerBound, v.toLong, upperBound, prob3StdDev)
}

private def smallEstimate(e: Double): Double = {
if (zeroCnt == 0) {
e
} else {
size * scala.math.log(size.toDouble / zeroCnt)
}
}
/**
* Set each item in the given buffer to the max of this and the buffer
*/
Expand Down Expand Up @@ -311,7 +346,7 @@ sealed abstract class HLL extends java.io.Serializable {
// find the number of leading zeros
// we use bitMask to force rhoW to stop after going through (bits - reducedBits) bits
ByteBuffer.wrap(buf).putInt(Integer.reverse(currentJ | bitMask))
HyperLogLog.rhoW(BitSetLite(buf), reducedBits)
HyperLogLog.rhoW(buf, reducedBits)
}
}

Expand Down Expand Up @@ -563,10 +598,8 @@ class HyperLogLogMonoid(val bits: Int) extends Monoid[HLL] {
createFromHashBytes(hashBytes)
}

def createFromHashBytes(hashed: Array[Byte]): HLL = {
val (j, rhow) = jRhoW(hashed, bits)
SparseHLL(bits, Map(j -> Max(rhow)))
}
def createFromHashBytes(hashed: Array[Byte]): HLL =
SparseHLL(bits, Map(j(hashed, bits) -> Max(rhoW(hashed, bits))))

@deprecated("Use toHLL", since = "0.10.0 / 2015-05")
def batchCreate[T <% Array[Byte]](instances: Iterable[T]): HLL = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,24 +31,72 @@ package com.twitter.algebird
* @return New HLLSeries
*/
case class HLLSeries(bits: Int, rows: Vector[Map[Int, Long]]) {

def insert(data: Array[Byte], timestamp: Long): HLLSeries = {
import HyperLogLog._
val hashed = hash(data)
val j0 = j(hashed, bits)
val rhow = rhoW(hashed, bits) - 1

val newRows = if (rhow < rows.size) {
val row = rows(rhow)
val t = row.get(j0) match {
case Some(t0) => Math.max(t0, timestamp)
case None => timestamp
}
rows.updated(rhow, row.updated(j0, t))
} else {
val padded = (rows.size until rhow).foldLeft(rows)((rs, _) => rs :+ Map.empty)
padded :+ Map(j0 -> timestamp)
}
HLLSeries(bits, newRows)
}

def maxRhowStats(threshold: Long): (Int, Double) = {
val seen = scala.collection.mutable.Set.empty[Int]
var sum: Double = 0.0
var i = rows.size - 1
while (i >= 0) {
val it = rows(i).iterator
while (it.hasNext) {
val (k, t) = it.next
if (t >= threshold && seen.add(k)) {
sum += HyperLogLog.negativePowersOfTwo(i + 1)
}
}
i -= 1
}
(seen.size, sum)
}

def approximateSizeSince(threshold: Long): Approximate[Long] = {
val (maxRhowSize, maxRhowSum) = maxRhowStats(threshold)
val size = 1 << bits
val zeroCnt = size - maxRhowSize
val z = 1.0 / (zeroCnt + maxRhowSum)
HyperLogLog.approximateSize(bits, size, zeroCnt, z)
}

/**
* @param since Timestamp from which to reconstruct the HLL
*
* @return New HLLSeries only including RhoWs for values seen at or after the given timestamp
*/
def since(threshold: Long) =
def since(threshold: Long): HLLSeries =
HLLSeries(
bits,
rows.map{ _.filter{ case (j, ts) => ts >= threshold } })

def toHLL: HLL =
if (rows.isEmpty)
SparseHLL(bits, Map())
else
rows.zipWithIndex.map{
def toHLL: HLL = {
val monoid = new HyperLogLogMonoid(bits)
if (rows.isEmpty) monoid.zero
else {
monoid.sum(rows.iterator.zipWithIndex.map {
case (map, i) =>
SparseHLL(bits, map.mapValues{ ts => Max((i + 1).toByte) }): HLL
}.reduce{ _ + _ }
SparseHLL(bits, map.mapValues { ts => Max((i + 1).toByte) })
})
}
}
}

/**
Expand All @@ -69,36 +117,44 @@ case class HLLSeries(bits: Int, rows: Vector[Map[Int, Long]]) {
class HyperLogLogSeriesMonoid(val bits: Int) extends Monoid[HLLSeries] {
import HyperLogLog._

val zero = HLLSeries(bits, Vector())
val zero: HLLSeries = HLLSeries(bits, Vector.empty)

def create(example: Array[Byte], timestamp: Long): HLLSeries = {
val hashed = hash(example)
val (j, rhow) = jRhoW(hashed, bits)

val vector = Vector.fill(rhow - 1){ Map[Int, Long]() } ++ Vector(Map(j -> timestamp))
HLLSeries(bits, vector)
val rhow = rhoW(hashed, bits)
val e = Map.empty[Int, Long]
val bldr = Vector.newBuilder[Map[Int, Long]]
var i = 1
while (i < rhow) { bldr += e; i += 1 }
bldr += Map(j(hashed, bits) -> timestamp)
HLLSeries(bits, bldr.result())
}

def plus(left: HLLSeries, right: HLLSeries): HLLSeries = {
if (left.rows.size > right.rows.size)
val ln = left.rows.size
val rn = right.rows.size
if (ln > rn) {
plus(right, left)
else {
val zipped = left.rows.zip(right.rows).map{
case (l, r) =>
combine(l, r)
}
HLLSeries(
bits,
zipped ++ right.rows.slice(left.rows.size, right.rows.size))
} else {
val bldr = Vector.newBuilder[Map[Int, Long]]
val lit = left.rows.iterator
val rit = right.rows.iterator
while (lit.hasNext && rit.hasNext) bldr += combine(lit.next, rit.next)
val zipped = bldr.result()
HLLSeries(bits, zipped ++ right.rows.slice(ln, rn))
}
}

private def combine(left: Map[Int, Long], right: Map[Int, Long]): Map[Int, Long] = {
if (left.size > right.size)
private def combine(left: Map[Int, Long], right: Map[Int, Long]): Map[Int, Long] =
if (left.size > right.size) {
combine(right, left)
else {
right ++
left.map{ case (k, v) => k -> (right.getOrElse(k, 0L).max(v)) }
} else {
left.foldLeft(right) {
case (m, (k, lv)) =>
m.updated(k, m.get(k) match {
case None => lv
case Some(rv) => Math.max(lv, rv)
})
}
}
}
}
Loading