Add trailing window aggregator #649
This is great.
Look in algebird-core/src/test/... to find the tests for this module. We want to make sure it passes the monoidLaws. You can search for monoidLaw for some examples.
After we add test coverage we should merge this.
case class WindowMonoid[T](
  windowSize: Int
)(
  implicit m: Monoid[T],
Let’s not use this m. You can get the zero from the p by case matching.
 * @param q (optional) queue to override default q containing input "total"
 */

case class Window[T](total: T, q: Queue[T] = Queue()) {
This looks great! What do you think about allowing the user to pass a PriorityQueue instead of just a Queue (AFAIK Scala's PriorityQueue doesn't extend Queue)? The way the code is written right now, it implies that I either need a commutative monoid to guarantee deterministic behavior or need to control the order in which items are inserted into the window. When I work with windows, they usually have some sort of ordering defined (for example, the last 3 distinct IPs used by a user in the last hour). It would be nice to let users preserve that ordering somehow, IMO.
Actually, I think Window[T](total: T, items: Queue[T]) is better. We don't really want people to do something like Window(100), do we? That will have no items in the queue, and I imagine it would break the laws.
If you look at items, it fills in the queue if none was provided, i.e. it is meant for initialization. The intent is to have a simple interface where you can wrap a window around a single object and sum in a sensible way:
implicit def wm[T] = WindowMonoid[T](2)
Window(1) + Window(2) + Window(3) == Window(5)
Needing to specify the queue would make this redundant:
Window(1, Queue(1)) + ...
Tbh I'm not a big fan of even having to expose the queue, since you could change the implementation in the future (e.g. to a PriorityQueue). A more general approach might use a PushPopper interface, but that's probably overkill.
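To make the intended behavior concrete, here is a minimal, self-contained sketch of a size-bounded window sum. It is not the PR's code: SimpleMonoid is a hypothetical local stand-in for algebird's Monoid, and the total is recomputed from the kept items rather than maintained incrementally.

```scala
import scala.collection.immutable.Queue

object WindowSketch {
  // Hypothetical stand-in for algebird's Monoid, kept local so the sketch compiles on its own.
  trait SimpleMonoid[T] {
    def zero: T
    def plus(a: T, b: T): T
  }

  val intAdd: SimpleMonoid[Int] = new SimpleMonoid[Int] {
    def zero: Int = 0
    def plus(a: Int, b: Int): Int = a + b
  }

  final case class Window[T](total: T, items: Queue[T])

  object Window {
    // Wrapping a single value fills the queue with that one element.
    def apply[T](v: T): Window[T] = Window(v, Queue(v))
  }

  final class WindowMonoid[T](windowSize: Int)(implicit m: SimpleMonoid[T])
      extends SimpleMonoid[Window[T]] {
    require(windowSize >= 1, "Windows must have positive sizes")

    // The identity window holds no items.
    val zero: Window[T] = Window(m.zero, Queue.empty[T])

    // Concatenate, keep only the trailing windowSize items, and re-sum them.
    def plus(a: Window[T], b: Window[T]): Window[T] = {
      val kept = (a.items ++ b.items).takeRight(windowSize)
      Window(kept.foldLeft(m.zero)((x, y) => m.plus(x, y)), kept)
    }
  }

  val mon = new WindowMonoid[Int](2)(intAdd)
  // Only the last two values (2 and 3) survive in the window.
  val summed: Window[Int] = mon.plus(mon.plus(Window(1), Window(2)), Window(3))
}
```

In this sketch `summed.total` is 5 and `summed.items` holds 2 and 3, which is the sense in which the three single-element windows sum to a window with total 5.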
if (b.items.size >= windowSize) {
  var total: T = b.total
  var q = b.items
  while (q.size > windowSize) {
Not sure if it's possible to do in this implementation, but it would be nice to generalize how elements are evicted from the window. Right now it seems like I can only do fixed-size windows, but I would really like to do time-based rolling windows as well. For example: a rolling window of 1 hour, where we evict elements based on time instead of size.
My thought was to eventually have two classes/monoids, based on whether we want aggregations over fixed items or fixed time.
// Monoid defines aggregation for fixed window length, e.g. last 28 elements. Queue fixed to 28.
case class Window[T](total:T, q: Queue[T])
// Monoid defines aggregation for fixed time, e.g. last 28 days. Queue can grow arbitrarily.
case class WindowWithTs[T](latestTime: Double, total: T, q: Queue[(Double, T)])
Does that seem sensible?
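As a rough illustration of the time-based variant, the sketch below evicts by timestamp instead of by count. Everything here is an assumption made for illustration: the names TimedWindow, single, and plus, the use of Long totals, and the rule that an item is kept when its timestamp is strictly newer than the latest timestamp minus the span.

```scala
import scala.collection.immutable.Queue

object TimedWindowSketch {
  // Hypothetical shape: each queued item carries its own timestamp.
  final case class TimedWindow(latestTime: Double, total: Long, items: Queue[(Double, Long)])

  def single(ts: Double, v: Long): TimedWindow =
    TimedWindow(ts, v, Queue((ts, v)))

  // Merge two windows, then drop every item that is `span` or more older than the newest timestamp.
  def plus(span: Double)(a: TimedWindow, b: TimedWindow): TimedWindow = {
    val latest = math.max(a.latestTime, b.latestTime)
    val kept = (a.items ++ b.items).filter { case (ts, _) => ts > latest - span }
    TimedWindow(latest, kept.iterator.map(_._2).sum, kept)
  }

  // Three events at t = 0, 30, and 70 with a 60-unit span: the first one ages out.
  val merged: TimedWindow =
    plus(60.0)(plus(60.0)(single(0.0, 1L), single(30.0, 2L)), single(70.0, 4L))
}
```

Unlike the fixed-size window, the queue here can grow without bound if many items arrive within one span.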
That seems reasonable, but I am still not sure why we can't use PriorityQueue instead of Queue. As a user I would like to control the ordering, and if I am passing you a PQ, I can do that by specifying my own comparator.
in addition to the monoidLaws, I think we want something like:
forAll { (ts0: List[Int], n: Int) =>
  val ts = ts0.takeRight(n)
  val mon = new WindowMonoid(n)
  assert(mon.sum(ts.map(Window(_))).total == ts.sum)
}
That's the behavior you want, right?
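The property can also be checked by hand without ScalaCheck. The sketch below is a plain-Scala model specialized to Int; the names Window, plus, sumAll, and holds are local to this example and are not the PR's API.

```scala
import scala.collection.immutable.Queue

object WindowPropertySketch {
  final case class Window(total: Int, items: Queue[Int])

  // Size-bounded combine: keep the trailing n items and re-sum them.
  def plus(n: Int)(a: Window, b: Window): Window = {
    val kept = (a.items ++ b.items).takeRight(n)
    Window(kept.sum, kept)
  }

  // Wrap each value in a single-element window and fold from the empty window.
  def sumAll(n: Int)(ts: List[Int]): Window =
    ts.map(t => Window(t, Queue(t)))
      .foldLeft(Window(0, Queue.empty[Int]))((a, b) => plus(n)(a, b))

  // The property: summing all windows gives the sum of the last n inputs.
  def holds(ts0: List[Int], n: Int): Boolean =
    sumAll(n)(ts0).total == ts0.takeRight(n).sum
}
```

Note that the window size must be positive for the real monoid, so a ScalaCheck version would likely need to restrict n accordingly.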
@MansurAshraf: I'm trying to grasp whether we need a fully general implementation up front, where the user inputs their own queue/priority. As mentioned above my thoughts are to have trailing windows based on one of:
The "last N ip-addresses" problem is interesting. But rather than making this implementation more flexible/abstract, it might make sense to have a
}

/*
Example usage:
can you make this a scaladoc comment on the case class?
  val ts = ts0.takeRight(n)
  val mon = new WindowMonoid(n)
  assert(mon.sum(ts0.map( Window(_) )).total == ts.sum)
}
can we have a second one:
forAll { (ts0: List[Int], ts1: List[Int], n: Int) =>
  val expected = Queue((ts0 ::: ts1).takeRight(n): _*)
  val mon = new WindowMonoid(n)
  val got = mon.plus(Window(ts0.sum, Queue(ts0: _*)), Window(ts1.sum, Queue(ts1: _*)))
  // got is a Window and expected is a Queue, so compare the window's items
  assert(got.items == expected)
}
Done
/**
 * Provides a natural monoid for combining windows truncated to some window size.
 *
 * @param windowSize
can you add more comment to this javaDoc param?
  case Priority.Preferred(g) => Window(g.zero)
  case Priority.Fallback(m) => Window(m.zero)
}
what about a
def fromIterable[T](ts: Iterable[T]): Window[T] = {
  val monT: Monoid[T] = p.join
  val right = ts.toList.takeRight(windowSize)
  val total = monT.sum(right)
  Window(total, Queue.empty[T] ++ right)
}
@johnynek: I did this, and generalized to Traversable.
  case Priority.Preferred(g) => plusG(a, b)(g)
  case Priority.Fallback(m) => plusM(a, b)(m)
}
I think you can optimize sumOption somewhat here: you can take blocks of data and ignore the left if the right is full. PS: scalding uses this if it exists, so overriding sumOption on Semigroup (or any subclass) can be a very nice performance win on map/reduce.
def sumOption(ws: TraversableOnce[Window[T]]): Option[Window[T]] =
  if (ws.isEmpty) None
  else {
    val it = ws.toIterator
    var size = 0
    var queue = Queue.empty[Window[T]]
    while (it.hasNext) {
      val n = it.next
      queue = queue :+ n
      size += n.size
      val tailSize = size - queue.head.size
      if (tailSize >= windowSize) {
        queue = queue.tail
        size = tailSize
      }
    }
    // now we only have to merge the queue, wrapped in Some to match the Option return type:
    Some(queue.tail.foldLeft(queue.head)(plus(_, _)))
  }
I think this can be a pretty big win.
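The block-skipping idea can be tried in isolation. The following is a sketch specialized to Int, with a local Window that exposes size as an assumed helper; it also drops leading windows in a loop rather than once per step, which is a small variation on the suggestion above.

```scala
import scala.collection.immutable.Queue

object SumOptionSketch {
  final case class Window(total: Int, items: Queue[Int]) {
    def size: Int = items.size // assumed helper, mirrors n.size in the suggestion
  }

  def plus(n: Int)(a: Window, b: Window): Window = {
    val kept = (a.items ++ b.items).takeRight(n)
    Window(kept.sum, kept)
  }

  def sumOption(n: Int)(ws: Iterator[Window]): Option[Window] =
    if (!ws.hasNext) None
    else {
      var pending = Queue.empty[Window]
      var size = 0
      ws.foreach { w =>
        pending = pending :+ w
        size += w.size
        // A leading window cannot contribute once the windows after it already fill n slots.
        while (size - pending.head.size >= n) {
          size -= pending.head.size
          pending = pending.tail
        }
      }
      // Only the surviving windows need an actual merge.
      Some(pending.tail.foldLeft(pending.head)((a, b) => plus(n)(a, b)))
    }

  def single(v: Int): Window = Window(v, Queue(v))
}
```

The loop condition implies that at least one later window remains, so `pending` is never emptied and taking `head` afterwards is safe.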
@johnynek: Overwrote sumOption below.
This PR: #650 seems to fix the ruby issue on travis.
(nothing like non-hermetic builds to make you feel real good, all day long).
if(ws.isEmpty) None
else {
  val it = ws.toIterator
  var queue = Queue[T]()
.empty is more efficient since you avoid allocating the varargs wrapper.
  while (it.hasNext) {
    queue = (queue ++ it.next.items).takeRight(windowSize)
  }
  Some(fromTraversable(queue))
Using fromTraversable here is ignoring that you already have a queue. I think summing the queue and reusing it will be a significant performance improvement
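A sketch of what reusing the queue might look like, again specialized to Int with local names that are not the PR's API: the accumulated queue is summed once and passed straight into the result instead of being rebuilt from a traversable. It also starts from Queue.empty and accepts an Iterable.

```scala
import scala.collection.immutable.Queue

object ReuseQueueSketch {
  final case class Window(total: Int, items: Queue[Int])

  def sumOption(n: Int)(ws: Iterable[Window]): Option[Window] =
    if (ws.isEmpty) None
    else {
      // Queue.empty avoids allocating a varargs wrapper.
      var queue = Queue.empty[Int]
      ws.foreach { w => queue = (queue ++ w.items).takeRight(n) }
      // Sum the queue we already hold and reuse it as the window's items.
      Some(Window(queue.sum, queue))
    }
}
```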
object Window {
  def apply[T](v: T): Window[T] = Window[T](v, Queue[T](v))
  def from[T](ts: Traversable[T])(implicit m: WindowMonoid[T]) = m.fromTraversable(ts)
Can we use Iterable? Scala 2.13 is changing the collections to avoid Traversable: https://www.scala-lang.org/blog/2017/02/28/collections-rework.html#traversable-and-iterable
Codecov Report
@@ Coverage Diff @@
## develop #649 +/- ##
===========================================
- Coverage 82.83% 82.81% -0.03%
===========================================
Files 108 109 +1
Lines 5163 5209 +46
Branches 314 316 +2
===========================================
+ Hits 4277 4314 +37
- Misses 886 895 +9
Added more tests. Caught that the zero element should have an empty queue, where originally it was Also what are your thoughts about Scala 2.13, when we can have a signature
Yeah, I think in Scala 2.13 we will have to revisit a lot of stuff, but the compatibility story is open since we will need to support 2.12 for a while, I guess (we aren't even on 2.12 yet at Stripe).
Two comments, but let's address them in a follow up PR.
Thanks for this!
require(windowSize >= 1, "Windows must have positive sizes")

def zero = p.fold(g => Window[T](g.zero, Queue.empty[T]))(m => Window(m.zero, Queue.empty[T]))
Can we make this a val so we don't have to do the computation and allocation again on each call?
def zero = p.fold(g => Window[T](g.zero, Queue.empty[T]))(m => Window(m.zero, Queue.empty[T]))

def plus(a: Window[T], b: Window[T]): Window[T] =
  p.fold(g => plusG(a, b)(g))(m => plusM(a, b)(m))
For performance (not calling fold on each plus call) we are probably better off having two subclasses of WindowMonoid, which could be an abstract class. We can decide once, when we allocate, which path to choose. Then the JIT should be able to inline both instances. Sadly, the JVM can't optimize the above very well.
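One possible shape for that split is sketched below. It is an assumption-heavy illustration: SimpleMonoid and SimpleGroup are local stand-ins for algebird's type classes, FromMonoid, FromGroup, and choose are hypothetical names, and the runtime pattern match in choose replaces the Priority-based implicit selection used in the PR.

```scala
import scala.collection.immutable.Queue

object DispatchSketch {
  trait SimpleMonoid[T] { def zero: T; def plus(a: T, b: T): T }
  trait SimpleGroup[T] extends SimpleMonoid[T] { def minus(a: T, b: T): T }

  final case class Window[T](total: T, items: Queue[T])

  abstract class WindowMonoid[T](val windowSize: Int) {
    require(windowSize >= 1, "Windows must have positive sizes")
    def zero: Window[T]
    def plus(a: Window[T], b: Window[T]): Window[T]
  }

  // Monoid path: re-sum whatever survives the trim.
  final class FromMonoid[T](size: Int)(m: SimpleMonoid[T]) extends WindowMonoid[T](size) {
    val zero: Window[T] = Window(m.zero, Queue.empty[T])
    def plus(a: Window[T], b: Window[T]): Window[T] = {
      val kept = (a.items ++ b.items).takeRight(windowSize)
      Window(kept.foldLeft(m.zero)((x, y) => m.plus(x, y)), kept)
    }
  }

  // Group path: add the two totals, then subtract only the evicted items.
  final class FromGroup[T](size: Int)(g: SimpleGroup[T]) extends WindowMonoid[T](size) {
    val zero: Window[T] = Window(g.zero, Queue.empty[T])
    def plus(a: Window[T], b: Window[T]): Window[T] = {
      val all = a.items ++ b.items
      val (evicted, kept) = all.splitAt(math.max(0, all.size - windowSize))
      val total = evicted.foldLeft(g.plus(a.total, b.total))((acc, x) => g.minus(acc, x))
      Window(total, kept)
    }
  }

  // The choice is made once, at construction time, not on every plus call.
  def choose[T](size: Int, m: SimpleMonoid[T]): WindowMonoid[T] = m match {
    case g: SimpleGroup[T] => new FromGroup[T](size)(g)
    case _                 => new FromMonoid[T](size)(m)
  }

  val intGroup: SimpleGroup[Int] = new SimpleGroup[Int] {
    def zero: Int = 0
    def plus(a: Int, b: Int): Int = a + b
    def minus(a: Int, b: Int): Int = a - b
  }
  val intMonoid: SimpleMonoid[Int] = new SimpleMonoid[Int] {
    def zero: Int = 0
    def plus(a: Int, b: Int): Int = a + b
  }

  val a: Window[Int] = Window(3, Queue(1, 2))
  val b: Window[Int] = Window(7, Queue(3, 4))
  val viaGroup: Window[Int] = choose(3, intGroup).plus(a, b)
  val viaMonoid: Window[Int] = choose(3, intMonoid).plus(a, b)
}
```

Both paths should agree on the result; the group path simply avoids re-summing the items that stay in the window.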
published in 0.13.4:
@MansurAshraf note if you want the most recent K things, TopKMonoid should work: (so would priorityqueue I guess).
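For the "most recent K" case, the idea can be illustrated in plain Scala without relying on TopKMonoid's actual API. In this sketch, Event and plus are hypothetical names; ordering by timestamp (with the IP as a tie-breaker) makes the merge independent of arrival order, which is the property the ordering discussion above was after.

```scala
object RecentKSketch {
  // Hypothetical record type: a timestamp plus the value of interest.
  final case class Event(ts: Long, ip: String)

  // Merge two lists and keep the k newest events; sorting removes any dependence on insertion order.
  def plus(k: Int)(a: List[Event], b: List[Event]): List[Event] =
    (a ++ b).sortBy(e => (-e.ts, e.ip)).take(k)

  val left: List[Event] = List(Event(1L, "10.0.0.1"), Event(5L, "10.0.0.2"))
  val right: List[Event] = List(Event(3L, "10.0.0.3"))
}
```

Because the result depends only on the set of events, `plus(2)(left, right)` and `plus(2)(right, left)` give the same list.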
Description
Add WindowMonoid utility for aggregating over a finite window. The Window case class stores elements in a queue. The monoid combines windows while trimming their elements to the appropriate size. The monoid is also geared to aggregate elements with the group property more efficiently, since we can subtract from the total the elements we push out of the queue upon aggregation.
Testing
Unit tests
r? @johnynek