From 0d0103e6c4a48687ea3cce744a6c0b3800809499 Mon Sep 17 00:00:00 2001 From: Christian Griset Date: Thu, 25 Jan 2018 17:02:02 -0800 Subject: [PATCH 01/14] Add trailing window aggregator --- .../scala/com/twitter/algebird/Window.scala | 120 ++++++++++++++++++ .../com/twitter/algebird/WindowLawsTest.scala | 20 +++ 2 files changed, 140 insertions(+) create mode 100644 algebird-core/src/main/scala/com/twitter/algebird/Window.scala create mode 100644 algebird-test/src/test/scala/com/twitter/algebird/WindowLawsTest.scala diff --git a/algebird-core/src/main/scala/com/twitter/algebird/Window.scala b/algebird-core/src/main/scala/com/twitter/algebird/Window.scala new file mode 100644 index 000000000..a9d833bf0 --- /dev/null +++ b/algebird-core/src/main/scala/com/twitter/algebird/Window.scala @@ -0,0 +1,120 @@ +/* +Copyright 2018 Stripe + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package com.twitter.algebird + +import scala.collection.immutable.Queue + +/** + * + * Convenience case class defined with a monoid for aggregating elements over + * a finite window. + * + * @param total Known running total of `T` + * @param items queue of known trailing elements. + */ + +case class Window[T](total: T, items: Queue[T]) { + def size = this.items.size +} + +object Window { + def apply[T](v: T): Window[T] = Window[T](v, Queue[T](v)) +} + +/** + * Provides a natural monoid for combining windows truncated to some window size. + * + * @param windowSize + */ + + +case class WindowMonoid[T]( + windowSize: Int +)(implicit p: Priority[Group[T], Monoid[T]]) + extends Monoid[Window[T]] { + val zero = + p match { + case Priority.Preferred(g) => Window(g.zero) + case Priority.Fallback(m) => Window(m.zero) + } + + def plus(a: Window[T], b: Window[T]): Window[T] = + p match { + case Priority.Preferred(g) => plusG(a, b)(g) + case Priority.Fallback(m) => plusM(a, b)(m) + } + + def plusG(a: Window[T], b: Window[T])(implicit g: Group[T]): Window[T] = + if (b.items.size >= windowSize) { + var total: T = b.total + var q = b.items + while (q.size > windowSize) { + total = total - q.head + q = q.tail + } + Window(total, q) + } else { + // we need windowSize - b.items.size from `a` + val fromA = a.items.takeRight(windowSize - b.items.size) + val res = fromA ++ b.items + val total = g.sum(fromA) + b.total + Window(total, res) + } + + def plusM(a: Window[T], b: Window[T])(implicit m: Monoid[T]): Window[T] = + if (b.items.size >= windowSize) { + var q = b.items + while (q.size > windowSize) { + q = q.tail + } + val total = m.sum(q) + Window(total, q) + } else { + // we need windowSize - b.items.size from `a` + val fromA = a.items.takeRight(windowSize - b.items.size) + val res = fromA ++ b.items + val total = m.sum(fromA) + b.total + Window(total, res) + } +} + +/* + Example usage: + + case class W90[T](window: Window[T]) { + def total = this.window.total + } + + object W90 { + def apply[T](v: T): W90[T] = W90[T](new Window(v)) + + implicit def w90Monoid[T: Monoid]: Monoid[W90[T]] = new Monoid[W90[T]] { + private val WT: Monoid[Window[T]] = WindowMonoid[T](90) + def zero = W90[T](WT.zero) + def plus(a: W90[T], b: W90[T]): W90[T] = + W90[T](WT.plus(a.window, b.window)) + } + } + + val elements = getElements() + + val trailing90Totals = + elements + .map{ W90 ( _ ) } + .foldLeft(W90(0)) { (a, b) => a + b } + .map{ _.total } + */ diff --git a/algebird-test/src/test/scala/com/twitter/algebird/WindowLawsTest.scala b/algebird-test/src/test/scala/com/twitter/algebird/WindowLawsTest.scala new file mode 100644 index 000000000..2c80f4a72 --- /dev/null +++ b/algebird-test/src/test/scala/com/twitter/algebird/WindowLawsTest.scala @@ -0,0 +1,20 @@ +package com.twitter.algebird + +import com.twitter.algebird.BaseProperties._ +import com.twitter.algebird.scalacheck.arbitrary._ +import com.twitter.algebird.scalacheck.NonEmptyVector +import org.scalacheck.Arbitrary +import org.scalacheck.Prop.forAll + + +class WindowLaws extends CheckProperties { + property("We aggregate over only n items") { + forAll { (ts0: List[Int], n: Int) => + val ts = ts0.takeRight(n) + val mon = new WindowMonoid(n) + assert(mon.sum(ts0.map( Window(_) )).total == ts.sum) + } + } + + property("Window[Double] is a monoid") { monoidLaws[Window[Double]] } +} From a3b85cc10b18b40c0ee2fb808bf1ad1b7c1703b0 Mon Sep 17 00:00:00 2001 From: Christian Griset Date: Wed, 31 Jan 2018 21:32:26 -0800 Subject: [PATCH 02/14] Create Window from traversable --- .../src/main/scala/com/twitter/algebird/Window.scala | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/algebird-core/src/main/scala/com/twitter/algebird/Window.scala b/algebird-core/src/main/scala/com/twitter/algebird/Window.scala index a9d833bf0..3de962400 100644 --- a/algebird-core/src/main/scala/com/twitter/algebird/Window.scala +++ b/algebird-core/src/main/scala/com/twitter/algebird/Window.scala @@ -33,6 +33,7 @@ case class Window[T](total: T, items: Queue[T]) { object Window { def apply[T](v: T): Window[T] = Window[T](v, Queue[T](v)) + def from[T](ts: Traversable[T])(implicit m: WindowMonoid[T]) = m.fromTraversable(ts) } /** @@ -90,6 +91,13 @@ case class WindowMonoid[T]( val total = m.sum(fromA) + b.total Window(total, res) } + + def fromTraversable(ts: Traversable[T]): Window[T] = { + val monT: Monoid[T] = p.join + val right = ts.toList.takeRight(windowSize) + val total = monT.sum(right) + Window(total, Queue(right: _*)) + } } /* From 622fddfcd76153dff0bb2f30bbed5702365cc4c3 Mon Sep 17 00:00:00 2001 From: Christian Griset Date: Wed, 31 Jan 2018 21:32:38 -0800 Subject: [PATCH 03/14] Override sumOption --- .../src/main/scala/com/twitter/algebird/Window.scala | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/algebird-core/src/main/scala/com/twitter/algebird/Window.scala b/algebird-core/src/main/scala/com/twitter/algebird/Window.scala index 3de962400..332a9611a 100644 --- a/algebird-core/src/main/scala/com/twitter/algebird/Window.scala +++ b/algebird-core/src/main/scala/com/twitter/algebird/Window.scala @@ -98,6 +98,18 @@ case class WindowMonoid[T]( val total = monT.sum(right) Window(total, Queue(right: _*)) } + + override def sumOption(ws: TraversableOnce[Window[T]]): Option[Window[T]] = { + if(ws.isEmpty) None + else { + val it = ws.toIterator + var queue = Queue[T]() + while (it.hasNext) { + queue = (queue ++ it.next.items).takeRight(windowSize) + } + Some(fromTraversable(queue)) + } + } } /* From afd2f07792c364e516817f72cba8ec80fed23a1f Mon Sep 17 00:00:00 2001 From: Christian Griset Date: Wed, 31 Jan 2018 21:52:01 -0800 Subject: [PATCH 04/14] Extend tests --- .../com/twitter/algebird/WindowLawsTest.scala | 32 ++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) diff --git a/algebird-test/src/test/scala/com/twitter/algebird/WindowLawsTest.scala b/algebird-test/src/test/scala/com/twitter/algebird/WindowLawsTest.scala index 2c80f4a72..6bd9b4ec4 100644 --- a/algebird-test/src/test/scala/com/twitter/algebird/WindowLawsTest.scala +++ b/algebird-test/src/test/scala/com/twitter/algebird/WindowLawsTest.scala @@ -12,7 +12,37 @@ class WindowLaws extends CheckProperties { forAll { (ts0: List[Int], n: Int) => val ts = ts0.takeRight(n) val mon = new WindowMonoid(n) - assert(mon.sum(ts0.map( Window(_) )).total == ts.sum) + assert(mon.sum(ts0.map(Window(_))).total == ts.sum) + } + } + + property("We correctly create a window from traversable") { + forAll { (ts0: List[Int], n: Int) => + val mon = new WindowMonoid(n) + val right = Queue(ts0.takeRight(n): _*) + val expected = Window(right.sum, right) + val got = mon.fromTraversable(ts0) + assert(got == expected) + } + } + + property("We correctly combine windows") { + forAll { (left: List[Int], right: List[Int], n: Int) => + val mon = new WindowMonoid(n) + val trunc = Queue((left ::: right).takeRight(n): _*) + val expected = Window(trunc.sum, trunc) + val got = mon.sum(mon.fromTraversable(left), mon.fromTraversable(right)) + assert(expected == got) + } + } + + property("We correctly overrode sumOption") { + forAll { (ts0: List[Int], n: Int) => + val mon = new WindowMonoid(n) + val got = mon.sumOption(ts0.map { Window(_) }) + val trunc = Queue(ts0.takeRight(n): _*) + val expected = if (n == 0) None else Some(Window(trunc.sum, trunc)) + assert(expected == got) } } From 1a1b82c9a93a3f21955cb63137f2d6d96d7ef365 Mon Sep 17 00:00:00 2001 From: Christian Griset Date: Wed, 31 Jan 2018 22:16:59 -0800 Subject: [PATCH 05/14] Improve comments --- .../scala/com/twitter/algebird/Window.scala | 57 ++++++++++--------- 1 file changed, 29 insertions(+), 28 deletions(-) diff --git a/algebird-core/src/main/scala/com/twitter/algebird/Window.scala b/algebird-core/src/main/scala/com/twitter/algebird/Window.scala index 332a9611a..3cad0455d 100644 --- a/algebird-core/src/main/scala/com/twitter/algebird/Window.scala +++ b/algebird-core/src/main/scala/com/twitter/algebird/Window.scala @@ -25,6 +25,34 @@ import scala.collection.immutable.Queue * * @param total Known running total of `T` * @param items queue of known trailing elements. + * + * Example usage: + * + * case class W28[T](window: Window[T]) { + * def total = this.window.total + * def items = this.window.items + * def size = this.window.size + * } + * + * object W28 { + * val windowSize = 28 + * def apply[T](v: T): W28[T] = W28[T](Window(v)) + * + * implicit def w28Monoid[T](implicit p: Priority[Group[T], Monoid[T]]): Monoid[W28[T]] = + * new Monoid[W28[T]] { + * private val WT: Monoid[Window[T]] = WindowMonoid[T](windowSize) + * def zero = W28[T](WT.zero) + * def plus(a: W28[T], b: W28[T]): W28[T] = + * W28[T](WT.plus(a.window, b.window)) + * } + * } + * val elements = getElements() + * + * val trailing90Totals = + * elements + * .map{ W90 } + * .scanLeft(W90(0)) { (a, b) => a + b } + * .map{ _.total } */ case class Window[T](total: T, items: Queue[T]) { @@ -39,7 +67,7 @@ object Window { /** * Provides a natural monoid for combining windows truncated to some window size. * - * @param windowSize + * @param windowSize Upper limit of the number of items in a window. */ @@ -111,30 +139,3 @@ case class WindowMonoid[T]( } } } - -/* - Example usage: - - case class W90[T](window: Window[T]) { - def total = this.window.total - } - - object W90 { - def apply[T](v: T): W90[T] = W90[T](new Window(v)) - - implicit def w90Monoid[T: Monoid]: Monoid[W90[T]] = new Monoid[W90[T]] { - private val WT: Monoid[Window[T]] = WindowMonoid[T](90) - def zero = W90[T](WT.zero) - def plus(a: W90[T], b: W90[T]): W90[T] = - W90[T](WT.plus(a.window, b.window)) - } - } - - val elements = getElements() - - val trailing90Totals = - elements - .map{ W90 ( _ ) } - .foldLeft(W90(0)) { (a, b) => a + b } - .map{ _.total } - */ From 387d875986b11b50ec6776fe96579f2f7dcf74a4 Mon Sep 17 00:00:00 2001 From: Christian Griset Date: Wed, 31 Jan 2018 22:37:02 -0800 Subject: [PATCH 06/14] Make addition on groups more efficient --- .../main/scala/com/twitter/algebird/Window.scala | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/algebird-core/src/main/scala/com/twitter/algebird/Window.scala b/algebird-core/src/main/scala/com/twitter/algebird/Window.scala index 3cad0455d..9b7886c2f 100644 --- a/algebird-core/src/main/scala/com/twitter/algebird/Window.scala +++ b/algebird-core/src/main/scala/com/twitter/algebird/Window.scala @@ -98,10 +98,16 @@ case class WindowMonoid[T]( Window(total, q) } else { // we need windowSize - b.items.size from `a` - val fromA = a.items.takeRight(windowSize - b.items.size) - val res = fromA ++ b.items - val total = g.sum(fromA) + b.total - Window(total, res) + var truncA = a.items + var totalA = a.total + val truncTo = windowSize - b.size + while (truncA.size > truncTo) { + totalA = totalA - truncA.head + truncA = truncA.tail + } + val items = truncA ++ b.items + val total = totalA + b.total + Window(total, items) } def plusM(a: Window[T], b: Window[T])(implicit m: Monoid[T]): Window[T] = From 7ea0e796feb9a69ee40cd4321f1793c2ce620534 Mon Sep 17 00:00:00 2001 From: Christian Griset Date: Wed, 31 Jan 2018 22:41:11 -0800 Subject: [PATCH 07/14] Cleanup plusM --- .../main/scala/com/twitter/algebird/Window.scala | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/algebird-core/src/main/scala/com/twitter/algebird/Window.scala b/algebird-core/src/main/scala/com/twitter/algebird/Window.scala index 9b7886c2f..4fcdd927c 100644 --- a/algebird-core/src/main/scala/com/twitter/algebird/Window.scala +++ b/algebird-core/src/main/scala/com/twitter/algebird/Window.scala @@ -112,18 +112,15 @@ case class WindowMonoid[T]( def plusM(a: Window[T], b: Window[T])(implicit m: Monoid[T]): Window[T] = if (b.items.size >= windowSize) { - var q = b.items - while (q.size > windowSize) { - q = q.tail - } - val total = m.sum(q) - Window(total, q) + val items = b.items.takeRight(windowSize) + val total = m.sum(items) + Window(total, items) } else { // we need windowSize - b.items.size from `a` val fromA = a.items.takeRight(windowSize - b.items.size) - val res = fromA ++ b.items + val items = fromA ++ b.items val total = m.sum(fromA) + b.total - Window(total, res) + Window(total, items) } def fromTraversable(ts: Traversable[T]): Window[T] = { From 096ba21e3867faed6535ea3e5dbeb137523a9cb7 Mon Sep 17 00:00:00 2001 From: Christian Griset Date: Wed, 31 Jan 2018 22:41:29 -0800 Subject: [PATCH 08/14] Line indentation --- .../scala/com/twitter/algebird/Window.scala | 32 +++++++++---------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/algebird-core/src/main/scala/com/twitter/algebird/Window.scala b/algebird-core/src/main/scala/com/twitter/algebird/Window.scala index 4fcdd927c..9234f65d5 100644 --- a/algebird-core/src/main/scala/com/twitter/algebird/Window.scala +++ b/algebird-core/src/main/scala/com/twitter/algebird/Window.scala @@ -123,22 +123,22 @@ case class WindowMonoid[T]( Window(total, items) } - def fromTraversable(ts: Traversable[T]): Window[T] = { - val monT: Monoid[T] = p.join - val right = ts.toList.takeRight(windowSize) - val total = monT.sum(right) - Window(total, Queue(right: _*)) - } - - override def sumOption(ws: TraversableOnce[Window[T]]): Option[Window[T]] = { - if(ws.isEmpty) None - else { - val it = ws.toIterator - var queue = Queue[T]() - while (it.hasNext) { - queue = (queue ++ it.next.items).takeRight(windowSize) - } - Some(fromTraversable(queue)) + def fromTraversable(ts: Traversable[T]): Window[T] = { + val monT: Monoid[T] = p.join + val right = ts.toList.takeRight(windowSize) + val total = monT.sum(right) + Window(total, Queue(right: _*)) + } + + override def sumOption(ws: TraversableOnce[Window[T]]): Option[Window[T]] = { + if(ws.isEmpty) None + else { + val it = ws.toIterator + var queue = Queue[T]() + while (it.hasNext) { + queue = (queue ++ it.next.items).takeRight(windowSize) } + Some(fromTraversable(queue)) } + } } From 9203860d19420f3902f800c35d2394f1a98dcde8 Mon Sep 17 00:00:00 2001 From: Christian Griset Date: Wed, 31 Jan 2018 22:58:26 -0800 Subject: [PATCH 09/14] Import operators --- algebird-core/src/main/scala/com/twitter/algebird/Window.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/algebird-core/src/main/scala/com/twitter/algebird/Window.scala b/algebird-core/src/main/scala/com/twitter/algebird/Window.scala index 9234f65d5..d3bf8b151 100644 --- a/algebird-core/src/main/scala/com/twitter/algebird/Window.scala +++ b/algebird-core/src/main/scala/com/twitter/algebird/Window.scala @@ -17,6 +17,7 @@ limitations under the License. package com.twitter.algebird import scala.collection.immutable.Queue +import Operators._ /** * From 6d5a94817901012300a15d4deaffd8bf216a16fd Mon Sep 17 00:00:00 2001 From: Christian Griset Date: Thu, 1 Feb 2018 09:54:27 -0800 Subject: [PATCH 10/14] PR comments --- .../scala/com/twitter/algebird/Window.scala | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/algebird-core/src/main/scala/com/twitter/algebird/Window.scala b/algebird-core/src/main/scala/com/twitter/algebird/Window.scala index d3bf8b151..cc920f182 100644 --- a/algebird-core/src/main/scala/com/twitter/algebird/Window.scala +++ b/algebird-core/src/main/scala/com/twitter/algebird/Window.scala @@ -62,7 +62,7 @@ case class Window[T](total: T, items: Queue[T]) { object Window { def apply[T](v: T): Window[T] = Window[T](v, Queue[T](v)) - def from[T](ts: Traversable[T])(implicit m: WindowMonoid[T]) = m.fromTraversable(ts) + def from[T](ts: Iterable[T])(implicit m: WindowMonoid[T]) = m.fromIterable(ts) } /** @@ -124,22 +124,22 @@ case class WindowMonoid[T]( Window(total, items) } - def fromTraversable(ts: Traversable[T]): Window[T] = { - val monT: Monoid[T] = p.join - val right = ts.toList.takeRight(windowSize) - val total = monT.sum(right) - Window(total, Queue(right: _*)) - } - - override def sumOption(ws: TraversableOnce[Window[T]]): Option[Window[T]] = { - if(ws.isEmpty) None + override def sumOption(ws: TraversableOnce[Window[T]]): Option[Window[T]] = + if (ws.isEmpty) None else { val it = ws.toIterator - var queue = Queue[T]() + var queue = Queue.empty[T] while (it.hasNext) { queue = (queue ++ it.next.items).takeRight(windowSize) } - Some(fromTraversable(queue)) + val monT: Monoid[T] = p.join + Some(Window(monT.sum(queue), queue)) } + + def fromIterable(ts: Iterable[T]): Window[T] = { + val monT: Monoid[T] = p.join + val right = ts.toList.takeRight(windowSize) + val total = monT.sum(right) + Window(total, Queue(right: _*)) } } From d69f14150006af67eb712fecec990d4a219252f1 Mon Sep 17 00:00:00 2001 From: Christian Griset Date: Thu, 1 Feb 2018 10:10:38 -0800 Subject: [PATCH 11/14] Fold instead of match --- .../src/main/scala/com/twitter/algebird/Window.scala | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/algebird-core/src/main/scala/com/twitter/algebird/Window.scala b/algebird-core/src/main/scala/com/twitter/algebird/Window.scala index cc920f182..9ac98a36e 100644 --- a/algebird-core/src/main/scala/com/twitter/algebird/Window.scala +++ b/algebird-core/src/main/scala/com/twitter/algebird/Window.scala @@ -76,17 +76,11 @@ case class WindowMonoid[T]( windowSize: Int )(implicit p: Priority[Group[T], Monoid[T]]) extends Monoid[Window[T]] { - val zero = - p match { - case Priority.Preferred(g) => Window(g.zero) - case Priority.Fallback(m) => Window(m.zero) - } + + def zero = p.fold(g => Window(g.zero))(m => Window(m.zero)) def plus(a: Window[T], b: Window[T]): Window[T] = - p match { - case Priority.Preferred(g) => plusG(a, b)(g) - case Priority.Fallback(m) => plusM(a, b)(m) - } + p.fold(g => plusG(a, b)(g))(m => plusM(a, b)(m)) def plusG(a: Window[T], b: Window[T])(implicit g: Group[T]): Window[T] = if (b.items.size >= windowSize) { From 1277bd68b7ea1b563930f637f86b14d3444f00cc Mon Sep 17 00:00:00 2001 From: Christian Griset Date: Thu, 1 Feb 2018 10:11:37 -0800 Subject: [PATCH 12/14] Fix tests --- .../com/twitter/algebird/WindowLawsTest.scala | 56 ++++++++++++------- 1 file changed, 36 insertions(+), 20 deletions(-) diff --git a/algebird-test/src/test/scala/com/twitter/algebird/WindowLawsTest.scala b/algebird-test/src/test/scala/com/twitter/algebird/WindowLawsTest.scala index 6bd9b4ec4..037115574 100644 --- a/algebird-test/src/test/scala/com/twitter/algebird/WindowLawsTest.scala +++ b/algebird-test/src/test/scala/com/twitter/algebird/WindowLawsTest.scala @@ -1,50 +1,66 @@ package com.twitter.algebird +import scala.collection.immutable.Queue + import com.twitter.algebird.BaseProperties._ import com.twitter.algebird.scalacheck.arbitrary._ -import com.twitter.algebird.scalacheck.NonEmptyVector +import com.twitter.algebird.scalacheck.PosNum import org.scalacheck.Arbitrary +import org.scalacheck.Gen import org.scalacheck.Prop.forAll - class WindowLaws extends CheckProperties { + + implicit val mon = WindowMonoid[Int](5) + implicit val wGen = Arbitrary { + for ( + v <- Gen.choose(-1000, 1000) + ) yield (Window[Int](v)) + } + + property("Window obeys monoid laws") { monoidLaws[Window[Int]] } +} + +class WindowTest extends CheckProperties { property("We aggregate over only n items") { - forAll { (ts0: List[Int], n: Int) => + forAll { (ts0: List[Int], pn: PosNum[Int]) => + val n = pn.value val ts = ts0.takeRight(n) - val mon = new WindowMonoid(n) - assert(mon.sum(ts0.map(Window(_))).total == ts.sum) + val mon = WindowMonoid[Int](n) + mon.sum(ts0.map(Window(_))).total == ts.sum } } - property("We correctly create a window from traversable") { - forAll { (ts0: List[Int], n: Int) => - val mon = new WindowMonoid(n) + property("We correctly create a window from iterable") { + forAll { (ts0: List[Int], pn: PosNum[Int]) => + val n = pn.value + val mon = WindowMonoid[Int](n) val right = Queue(ts0.takeRight(n): _*) val expected = Window(right.sum, right) - val got = mon.fromTraversable(ts0) - assert(got == expected) + val got = mon.fromIterable(ts0) + got == expected } } property("We correctly combine windows") { - forAll { (left: List[Int], right: List[Int], n: Int) => - val mon = new WindowMonoid(n) + forAll { (left: List[Int], right: List[Int], pn: PosNum[Int]) => + val n = pn.value + val mon = WindowMonoid[Int](n) val trunc = Queue((left ::: right).takeRight(n): _*) val expected = Window(trunc.sum, trunc) - val got = mon.sum(mon.fromTraversable(left), mon.fromTraversable(right)) - assert(expected == got) + val got = mon.plus(mon.fromIterable(left), mon.fromIterable(right)) + expected == got } } property("We correctly overrode sumOption") { - forAll { (ts0: List[Int], n: Int) => - val mon = new WindowMonoid(n) + forAll { (ts0: List[Int], pn: PosNum[Int]) => + val n = pn.value + val mon = WindowMonoid[Int](n) val got = mon.sumOption(ts0.map { Window(_) }) val trunc = Queue(ts0.takeRight(n): _*) - val expected = if (n == 0) None else Some(Window(trunc.sum, trunc)) - assert(expected == got) + val expected = if (ts0.size == 0) None else Some(Window(trunc.sum, trunc)) + expected == got } } - - property("Window[Double] is a monoid") { monoidLaws[Window[Double]] } } From 16aab4c2a52a8686bbacde6ec2d2212d6a0e5c19 Mon Sep 17 00:00:00 2001 From: Christian Griset Date: Thu, 1 Feb 2018 14:12:52 -0800 Subject: [PATCH 13/14] Require positive sizes --- algebird-core/src/main/scala/com/twitter/algebird/Window.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/algebird-core/src/main/scala/com/twitter/algebird/Window.scala b/algebird-core/src/main/scala/com/twitter/algebird/Window.scala index 9ac98a36e..f138f4c3e 100644 --- a/algebird-core/src/main/scala/com/twitter/algebird/Window.scala +++ b/algebird-core/src/main/scala/com/twitter/algebird/Window.scala @@ -77,6 +77,8 @@ case class WindowMonoid[T]( )(implicit p: Priority[Group[T], Monoid[T]]) extends Monoid[Window[T]] { + require(windowSize >= 1, "Windows must have positive sizes") + def zero = p.fold(g => Window(g.zero))(m => Window(m.zero)) def plus(a: Window[T], b: Window[T]): Window[T] = From 5321cfa178b6ae640cbe97dcaf69b68b251885cf Mon Sep 17 00:00:00 2001 From: Christian Griset Date: Thu, 1 Feb 2018 14:17:49 -0800 Subject: [PATCH 14/14] zero has a queue with no elements --- .../main/scala/com/twitter/algebird/Window.scala | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/algebird-core/src/main/scala/com/twitter/algebird/Window.scala b/algebird-core/src/main/scala/com/twitter/algebird/Window.scala index f138f4c3e..22d148eec 100644 --- a/algebird-core/src/main/scala/com/twitter/algebird/Window.scala +++ b/algebird-core/src/main/scala/com/twitter/algebird/Window.scala @@ -79,7 +79,7 @@ case class WindowMonoid[T]( require(windowSize >= 1, "Windows must have positive sizes") - def zero = p.fold(g => Window(g.zero))(m => Window(m.zero)) + def zero = p.fold(g => Window[T](g.zero, Queue.empty[T]))(m => Window(m.zero, Queue.empty[T])) def plus(a: Window[T], b: Window[T]): Window[T] = p.fold(g => plusG(a, b)(g))(m => plusM(a, b)(m)) @@ -133,9 +133,12 @@ case class WindowMonoid[T]( } def fromIterable(ts: Iterable[T]): Window[T] = { - val monT: Monoid[T] = p.join - val right = ts.toList.takeRight(windowSize) - val total = monT.sum(right) - Window(total, Queue(right: _*)) + if(ts.size == 0) zero + else { + val monT: Monoid[T] = p.join + val right = ts.toList.takeRight(windowSize) + val total = monT.sum(right) + Window(total, Queue(right: _*)) + } } }