Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding scoped top-N CMS monoid #471

Merged
merged 1 commit into from
Aug 5, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
199 changes: 122 additions & 77 deletions algebird-core/src/main/scala/com/twitter/algebird/CountMinSketch.scala
Original file line number Diff line number Diff line change
Expand Up @@ -720,6 +720,47 @@ case class TopCMSInstance[K](override val cms: CMS[K], hhs: HeavyHitters[K], par

}

class TopCMSMonoid[K](cms: CMS[K], logic: HeavyHittersLogic[K]) extends Monoid[TopCMS[K]] {

val params: TopCMSParams[K] = TopCMSParams(logic)

val zero: TopCMS[K] = TopCMSZero[K](cms, params)

/**
* Combines the two sketches.
*
* The sketches must use the same hash functions.
*/
def plus(left: TopCMS[K], right: TopCMS[K]): TopCMS[K] = {
require(left.cms.params.hashes == right.cms.params.hashes, "The sketches must use the same hash functions.")
left ++ right
}

/**
* Creates a sketch out of a single item.
*/
def create(item: K): TopCMS[K] = TopCMSItem[K](item, cms + item, params)

/**
* Creates a sketch out of multiple items.
*/
def create(data: Seq[K]): TopCMS[K] = {
data.foldLeft(zero) { case (acc, x) => plus(acc, create(x)) }
}

}

class TopCMSAggregator[K](cmsMonoid: TopCMSMonoid[K])
extends MonoidAggregator[K, TopCMS[K], TopCMS[K]] {

def monoid = cmsMonoid

def prepare(value: K): TopCMS[K] = monoid.create(value)

def present(cms: TopCMS[K]): TopCMS[K] = cms

}

/**
* Controls how a CMS that implements [[CMSHeavyHitters]] tracks heavy hitters.
*/
Expand Down Expand Up @@ -851,38 +892,7 @@ case class HeavyHitter[K](item: K, count: Long) extends java.io.Serializable
* Which type K should you pick in practice? For domains that have less than `2^64` unique elements, you'd
* typically use [[Long]]. For larger domains you can try [[BigInt]], for example.
*/
class TopPctCMSMonoid[K](cms: CMS[K], heavyHittersPct: Double = 0.01) extends Monoid[TopCMS[K]] {

val params: TopCMSParams[K] = {
val logic = new TopPctLogic[K](heavyHittersPct)
TopCMSParams[K](logic)
}

val zero: TopCMS[K] = TopCMSZero[K](cms, params)

/**
* Combines the two sketches.
*
* The sketches must use the same hash functions.
*/
def plus(left: TopCMS[K], right: TopCMS[K]): TopCMS[K] = {
require(left.cms.params.hashes == right.cms.params.hashes, "The sketches must use the same hash functions.")
left ++ right
}

/**
* Creates a sketch out of a single item.
*/
def create(item: K): TopCMS[K] = TopCMSItem[K](item, cms + item, params)

/**
* Creates a sketch out of multiple items.
*/
def create(data: Seq[K]): TopCMS[K] = {
data.foldLeft(zero) { case (acc, x) => plus(acc, create(x)) }
}

}
class TopPctCMSMonoid[K](cms: CMS[K], heavyHittersPct: Double = 0.01) extends TopCMSMonoid[K](cms, TopPctLogic[K](heavyHittersPct))

