-
Notifications
You must be signed in to change notification settings - Fork 347
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add trailing window aggregator #649
Changes from all commits
0d0103e
a3b85cc
622fddf
afd2f07
1a1b82c
387d875
7ea0e79
096ba21
9203860
6d5a948
d69f141
1277bd6
16aab4c
5321cfa
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,144 @@ | ||
/* | ||
Copyright 2018 Stripe | ||
|
||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
|
||
http://www.apache.org/licenses/LICENSE-2.0 | ||
|
||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
package com.twitter.algebird | ||
|
||
import scala.collection.immutable.Queue | ||
import Operators._ | ||
|
||
/**
 * Convenience case class defined with a monoid for aggregating elements over
 * a finite window.
 *
 * Example usage:
 *
 * {{{
 * case class W28[T](window: Window[T]) {
 *   def total = this.window.total
 *   def items = this.window.items
 *   def size = this.window.size
 * }
 *
 * object W28 {
 *   val windowSize = 28
 *   def apply[T](v: T): W28[T] = W28[T](Window(v))
 *
 *   implicit def w28Monoid[T](implicit p: Priority[Group[T], Monoid[T]]): Monoid[W28[T]] =
 *     new Monoid[W28[T]] {
 *       private val WT: Monoid[Window[T]] = WindowMonoid[T](windowSize)
 *       def zero = W28[T](WT.zero)
 *       def plus(a: W28[T], b: W28[T]): W28[T] =
 *         W28[T](WT.plus(a.window, b.window))
 *     }
 * }
 *
 * val elements = getElements()
 *
 * val trailing28Totals =
 *   elements
 *     .map { W28(_) }
 *     .scanLeft(W28(0)) { (a, b) => a + b }
 *     .map { _.total }
 * }}}
 *
 * @param total Known running total of `T`
 * @param items queue of known trailing elements.
 */
case class Window[T](total: T, items: Queue[T]) {

  /** Number of elements currently held in `items`. */
  def size: Int = this.items.size
}
|
||
object Window {

  /** Builds a window whose only item is `v`; `v` is also used as the running total. */
  def apply[T](v: T): Window[T] = Window[T](v, Queue[T](v))

  /**
   * Builds a window from `ts` by delegating to the implicit monoid's `fromIterable`.
   *
   * @param ts elements to place in the window
   * @param m  window monoid used to build the result
   */
  def from[T](ts: Iterable[T])(implicit m: WindowMonoid[T]): Window[T] = m.fromIterable(ts)
}
|
||
/**
 * Provides a natural monoid for combining windows truncated to some window size.
 *
 * Combining two windows keeps the trailing `windowSize` items of `a.items ++ b.items`.
 * When a `Group[T]` is available the total is maintained by subtracting evicted items;
 * otherwise the total is recomputed by summing the retained items.
 *
 * @param windowSize Upper limit of the number of items in a window. Must be at least 1.
 */
