Add trailing window aggregator #649

Merged
merged 14 commits into from
Feb 4, 2018

Conversation

cdg-stripe
Contributor

@cdg-stripe cdg-stripe commented Jan 26, 2018

Description

Add a WindowMonoid utility for aggregating over a finite window. The Window case class stores its elements in a queue. The monoid combines windows while trimming their elements to the appropriate size. The monoid also aggregates elements that have the group property more efficiently, since the elements pushed out of the queue during aggregation can be subtracted from the total.
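
To make the description concrete, here is a minimal, self-contained sketch of the idea. It is not the code from this PR: `SimpleMonoid`, `Win`, `WinMonoid`, and `WinExample` are hypothetical names used only for illustration, and the sketch always re-sums the surviving elements instead of using the group optimization.

```scala
import scala.collection.immutable.Queue

// Minimal stand-in for a monoid type class (an assumption; Algebird has its own Monoid).
trait SimpleMonoid[T] {
  def zero: T
  def plus(a: T, b: T): T
}

// A window keeps the running total plus the elements that produced it.
final case class Win[T](total: T, items: Queue[T])

// Combines two windows, keeping only the last `size` elements.
final class WinMonoid[T](size: Int)(implicit m: SimpleMonoid[T]) extends SimpleMonoid[Win[T]] {
  require(size >= 1, "Windows must have positive sizes")

  // The identity window has an empty queue.
  val zero: Win[T] = Win(m.zero, Queue.empty[T])

  def plus(a: Win[T], b: Win[T]): Win[T] = {
    val kept = (a.items ++ b.items).takeRight(size)
    // Re-sum the surviving elements from scratch.
    Win(kept.foldLeft(m.zero)(m.plus), kept)
  }
}

object WinExample {
  implicit val intAdd: SimpleMonoid[Int] = new SimpleMonoid[Int] {
    val zero: Int = 0
    def plus(a: Int, b: Int): Int = a + b
  }

  // Wrap a single value in a one-element window.
  def single[T](t: T): Win[T] = Win(t, Queue(t))

  val mon = new WinMonoid[Int](2)
  val summed: Win[Int] = List(1, 2, 3).map(x => single(x)).foldLeft(mon.zero)(mon.plus)
}
```

With a window size of 2, summing the single-element windows for 1, 2, and 3 keeps only the last two elements, so the total covers 2 and 3.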

Testing

Unit tests

r? @johnynek

@CLAassistant
CLAassistant commented Jan 26, 2018

CLA assistant check
All committers have signed the CLA.

Collaborator

@johnynek johnynek left a comment

This is great.

Look in algebird-core/src/test/... to find the tests for this module. We want to make sure it passes the monoidLaws. You can search for monoidLaw for some examples.

After we add test coverage we should merge this.

case class WindowMonoid[T](
    windowSize: Int
)(
    implicit m: Monoid[T],

Collaborator

Let’s not use this m. You can get the zero from the p by case matching.

* @param q (optional) queue to override default q containing input "total"
*/

case class Window[T](total: T, q: Queue[T] = Queue()) {

Contributor

This looks great! What do you think about allowing users to pass a PriorityQueue instead of just a Queue (AFAIK the Scala PriorityQueue doesn't extend Queue)? The way the code is written right now, it implies that I need a commutative monoid to guarantee deterministic behavior, or I need to control the order in which items are inserted in the window. When I work with windows they usually have some sort of ordering defined (for example, the last 3 distinct IPs used by a user in the last hour). It would be nice to let users preserve the ordering somehow, IMO.

Collaborator

actually, I think Window[T](total: T, items: Queue[T]) is better. We don't really want people to do something like Window(100) do we? that will have no items in the queue, and I imagine break the laws.

Contributor Author

@cdg-stripe cdg-stripe Jan 26, 2018

If you look at items, it fills in the queue if none was provided, i.e. it is meant for initialization. The intent is to have a simple interface where you can wrap a window around a single object and sum in a sensible way:

implicit def wm[T] = WindowMonoid[T](2)

Window(1) + Window(2) + Window(3) == Window(5)

Needing to specify the queue would make this redundant

Window(1, Queue(1)) + ...

Tbh I'm not a big fan of even having to expose the queue, since you could change the implementation in the future (e.g. a PriorityQueue). A more general approach might use a PushPopper interface, but that's probably overkill.

if (b.items.size >= windowSize) {
  var total: T = b.total
  var q = b.items
  while (q.size > windowSize) {

Contributor

Not sure if it's possible in this implementation, but it would be nice to generalize how elements are evicted from the window. Right now it seems like I can only do fixed-size windows, but I would really like to do time-based rolling windows as well. For example: a rolling window of 1 hour, where we evict elements based on time instead of size.

Contributor Author

My thought was to eventually have two classes/monoids, based on whether we want aggregations over fixed items or fixed time.

// Monoid defines aggregation for fixed window length, e.g. last 28 elements. Queue fixed to 28.
case class Window[T](total: T, q: Queue[T])

// Monoid defines aggregation for fixed time, e.g. last 28 days. Queue can grow arbitrarily.
case class WindowWithTs[T](latestTime: Double, total: T, q: Queue[(Double, T)])

Does that seem sensible?
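
As a rough illustration of the time-based variant discussed here, the following sketch evicts elements by timestamp rather than by count. It is not part of this PR: `TimedWindow`, `single`, `combine`, and the `span` parameter are hypothetical, and values are fixed to `Long` to keep the example free of type class plumbing.

```scala
import scala.collection.immutable.Queue

// Hypothetical time-based window: `latest` is the newest timestamp seen so far,
// `items` holds (timestamp, value) pairs in arrival order.
final case class TimedWindow(latest: Double, total: Long, items: Queue[(Double, Long)])

object TimedWindow {
  val empty: TimedWindow = TimedWindow(Double.NegativeInfinity, 0L, Queue.empty)

  def single(ts: Double, v: Long): TimedWindow = TimedWindow(ts, v, Queue((ts, v)))

  // Combine two windows, evicting anything that is `span` or more older than the newest timestamp.
  def combine(span: Double)(a: TimedWindow, b: TimedWindow): TimedWindow = {
    val latest = math.max(a.latest, b.latest)
    val kept = (a.items ++ b.items).filter { case (ts, _) => ts > latest - span }
    // Re-sum the surviving values; the queue length is unbounded, unlike the fixed-size window.
    TimedWindow(latest, kept.map(_._2).sum, kept)
  }
}
```

For example, with a span of 60.0 and elements at times 0.0, 30.0, and 70.0, the element at time 0.0 is evicted once the element at time 70.0 arrives.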

Contributor

That seems reasonable, but I am still not sure why we can't use PriorityQueue instead of Queue. As a user I would like to control the ordering, and if I am passing you a PQ, I can do that by specifying my own comparator.

Collaborator

@johnynek johnynek left a comment

in addition to the monoidLaws, I think we want something like:

forAll { (ts0: List[Int], n: Int) =>
  val ts = ts0.takeRight(n)
  val mon = new WindowMonoid(n)
  assert(mon.sum(ts.map(Window(_))).total == ts.sum)
}

That's the behavior you want, right?

@cdg-stripe
Contributor Author

cdg-stripe commented Jan 26, 2018

@MansurAshraf: I'm trying to grasp whether we need a fully general implementation up front, where the user inputs their own queue/priority. As mentioned above my thoughts are to have trailing windows based on one of:

  • Fixed number of elements
  • Fixed time interval (not implemented here, but can in a separate PR)

The "last N ip-addresses" problem is interesting. But rather than making this implementation more flexible/abstract, it might make sense to have a LastNDistinct monoid. I suspect this approach will be easier to test and implement.
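
One way such a dedicated structure could look is sketched below. This is purely illustrative: `LastNDistinct` and `combine` are hypothetical names, not an existing Algebird API, and the sketch assumes "most recent" means the order in which values are appended.

```scala
// Hypothetical: keeps the `n` most recently seen distinct values, oldest first.
final case class LastNDistinct[T](items: Vector[T])

object LastNDistinct {
  def combine[T](n: Int)(a: LastNDistinct[T], b: LastNDistinct[T]): LastNDistinct[T] = {
    // Reverse first so that `distinct` keeps each value's most recent occurrence.
    val newestFirst = (a.items ++ b.items).reverse.distinct.take(n)
    LastNDistinct(newestFirst.reverse)
  }
}
```

Combining single-element values for "a", "b", "a", "c" with n = 2 leaves "a" and "c", because the second sighting of "a" is more recent than "b".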

}

/*
Example usage:

Collaborator

can you make this a scaladoc comment on the case class?

  val ts = ts0.takeRight(n)
  val mon = new WindowMonoid(n)
  assert(mon.sum(ts0.map(Window(_))).total == ts.sum)
}

Collaborator

can we have a second one:

forAll { (ts0: List[Int], ts1: List[Int], n: Int) =>
  val expected = Queue((ts0 ::: ts1).takeRight(n): _*)
  val mon = new WindowMonoid(n)
  val got = mon.plus(Window(ts0.sum, Queue(ts0: _*)), Window(ts1.sum, Queue(ts1:_*)))
  assert(got.items == expected)
}

Contributor Author

Done

/**
* Provides a natural monoid for combining windows truncated to some window size.
*
* @param windowSize

Collaborator

can you add more comment to this javaDoc param?

  case Priority.Preferred(g) => Window(g.zero)
  case Priority.Fallback(m) => Window(m.zero)
}

Collaborator

what about a

  def fromIterable[T](ts: Iterable[T]): Window[T] = {
    val monT: Monoid[T] = p.join
    val right = ts.toList.takeRight(windowSize)
    val total = monT.sum(right)
    Window(total, Queue.empty[T] ++ right)
  }

Contributor Author

@johnynek: I did this, and generalized to Traversable.

  case Priority.Preferred(g) => plusG(a, b)(g)
  case Priority.Fallback(m) => plusM(a, b)(m)
}

Collaborator

I think you can optimize sumOption somewhat here: you can take blocks of data and ignore the left if the right is full. PS: scalding uses this if it exists, so overriding sumOption on Semigroup (or any subclass) can be a very nice performance win on map/reduce.

def sumOption(ws: TraversableOnce[Window[T]]): Option[Window[T]] =
  if (ws.isEmpty) None
  else {
    val it = ws.toIterator
    var size = 0
    var queue = Queue.empty[Window[T]]
    while (it.hasNext) {
      val n = it.next
      queue = queue :+ n
      size += n.items.size
      // if everything after the head already fills the window, the head cannot contribute
      val tailSize = size - queue.head.items.size
      if (tailSize >= windowSize) {
        queue = queue.tail
        size = tailSize
      }
    }
    // now we only have to merge the queue:
    Some(queue.tail.foldLeft(queue.head)(plus(_, _)))
  }

I think this can be a pretty big win.

Contributor Author

@johnynek: Overwrote sumOption below.

@johnynek
Collaborator

this PR: #650 seems to fix the ruby issue on travis.

@johnynek
Collaborator

(nothing like non-hermetic builds to make you feel real good, all day long).

if (ws.isEmpty) None
else {
  val it = ws.toIterator
  var queue = Queue[T]()

Collaborator

.empty is more efficient since you avoid allocating the varargs wrapper.

  while (it.hasNext) {
    queue = (queue ++ it.next.items).takeRight(windowSize)
  }
  Some(fromTraversable(queue))

Collaborator

Using fromTraversable here is ignoring that you already have a queue. I think summing the queue and reusing it will be a significant performance improvement
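
A hedged sketch of what this suggestion could look like follows. It is not the merged code: `SumOptionSketch` and `Win` are hypothetical, elements are fixed to `Int`, and the input is taken as an `Iterator` for simplicity. The point is only that the accumulated queue is summed once and stored directly.

```scala
import scala.collection.immutable.Queue

object SumOptionSketch {
  // Simplified, Int-only stand-in for the PR's Window type.
  final case class Win(total: Int, items: Queue[Int])

  def sumOption(windowSize: Int)(ws: Iterator[Win]): Option[Win] =
    if (!ws.hasNext) None
    else {
      var queue = Queue.empty[Int]
      while (ws.hasNext) {
        // Append the next window's elements and keep only the last `windowSize`.
        queue = (queue ++ ws.next().items).takeRight(windowSize)
      }
      // Sum the queue we already hold and reuse it, rather than rebuilding it.
      Some(Win(queue.sum, queue))
    }
}
```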


object Window {
  def apply[T](v: T): Window[T] = Window[T](v, Queue[T](v))
  def from[T](ts: Traversable[T])(implicit m: WindowMonoid[T]) = m.fromTraversable(ts)

Collaborator

Can we use Iterable? Scala 2.13 is changing the collections to avoid Traversable: https://www.scala-lang.org/blog/2017/02/28/collections-rework.html#traversable-and-iterable

@codecov-io

Codecov Report

Merging #649 into develop will decrease coverage by 0.02%.
The diff coverage is 73.91%.

@@             Coverage Diff             @@
##           develop     #649      +/-   ##
===========================================
- Coverage    82.83%   82.81%   -0.03%     
===========================================
  Files          108      109       +1     
  Lines         5163     5209      +46     
  Branches       314      316       +2     
===========================================
+ Hits          4277     4314      +37     
- Misses         886      895       +9

Impacted Files                                            Coverage Δ
...e/src/main/scala/com/twitter/algebird/Window.scala     73.91% <73.91%> (ø)
.../main/scala/com/twitter/algebird/Successible.scala     87.5% <0%> (-8.34%) ⬇️
.../main/scala/com/twitter/algebird/BloomFilter.scala     94.84% <0%> (-0.43%) ⬇️
...c/main/scala/com/twitter/algebird/MapAlgebra.scala     77.88% <0%> (+0.96%) ⬆️
...src/main/scala/com/twitter/algebird/Interval.scala     79.13% <0%> (+1.73%) ⬆️
...src/main/scala/com/twitter/algebird/Priority.scala     25% <0%> (+25%) ⬆️

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update 2c79ed9...5321cfa.

@cdg-stripe
Contributor Author

@johnynek:

Added more tests. Caught that the zero element should have an empty queue, where originally it was Queue[T](T.zero). Also made the monoid on group elements more efficient. I dug around to see if there's anything else worth tweaking, but I think it's in a decent spot.

Also, what are your thoughts about Scala 2.13, when we can have a signature Window[Size, T]? It might be okay to have Window[T] and WindowN[Size, T] case classes.

@johnynek
Collaborator

johnynek commented Feb 4, 2018

yeah, I think in scala 2.13 we will have to revisit a lot of stuff, but the compatibility story is open since we will need to support 2.12 for a while, I guess (we aren't even on 2.12 yet at Stripe).

Collaborator

@johnynek johnynek left a comment

Two comments, but let's address them in a follow up PR.

Thanks for this!


require(windowSize >= 1, "Windows must have positive sizes")

def zero = p.fold(g => Window[T](g.zero, Queue.empty[T]))(m => Window(m.zero, Queue.empty[T]))

Collaborator

can we make this a val so we don't have to do the computation and allocation again on each call?

def zero = p.fold(g => Window[T](g.zero, Queue.empty[T]))(m => Window(m.zero, Queue.empty[T]))

def plus(a: Window[T], b: Window[T]): Window[T] =
  p.fold(g => plusG(a, b)(g))(m => plusM(a, b)(m))

Collaborator

for performance (not calling fold on each plus call) we are probably better off with two subclasses of WindowMonoid, which could become an abstract class. We can decide once, when we allocate, which path to choose. Then the jit should be able to inline both instances. Sadly, the jvm can't optimize the above very well.
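
The shape of that refactoring might look roughly like the sketch below. None of these names come from the PR: `WinCombiner`, `ResumCombiner`, `SubtractCombiner`, and the `hasInverse` flag are hypothetical, and elements are fixed to `Int` so that the subtraction (group) path can be shown without type classes.

```scala
import scala.collection.immutable.Queue

object DispatchSketch {
  // Simplified, Int-only stand-in for the PR's Window type.
  final case class Win(total: Int, items: Queue[Int])

  // The strategy is chosen once, at construction time, instead of on every plus call.
  abstract class WinCombiner(val windowSize: Int) {
    def plus(a: Win, b: Win): Win
  }

  // Monoid-style path: re-sum whatever survives truncation.
  final class ResumCombiner(size: Int) extends WinCombiner(size) {
    def plus(a: Win, b: Win): Win = {
      val kept = (a.items ++ b.items).takeRight(windowSize)
      Win(kept.sum, kept)
    }
  }

  // Group-style path: subtract the evicted elements from the combined total.
  final class SubtractCombiner(size: Int) extends WinCombiner(size) {
    def plus(a: Win, b: Win): Win = {
      val all = a.items ++ b.items
      val evicted = all.dropRight(windowSize)
      Win(a.total + b.total - evicted.sum, all.takeRight(windowSize))
    }
  }

  // Decide which path to use a single time, when the combiner is allocated.
  def combiner(windowSize: Int, hasInverse: Boolean): WinCombiner =
    if (hasInverse) new SubtractCombiner(windowSize) else new ResumCombiner(windowSize)
}
```

Both paths should agree on the result; the subtraction path only touches the evicted elements, which is where the saving would come from when windows are large and few elements fall out.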

@johnynek johnynek merged commit 03ca640 into twitter:develop Feb 4, 2018
@johnynek
Collaborator

@MansurAshraf note if you want the most recent K things, TopKMonoid should work:

https://github.com/twitter/algebird/blob/develop/algebird-core/src/main/scala/com/twitter/algebird/TopKMonoid.scala

(so would priorityqueue I guess).

5 participants