Add trailing window aggregator #649

Merged
merged 14 commits into from
Feb 4, 2018
144 changes: 144 additions & 0 deletions algebird-core/src/main/scala/com/twitter/algebird/Window.scala
@@ -0,0 +1,144 @@
/*
Copyright 2018 Stripe

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package com.twitter.algebird

import scala.collection.immutable.Queue
import Operators._

/**
*
* Convenience case class defined with a monoid for aggregating elements over
* a finite window.
*
* @param total Known running total of `T`
* @param items queue of known trailing elements.
*
* Example usage:
*
* case class W28[T](window: Window[T]) {
* def total = this.window.total
* def items = this.window.items
* def size = this.window.size
* }
*
* object W28 {
* val windowSize = 28
* def apply[T](v: T): W28[T] = W28[T](Window(v))
*
* implicit def w28Monoid[T](implicit p: Priority[Group[T], Monoid[T]]): Monoid[W28[T]] =
* new Monoid[W28[T]] {
* private val WT: Monoid[Window[T]] = WindowMonoid[T](windowSize)
* def zero = W28[T](WT.zero)
* def plus(a: W28[T], b: W28[T]): W28[T] =
* W28[T](WT.plus(a.window, b.window))
* }
* }
* val elements = getElements()
*
* val trailing28Totals =
* elements
* .map { W28(_) }
* .scanLeft(W28(0)) { (a, b) => a + b }
* .map { _.total }
*/

case class Window[T](total: T, items: Queue[T]) {
def size = this.items.size
}

object Window {
def apply[T](v: T): Window[T] = Window[T](v, Queue[T](v))
def from[T](ts: Iterable[T])(implicit m: WindowMonoid[T]) = m.fromIterable(ts)
}

/**
* Provides a natural monoid for combining windows truncated to some window size.
*
* @param windowSize Upper limit of the number of items in a window.
*/


case class WindowMonoid[T](
windowSize: Int
)(implicit p: Priority[Group[T], Monoid[T]])
extends Monoid[Window[T]] {

require(windowSize >= 1, "Windows must have positive sizes")

def zero = p.fold(g => Window[T](g.zero, Queue.empty[T]))(m => Window(m.zero, Queue.empty[T]))
Collaborator

can we make this a val so we don't have to do the computation and allocation again on each call?
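
A minimal sketch of that suggestion, using only the standard library. `SimpleMonoid` and `WindowZero` are hypothetical stand-ins (not algebird types) so the snippet compiles on its own. Declaring `zero` as a `val` means the empty window is built once at construction instead of on every call.

```scala
import scala.collection.immutable.Queue

object ZeroAsValSketch {
  // Hypothetical stand-in for algebird's Monoid, so the sketch is self-contained.
  trait SimpleMonoid[T] {
    def zero: T
    def plus(a: T, b: T): T
  }

  final case class Window[T](total: T, items: Queue[T])

  // Hypothetical holder class; only the `zero` member matters here.
  class WindowZero[T](m: SimpleMonoid[T]) {
    // Evaluated once when the instance is created; later reads return the same object.
    val zero: Window[T] = Window(m.zero, Queue.empty[T])
  }

  val intAddition: SimpleMonoid[Int] = new SimpleMonoid[Int] {
    val zero: Int = 0
    def plus(a: Int, b: Int): Int = a + b
  }
}
```

With a `def`, each access would run the fold and allocate a fresh `Window`; with a `val`, repeated accesses are reference-equal.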


def plus(a: Window[T], b: Window[T]): Window[T] =
p.fold(g => plusG(a, b)(g))(m => plusM(a, b)(m))
Collaborator

For performance (to avoid calling fold on each plus call), we are probably better off making WindowMonoid an abstract class with two subclasses. We can decide once, at allocation, which path to choose; the JIT should then be able to inline both instances. Sadly, the JVM can't optimize the above very well.
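
One way the two-subclass layout could look, as a sketch rather than the code that was merged. `SimpleMonoid`, `SimpleGroup`, `GroupWindowMonoid`, and `MonoidWindowMonoid` are hypothetical names, and an `Either` stands in for algebird's `Priority[Group[T], Monoid[T]]` so that the snippet needs no dependencies. The choice between the two paths is made once, in the factory, instead of on every `plus`.

```scala
import scala.collection.immutable.Queue

object WindowDispatchSketch {
  // Hypothetical stand-ins for algebird's Monoid and Group.
  trait SimpleMonoid[T] { def zero: T; def plus(a: T, b: T): T }
  trait SimpleGroup[T] extends SimpleMonoid[T] { def minus(a: T, b: T): T }

  final case class Window[T](total: T, items: Queue[T])

  abstract class WindowMonoid[T](val windowSize: Int, m: SimpleMonoid[T]) {
    require(windowSize >= 1, "Windows must have positive sizes")
    val zero: Window[T] = Window(m.zero, Queue.empty[T])
    def plus(a: Window[T], b: Window[T]): Window[T]
  }

  // Group path: evict old elements by subtracting them from the running total.
  final class GroupWindowMonoid[T](size: Int, g: SimpleGroup[T])
      extends WindowMonoid[T](size, g) {
    // Drop the oldest items of `w` until at most `n` remain.
    private def trunc(w: Window[T], n: Int): Window[T] = {
      var total = w.total
      var q = w.items
      while (q.size > n) { total = g.minus(total, q.head); q = q.tail }
      Window(total, q)
    }
    def plus(a: Window[T], b: Window[T]): Window[T] =
      if (b.items.size >= windowSize) trunc(b, windowSize)
      else {
        val ta = trunc(a, windowSize - b.items.size)
        Window(g.plus(ta.total, b.total), ta.items ++ b.items)
      }
  }

  // Monoid path: no subtraction available, so re-sum whatever is kept.
  final class MonoidWindowMonoid[T](size: Int, m: SimpleMonoid[T])
      extends WindowMonoid[T](size, m) {
    private def sum(q: Queue[T]): T = q.foldLeft(m.zero)(m.plus)
    def plus(a: Window[T], b: Window[T]): Window[T] =
      if (b.items.size >= windowSize) {
        val items = b.items.takeRight(windowSize)
        Window(sum(items), items)
      } else {
        val fromA = a.items.takeRight(windowSize - b.items.size)
        Window(m.plus(sum(fromA), b.total), fromA ++ b.items)
      }
  }

  object WindowMonoid {
    // The fold runs once here, at allocation time.
    def apply[T](size: Int, ev: Either[SimpleGroup[T], SimpleMonoid[T]]): WindowMonoid[T] =
      ev.fold[WindowMonoid[T]](
        g => new GroupWindowMonoid[T](size, g),
        m => new MonoidWindowMonoid[T](size, m)
      )
  }

  val intGroup: SimpleGroup[Int] = new SimpleGroup[Int] {
    val zero: Int = 0
    def plus(a: Int, b: Int): Int = a + b
    def minus(a: Int, b: Int): Int = a - b
  }
}
```

Both subclasses should produce the same windows for the same input; they differ only in how the total is maintained.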


def plusG(a: Window[T], b: Window[T])(implicit g: Group[T]): Window[T] =
if (b.items.size >= windowSize) {
var total: T = b.total
var q = b.items
while (q.size > windowSize) {
Contributor

Not sure if it's possible in this implementation, but it would be nice to generalize how elements are evicted from the window. Right now it seems I can only do fixed-size windows, but I would really like to do time-based rolling windows as well. For example: a rolling window of 1 hour, where we evict elements based on time instead of size.

Contributor Author

My thought was to eventually have two classes/monoids, based on whether we want aggregations over fixed items or fixed time.

// Monoid defines aggregation for fixed window length, e.g. last 28 elements. Queue fixed to 28.
case class Window[T](total: T, q: Queue[T])

// Monoid defines aggregation for fixed time, e.g. last 28 days. Queue can grow arbitrarily.
case class WindowWithTs[T](latestTime: Double, total: T, q: Queue[(Double, T)])

Does that seem sensible?

Contributor

That seems reasonable, but I am still not sure why we can't use PriorityQueue instead of Queue. As a user I would like to control the ordering, and if I am passing you a PQ, I can do that by specifying my own comparator.
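
To make the time-based idea in this thread concrete, here is a rough sketch of eviction by timestamp. It is not part of this PR. `TimeWindowCombiner`, `single`, and `span` are hypothetical names, the standard library's `Numeric[T]` stands in for algebird's `Group[T]`, and the sketch assumes items arrive in ascending time order (every timestamp in `a` is no later than every timestamp in `b`). Out-of-order input would need a sorted structure, which is where the PriorityQueue question comes in.

```scala
import scala.collection.immutable.Queue

object TimeWindowSketch {
  // Shape taken from the comment above: newest timestamp, running total, (time, value) pairs.
  final case class WindowWithTs[T](latestTime: Double, total: T, q: Queue[(Double, T)])

  // Hypothetical combiner: keeps items whose timestamp is within `span` of the newest one.
  final class TimeWindowCombiner[T](span: Double)(implicit num: Numeric[T]) {
    require(span > 0, "Time span must be positive")

    // Negative infinity as the "no data yet" timestamp avoids special-casing empty windows.
    val zero: WindowWithTs[T] =
      WindowWithTs(Double.NegativeInfinity, num.zero, Queue.empty[(Double, T)])

    def single(time: Double, v: T): WindowWithTs[T] =
      WindowWithTs(time, v, Queue((time, v)))

    // Assumes a's items precede b's in time, so concatenation stays time-ordered.
    def plus(a: WindowWithTs[T], b: WindowWithTs[T]): WindowWithTs[T] = {
      val latest = math.max(a.latestTime, b.latestTime)
      val cutoff = latest - span
      var total = num.plus(a.total, b.total)
      var q = a.q ++ b.q
      // Evict from the old end while the head is at or before the cutoff.
      while (q.nonEmpty && q.head._1 <= cutoff) {
        total = num.minus(total, q.head._2)
        q = q.tail
      }
      WindowWithTs(latest, total, q)
    }
  }
}
```

Unlike the fixed-size window, the queue here is bounded by elapsed time rather than item count, so it can grow arbitrarily when many items share a short time span.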

total = total - q.head
q = q.tail
}
Window(total, q)
} else {
// we need windowSize - b.items.size from `a`
var truncA = a.items
var totalA = a.total
val truncTo = windowSize - b.size
while (truncA.size > truncTo) {
totalA = totalA - truncA.head
truncA = truncA.tail
}
val items = truncA ++ b.items
val total = totalA + b.total
Window(total, items)
}

def plusM(a: Window[T], b: Window[T])(implicit m: Monoid[T]): Window[T] =
if (b.items.size >= windowSize) {
val items = b.items.takeRight(windowSize)
val total = m.sum(items)
Window(total, items)
} else {
// we need windowSize - b.items.size from `a`
val fromA = a.items.takeRight(windowSize - b.items.size)
val items = fromA ++ b.items
val total = m.sum(fromA) + b.total
Window(total, items)
}

override def sumOption(ws: TraversableOnce[Window[T]]): Option[Window[T]] =
if (ws.isEmpty) None
else {
val it = ws.toIterator
var queue = Queue.empty[T]
while (it.hasNext) {
queue = (queue ++ it.next.items).takeRight(windowSize)
}
val monT: Monoid[T] = p.join
Some(Window(monT.sum(queue), queue))
}

def fromIterable(ts: Iterable[T]): Window[T] = {
if (ts.isEmpty) zero
else {
val monT: Monoid[T] = p.join
val right = ts.toList.takeRight(windowSize)
val total = monT.sum(right)
Window(total, Queue(right: _*))
}
}
}
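
For readers without algebird on the classpath, the trailing-total computation described in the scaladoc example can be approximated in plain Scala. This is a dependency-free sketch specialized to `Int`; `trailingTotals` is a hypothetical helper, not something this PR adds. It mirrors the Group path of `plus`: add the new element, and subtract the evicted one once the queue exceeds the window size.

```scala
import scala.collection.immutable.Queue

object TrailingTotalsSketch {
  // For each position in `xs`, the sum of the last `n` elements seen so far.
  def trailingTotals(xs: Seq[Int], n: Int): Seq[Int] = {
    require(n >= 1, "Windows must have positive sizes")
    xs.scanLeft((0, Queue.empty[Int])) { case ((total, q), x) =>
      val grown = q.enqueue(x)
      if (grown.size > n) {
        // Window is full: drop the oldest element and subtract it from the total.
        val (oldest, rest) = grown.dequeue
        (total + x - oldest, rest)
      } else (total + x, grown)
    }.tail.map(_._1) // drop the initial empty state, keep only the totals
  }
}
```

For example, `TrailingTotalsSketch.trailingTotals(Seq(1, 2, 3, 4, 5), 3)` yields `1, 3, 6, 9, 12`.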
@@ -0,0 +1,66 @@
package com.twitter.algebird

import scala.collection.immutable.Queue

import com.twitter.algebird.BaseProperties._
import com.twitter.algebird.scalacheck.arbitrary._
import com.twitter.algebird.scalacheck.PosNum
import org.scalacheck.Arbitrary
import org.scalacheck.Gen
import org.scalacheck.Prop.forAll

class WindowLaws extends CheckProperties {

implicit val mon = WindowMonoid[Int](5)
implicit val wGen = Arbitrary {
for (
v <- Gen.choose(-1000, 1000)
) yield (Window[Int](v))
}

property("Window obeys monoid laws") { monoidLaws[Window[Int]] }
}

class WindowTest extends CheckProperties {
property("We aggregate over only n items") {
forAll { (ts0: List[Int], pn: PosNum[Int]) =>
val n = pn.value
val ts = ts0.takeRight(n)
val mon = WindowMonoid[Int](n)
mon.sum(ts0.map(Window(_))).total == ts.sum
}
}

property("We correctly create a window from iterable") {
forAll { (ts0: List[Int], pn: PosNum[Int]) =>
val n = pn.value
val mon = WindowMonoid[Int](n)
val right = Queue(ts0.takeRight(n): _*)
val expected = Window(right.sum, right)
val got = mon.fromIterable(ts0)
got == expected
}
}

property("We correctly combine windows") {
forAll { (left: List[Int], right: List[Int], pn: PosNum[Int]) =>
val n = pn.value
val mon = WindowMonoid[Int](n)
val trunc = Queue((left ::: right).takeRight(n): _*)
val expected = Window(trunc.sum, trunc)
val got = mon.plus(mon.fromIterable(left), mon.fromIterable(right))
expected == got
}
}

property("We correctly overrode sumOption") {
forAll { (ts0: List[Int], pn: PosNum[Int]) =>
val n = pn.value
val mon = WindowMonoid[Int](n)
val got = mon.sumOption(ts0.map { Window(_) })
val trunc = Queue(ts0.takeRight(n): _*)
val expected = if (ts0.size == 0) None else Some(Window(trunc.sum, trunc))
expected == got
}
}
}