case class WindowMonoid[T](
  windowSize: Int
)(implicit p: Priority[Group[T], Monoid[T]])
  extends Monoid[Window[T]] {

  require(windowSize >= 1, "Windows must have positive sizes")

  // Resolved once so `sumOption` and `fromIterable` do not re-join on every call.
  private val monoidT: Monoid[T] = p.join

  /** The empty window. Computed once rather than re-folded and re-allocated per call. */
  val zero: Window[T] =
    p.fold(g => Window[T](g.zero, Queue.empty[T]))(m => Window(m.zero, Queue.empty[T]))

  /**
   * Combines two windows, dispatching to `plusG` or `plusM` depending on which
   * instance `p` holds.
   */
  // NOTE(review): a reviewer suggested splitting this class into two subclasses so that
  // `fold` is not invoked on every `plus` call.
  def plus(a: Window[T], b: Window[T]): Window[T] =
    p.fold(g => plusG(a, b)(g))(m => plusM(a, b)(m))

  /**
   * Group-based combination: evicted items are subtracted from the running total
   * instead of re-summing the retained items.
   */
  def plusG(a: Window[T], b: Window[T])(implicit g: Group[T]): Window[T] = {
    val bSize = b.items.size
    if (bSize >= windowSize) {
      // `b` alone fills the window, so nothing from `a` survives.
      evictOldest(b, windowSize)
    } else {
      // we need windowSize - b.items.size from `a`
      val keptA = evictOldest(a, windowSize - bSize)
      Window(keptA.total + b.total, keptA.items ++ b.items)
    }
  }

  /**
   * Drops the oldest items of `w` until at most `keep` remain, subtracting each
   * dropped item from the total in oldest-first order.
   */
  // The number of items to drop is computed once up front; asking the queue for its
  // size on every loop iteration would make eviction quadratic.
  // NOTE(review): eviction is by item count only. Reviewers discussed a separate
  // class/monoid for time-based rolling windows.
  private def evictOldest(w: Window[T], keep: Int)(implicit g: Group[T]): Window[T] = {
    val excess = w.items.size - keep
    if (excess <= 0) w
    else {
      val remaining = w.items.take(excess).foldLeft(w.total)((acc, old) => acc - old)
      Window(remaining, w.items.drop(excess))
    }
  }

  /**
   * Monoid-based combination: with no subtraction available, the total of the
   * retained items is recomputed with `m.sum`.
   */
  def plusM(a: Window[T], b: Window[T])(implicit m: Monoid[T]): Window[T] = {
    val bSize = b.items.size
    if (bSize >= windowSize) {
      val items = b.items.takeRight(windowSize)
      Window(m.sum(items), items)
    } else {
      // we need windowSize - b.items.size from `a`
      val fromA = a.items.takeRight(windowSize - bSize)
      Window(m.sum(fromA) + b.total, fromA ++ b.items)
    }
  }

  /**
   * Sums many windows by concatenating their items, truncating to `windowSize`
   * after each step, and computing the total once at the end.
   *
   * @return `None` when `ws` is empty
   */
  override def sumOption(ws: TraversableOnce[Window[T]]): Option[Window[T]] =
    if (ws.isEmpty) None
    else {
      val queue = ws.foldLeft(Queue.empty[T]) { (acc, w) =>
        (acc ++ w.items).takeRight(windowSize)
      }
      Some(Window(monoidT.sum(queue), queue))
    }

  /**
   * Builds a window from the trailing `windowSize` elements of `ts`.
   *
   * @return `zero` when `ts` is empty
   */
  def fromIterable(ts: Iterable[T]): Window[T] =
    // `isEmpty` avoids traversing the whole input just to test for emptiness.
    if (ts.isEmpty) zero
    else {
      val right = ts.toList.takeRight(windowSize)
      Window(monoidT.sum(right), Queue(right: _*))
    }
}
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
package com.twitter.algebird | ||
|
||
import scala.collection.immutable.Queue | ||
|
||
import com.twitter.algebird.BaseProperties._ | ||
import com.twitter.algebird.scalacheck.arbitrary._ | ||
import com.twitter.algebird.scalacheck.PosNum | ||
import org.scalacheck.Arbitrary | ||
import org.scalacheck.Gen | ||
import org.scalacheck.Prop.forAll | ||
|
||
/** Checks that `WindowMonoid[Int]` with a window size of 5 satisfies the monoid laws. */
class WindowLaws extends CheckProperties {

  implicit val mon: WindowMonoid[Int] = WindowMonoid[Int](5)

  // Generate windows from lists of varying length (empty, partially filled, and
  // longer than the window size) rather than single-element windows only, so the
  // law checks also combine windows that force truncation.
  implicit val wGen: Arbitrary[Window[Int]] = Arbitrary {
    Gen.listOf(Gen.choose(-1000, 1000)).map(ts => mon.fromIterable(ts))
  }

  property("Window obeys monoid laws") { monoidLaws[Window[Int]] }
}
|
||
/** Property checks comparing `WindowMonoid` results against a plain `takeRight(n)` model. */
class WindowTest extends CheckProperties {

  // Summing singleton windows must total only the last `n` inputs.
  property("We aggregate over only n items") {
    forAll { (ts0: List[Int], pn: PosNum[Int]) =>
      val limit = pn.value
      val expectedTotal = ts0.takeRight(limit).sum
      val singletons = ts0.map(t => Window(t))
      WindowMonoid[Int](limit).sum(singletons).total == expectedTotal
    }
  }

  // `fromIterable` keeps the trailing `n` elements and their sum.
  property("We correctly create a window from iterable") {
    forAll { (ts0: List[Int], pn: PosNum[Int]) =>
      val limit = pn.value
      val kept = Queue(ts0.takeRight(limit): _*)
      WindowMonoid[Int](limit).fromIterable(ts0) == Window(kept.sum, kept)
    }
  }

  // `plus` of two windows equals the window built from the concatenated inputs.
  property("We correctly combine windows") {
    forAll { (left: List[Int], right: List[Int], pn: PosNum[Int]) =>
      val limit = pn.value
      val monoid = WindowMonoid[Int](limit)
      val kept = Queue((left ++ right).takeRight(limit): _*)
      val combined = monoid.plus(monoid.fromIterable(left), monoid.fromIterable(right))
      Window(kept.sum, kept) == combined
    }
  }

  // `sumOption` yields None for empty input, otherwise the trailing-`n` window.
  property("We correctly overrode sumOption") {
    forAll { (ts0: List[Int], pn: PosNum[Int]) =>
      val limit = pn.value
      val kept = Queue(ts0.takeRight(limit): _*)
      val expected: Option[Window[Int]] = ts0 match {
        case Nil => None
        case _   => Some(Window(kept.sum, kept))
      }
      expected == WindowMonoid[Int](limit).sumOption(ts0.map(t => Window(t)))
    }
  }
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we make this a `val` so we don't have to redo the computation and allocation on each call?