object TopPctCMS {

Expand Down Expand Up @@ -915,16 +925,7 @@ object TopPctCMS {
/**
* An Aggregator for [[TopPctCMS]]. Can be created using [[TopPctCMS.aggregator]].
*/
case class TopPctCMSAggregator[K](cmsMonoid: TopPctCMSMonoid[K])
extends MonoidAggregator[K, TopCMS[K], TopCMS[K]] {

def monoid = cmsMonoid

def prepare(value: K): TopCMS[K] = monoid.create(value)

def present(cms: TopCMS[K]): TopCMS[K] = cms

}
case class TopPctCMSAggregator[K](cmsMonoid: TopPctCMSMonoid[K]) extends TopCMSAggregator(cmsMonoid)

/**
* Monoid for top-N based [[TopCMS]] sketches. '''Use with care! (see warning below)'''
Expand Down Expand Up @@ -983,36 +984,7 @@ case class TopPctCMSAggregator[K](cmsMonoid: TopPctCMSMonoid[K])
* Which type K should you pick in practice? For domains that have less than `2^64` unique elements, you'd
* typically use [[Long]]. For larger domains you can try [[BigInt]], for example.
*/
class TopNCMSMonoid[K](cms: CMS[K], heavyHittersN: Int = 100) extends Monoid[TopCMS[K]] {

val params: TopCMSParams[K] = {
val logic = new TopNLogic[K](heavyHittersN)
TopCMSParams[K](logic)
}

val zero: TopCMS[K] = TopCMSZero[K](cms, params)

/**
* Combines the two sketches.
*
* The sketches must use the same hash functions.
*/
def plus(left: TopCMS[K], right: TopCMS[K]): TopCMS[K] = {
require(left.cms.params.hashes == right.cms.params.hashes, "The sketches must use the same hash functions.")
left ++ right
}

/**
* Creates a sketch out of a single item.
*/
def create(item: K): TopCMS[K] = TopCMSItem[K](item, cms + item, params)

/**
* Creates a sketch out of multiple items.
*/
def create(data: Seq[K]): TopCMS[K] = data.foldLeft(zero) { case (acc, x) => plus(acc, create(x)) }

}
class TopNCMSMonoid[K](cms: CMS[K], heavyHittersN: Int = 100) extends TopCMSMonoid[K](cms, TopNLogic[K](heavyHittersN))

object TopNCMS {

Expand Down Expand Up @@ -1045,14 +1017,87 @@ object TopNCMS {
/**
* An Aggregator for [[TopNCMS]]. Can be created using [[TopNCMS.aggregator]].
*/
case class TopNCMSAggregator[K](cmsMonoid: TopNCMSMonoid[K])
extends MonoidAggregator[K, TopCMS[K], TopCMS[K]] {
case class TopNCMSAggregator[K](cmsMonoid: TopNCMSMonoid[K]) extends TopCMSAggregator(cmsMonoid)

val monoid = cmsMonoid
/**
* K1 defines a scope for the CMS. For each k1, keep the top heavyHittersN
* associated k2 values.
*/
case class ScopedTopNLogic[K1, K2](heavyHittersN: Int) extends HeavyHittersLogic[(K1, K2)] {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add a comment to this logic about what is going on? I think for each K1, we keep the top heavyHittersN.


def prepare(value: K): TopCMS[K] = monoid.create(value)
require(heavyHittersN > 0, "heavyHittersN must be > 0")

def present(cms: TopCMS[K]): TopCMS[K] = cms
override def purgeHeavyHitters(cms: CMS[(K1, K2)])(hitters: HeavyHitters[(K1, K2)]): HeavyHitters[(K1, K2)] = {
val grouped = hitters.hhs.groupBy { hh => hh.item._1 }
val (underLimit, overLimit) = grouped.partition { _._2.size <= heavyHittersN }
val sorted = overLimit.mapValues { hhs => hhs.toSeq.sortBy { hh => hh.count } }
val purged = sorted.mapValues { hhs => hhs.takeRight(heavyHittersN) }
HeavyHitters[(K1, K2)](purged.values.flatten.toSet ++ underLimit.values.flatten.toSet)
}

}

/*
* Monoid for Top-N values per key in an associative [[TopCMS]].
*
* Typical use case for this might be (Country, City) pairs. For a stream of such
* pairs, we might want to keep track of the most popular cities for each country.
*
* This can, of course, be achieved using a Map[Country, TopNCMS[City]], but this
* requires storing one CMS per distinct Country.
*
* Similarly, one could attempt to use a TopNCMS[(Country, City)], but less common
* countries may not make the cut if N is not "very large".
*
* ScopedTopNCMSMonoid[Country, City] will avoid having one Country drown others
* out, while still only using a single CMS.
*
* In general the eviction of K1 is not supported, and all distinct K1 values must
* be retained. Therefore it is important to only use this Monoid when the number
* of distinct K1 values is known to be reasonably bounded.
*/
class ScopedTopNCMSMonoid[K1, K2](cms: CMS[(K1, K2)], heavyHittersN: Int = 100) extends TopCMSMonoid[(K1, K2)](cms, ScopedTopNLogic[K1, K2](heavyHittersN))

object ScopedTopNCMS {

def scopedHasher[K1: CMSHasher, K2: CMSHasher] = new CMSHasher[(K1, K2)] {
private val k1Hasher = implicitly[CMSHasher[K1]]
private val k2Hasher = implicitly[CMSHasher[K2]]

def hash(a: Int, b: Int, width: Int)(x: (K1, K2)): Int = {
val (k1, k2) = x
val xs = Seq(
k1Hasher.hash(a, b, width)(k1),
k2Hasher.hash(a, b, width)(k2),
a,
b)
(scala.util.hashing.MurmurHash3.seqHash(xs) & Int.MaxValue) % width
}
}

def monoid[K1: CMSHasher, K2: CMSHasher](eps: Double,
delta: Double,
seed: Int,
heavyHittersN: Int): ScopedTopNCMSMonoid[K1, K2] =
new ScopedTopNCMSMonoid[K1, K2](CMS(eps, delta, seed)(scopedHasher[K1, K2]), heavyHittersN)

def monoid[K1: CMSHasher, K2: CMSHasher](depth: Int,
width: Int,
seed: Int,
heavyHittersN: Int): ScopedTopNCMSMonoid[K1, K2] =
monoid(CMSFunctions.eps(width), CMSFunctions.delta(depth), seed, heavyHittersN)

def aggregator[K1: CMSHasher, K2: CMSHasher](eps: Double,
delta: Double,
seed: Int,
heavyHittersN: Int): TopCMSAggregator[(K1, K2)] =
new TopCMSAggregator(monoid(eps, delta, seed, heavyHittersN))

def aggregator[K1: CMSHasher, K2: CMSHasher](depth: Int,
width: Int,
seed: Int,
heavyHittersN: Int): TopCMSAggregator[(K1, K2)] =
aggregator(CMSFunctions.eps(width), CMSFunctions.delta(depth), seed, heavyHittersN)

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -809,6 +809,64 @@ abstract class CMSTest[K: CMSHasher: FromIntLike] extends WordSpec with Matchers

}

"A Scoped Top-N Count-Min sketch implementing CMSHeavyHitters" should {

"create correct sketches out of a single item" in {
forAll{ (x: Int, y: Int) =>
val data = (x, y).toK[K]
val cmsMonoid = {
val heavyHittersN = 2
ScopedTopNCMS.monoid[K, K](EPS, DELTA, SEED, heavyHittersN)
}
val topCms = cmsMonoid.create(data)
topCms.totalCount should be(1)
topCms.cms.totalCount should be(1)
topCms.frequency((x, y).toK[K]).estimate should be(1)
// Poor man's way to come up with an item that is not x and that is very unlikely to hash to the same slot.
val otherItem = (x + 1, y)
topCms.frequency(otherItem.toK[K]).estimate should be(0)
// The following assert indirectly verifies whether the counting table is not all-zero (cf. GH-393).
topCms.innerProduct(topCms).estimate should be(1)
}
}

"(when adding CMS instances) keep all heavy hitters keys" in {
val heavyHittersN = 1
val monoid = ScopedTopNCMS.monoid[K, K](EPS, DELTA, SEED, heavyHittersN)
val cms1 = monoid.create(Seq((1, 1), (2, 3), (2, 3)).toK[K])
cms1.heavyHitters should be(Set((1, 1), (2, 3)).toK[K])
val cms2 = cms1 ++ monoid.create(Seq((3, 8), (3, 8), (3, 8)).toK[K])
cms2.heavyHitters should be(Set((1, 1), (2, 3), (3, 8)).toK[K])
val cms3 = cms2 ++ monoid.create(Seq((1, 1), (1, 1), (1, 1)).toK[K])
cms3.heavyHitters should be(Set((1, 1), (2, 3), (3, 8)).toK[K])
val cms4 = cms3 ++ monoid.create(Seq((6, 2), (6, 2), (6, 2), (6, 2), (6, 2), (6, 2)).toK[K])
cms4.heavyHitters should be(Set((1, 1), (2, 3), (3, 8), (6, 2)).toK[K])
}

"(when adding CMS instances) drop old heavy hitters for the same key when new heavy hitters replace them" in {
val heavyHittersN = 2
val monoid = ScopedTopNCMS.monoid[K, K](EPS, DELTA, SEED, heavyHittersN)
val cms1 = monoid.create(Seq((4, 1), (4, 2), (4, 2)).toK[K])
cms1.heavyHitters should be(Set((4, 1), (4, 2)).toK[K])
val cms2 = cms1 ++ monoid.create(Seq((4, 3), (4, 3), (4, 3)).toK[K])
cms2.heavyHitters should be(Set((4, 2), (4, 3)).toK[K])
val cms3 = cms2 ++ monoid.create(Seq((4, 1), (4, 1), (4, 1)).toK[K])
cms3.heavyHitters should be(Set((4, 3), (4, 1)).toK[K])
val cms4 = cms3 ++ monoid.create(Seq((4, 6), (4, 6), (4, 6), (4, 6), (4, 6), (4, 6)).toK[K])
cms4.heavyHitters should be(Set((4, 1), (4, 6)).toK[K])
}

"trim multiple keys at once" in {
val heavyHittersN = 2
val data =
Seq(1, 2, 2, 3, 3, 3, 6, 6, 6, 6, 6, 6).flatMap { i => Seq((4, i), (7, i + 2)) }.toK[K]
val monoid = ScopedTopNCMS.monoid[K, K](EPS, DELTA, SEED, heavyHittersN)
val cms = monoid.create(data)
cms.heavyHitters should be(Set((4, 3), (4, 6), (7, 5), (7, 8)).toK[K])
}

}

}

class CMSFunctionsSpec extends PropSpec with PropertyChecks with Matchers {
Expand Down Expand Up @@ -904,11 +962,26 @@ class CMSHasherBytesSpec extends CMSHasherSpec[Bytes]

abstract class CMSHasherSpec[K: CMSHasher: FromIntLike] extends PropSpec with PropertyChecks with Matchers {

property("returns positive hashes (i.e. slots) only") {
property("returns hashes (i.e. slots) in the range [0, width)") {
forAll { (a: Int, b: Int, width: Int, x: Int) =>
whenever (width > 0) {
val hash = CMSHash[K](a, b, width)
hash(x.toK[K]) should be >= 0
val hashValue = hash(x.toK[K])

hashValue should be >= 0
hashValue should be < width
}
}
}

property("returns scoped hashes in the range [0, width)") {
forAll { (a: Int, b: Int, width: Int, x: Int, y: Int) =>
whenever (width > 0) {
val hasher = ScopedTopNCMS.scopedHasher[K, K]
val hashValue = hasher.hash(a, b, width)((x, y).toK[K])

hashValue should be >= 0
hashValue should be < width
}
}
}
Expand Down Expand Up @@ -964,12 +1037,24 @@ object CmsTestImplicits {
def toK[T: FromIntLike]: T = implicitly[FromIntLike[T]].fromInt(x)
}

implicit class PairCast(x: (Int, Int)) {
def toK[T: FromIntLike]: (T, T) = (x._1.toK[T], x._2.toK[T])
}

implicit class SeqCast(xs: Seq[Int]) {
def toK[T: FromIntLike]: Seq[T] = xs map { _.toK[T] }
}

implicit class PairSeqCast(xs: Seq[(Int, Int)]) {
def toK[T: FromIntLike]: Seq[(T, T)] = xs map { _.toK[T] }
}

implicit class SetCast(xs: Set[Int]) {
def toK[T: FromIntLike]: Set[T] = xs map { _.toK[T] }
}

implicit class PairSetCast(xs: Set[(Int, Int)]) {
def toK[T: FromIntLike]: Set[(T, T)] = xs map { _.toK[T] }
}

}