Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement an appendMonoid Aggregator factory which yields aggregators… #501

Merged
merged 5 commits into from
Dec 4, 2015
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 85 additions & 0 deletions algebird-core/src/main/scala/com/twitter/algebird/Aggregator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ object Aggregator extends java.io.Serializable {
def fromMonoid[F, T](implicit mon: Monoid[T], prep: F => T): MonoidAggregator[F, T, T] =
prepareMonoid(prep)(mon)

def prepareSemigroup[F, T](prep: F => T)(implicit sg: Semigroup[T]): Aggregator[F, T, T] = new Aggregator[F, T, T] {
def prepare(input: F) = prep(input)
def semigroup = sg
def present(reduction: T) = reduction
}
def prepareMonoid[F, T](prep: F => T)(implicit m: Monoid[T]): MonoidAggregator[F, T, T] = new MonoidAggregator[F, T, T] {
def prepare(input: F) = prep(input)
def monoid = m
Expand All @@ -47,6 +52,86 @@ object Aggregator extends java.io.Serializable {
def present(reduction: T) = reduction
}

/**
* Obtain an [[Aggregator]] that uses an efficient append operation for faster aggregation.
* Equivalent to {{{ appendSemigroup(prep, appnd, identity[T]_)(sg) }}}
*/
def appendSemigroup[F, T](prep: F => T, appnd: (T, F) => T)(implicit sg: Semigroup[T]): Aggregator[F, T, T] =
appendSemigroup(prep, appnd, identity[T]_)(sg)

/**
* Obtain an [[Aggregator]] that uses an efficient append operation for faster aggregation
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm still not crazy about this documentation leaving out that this is disabling the sumOption optimization, that would have previously been there.

Have you tried a benchmark using a Map[K, V] value or HyperLogLog? Those two have been optimized a fair bit with sumOption (and many others). In some cases, the savings of doing your optimization here will be greater, but it won't neccesarily be faster.

In the HLL case, you might imagine that this would be a win, but there is a an optimized prepare that allocates a sparse object, and then sumOption uses a mutable Array. Here, with append, you could skip the allocation of the sparse HLL object, but you are still going to reallocate a new immutable HLL on each plus operation, and my guess is that will be significantly slower.

We have the benchmarks subproject here, which we try to use when we are submitting performance optimizations. If nothing else we need to add a comment so this does not confuse new users more (basically, when performance is a concern, they have to benchmark).

* @tparam F Data input type
* @tparam T Aggregating [[Semigroup]] type
* @tparam P Presentation (output) type
* @param prep The preparation function. Expected to construct an instance of type T from a single data element.
* @param appnd Function that appends the [[Semigroup]]. Defines the [[append]] method for this aggregator.
* Analogous to the 'seqop' function in Scala's sequence 'aggregate' method
* @param pres The presentation function
* @param sg The [[Semigroup]] type class
* @note The functions 'appnd' and 'prep' are expected to obey the law: {{{ appnd(t, f) == sg.plus(t, prep(f)) }}}
*/
def appendSemigroup[F, T, P](prep: F => T, appnd: (T, F) => T, pres: T => P)(implicit sg: Semigroup[T]): Aggregator[F, T, P] =
new Aggregator[F, T, P] {
def semigroup: Semigroup[T] = sg
def prepare(input: F): T = prep(input)
def present(reduction: T): P = pres(reduction)

override def apply(inputs: TraversableOnce[F]): P = applyOption(inputs).get

override def applyOption(inputs: TraversableOnce[F]): Option[P] = agg(inputs).map(pres)

override def append(l: T, r: F): T = appnd(l, r)

override def appendAll(old: T, items: TraversableOnce[F]): T =
if (items.isEmpty) old else reduce(old, agg(items).get)

private def agg(inputs: TraversableOnce[F]): Option[T] =
if (inputs.isEmpty) None else {
val itr = inputs.toIterator
val t = prepare(itr.next)
Some(itr.foldLeft(t)(appnd))
}
}

/**
* Obtain a [[MonoidAggregator]] that uses an efficient append operation for faster aggregation.
* Equivalent to {{{ appendMonoid(appnd, identity[T]_)(m) }}}
*/
def appendMonoid[F, T](appnd: (T, F) => T)(implicit m: Monoid[T]): MonoidAggregator[F, T, T] =
appendMonoid(appnd, identity[T]_)(m)

/**
* Obtain a [[MonoidAggregator]] that uses an efficient append operation for faster aggregation
* @tparam F Data input type
* @tparam T Aggregating [[Monoid]] type
* @tparam P Presentation (output) type
* @param appnd Function that appends the [[Monoid]]. Defines the [[append]] method for this aggregator.
* Analogous to the 'seqop' function in Scala's sequence 'aggregate' method
* @param pres The presentation function
* @param m The [[Monoid]] type class
* @note The function 'appnd' is expected to obey the law: {{{ appnd(t, f) == m.plus(t, appnd(m.zero, f)) }}}
*/
def appendMonoid[F, T, P](appnd: (T, F) => T, pres: T => P)(implicit m: Monoid[T]): MonoidAggregator[F, T, P] =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so, appnd must satisfy the law:

appdn(appnd(Monoid.zero, f1), f2) == Monoid.plus(appnd(Monoid.zero, f1), appnd(Monoid.zero, f2))

Otherwise prepare doesn't mean what we expect. If you agree, we should add this law to the comment.

This is addressing one performance issue, which might be a very important one, but also the case of dealing with a bulk set of items (Monoid.sum/Semigroup.sumOption) is also an important optimization.

Since we expose the monoid/semigroup of an Aggregator, scalding and spark use that to get the sumOption speedup. For those cases, they will not get the benefit of an optimized append and in fact will call prepare which may be even slower. So, I think the comment "faster aggregation" is more complex and we need to point out that this could be slower than the standard approach depending on the Monoid and appnd function.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I may be missing some angle here, but that seems like an opportunity for Scalding/Spark to take better advantage of traversableOnce.aggregate(...) over individual partitions.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@erikerlandson I think the question is whether the append() optimization is more effective than whatever optimization is in sumOption. They're both about eliminating unnecessary intermediate values of the type we have a Monoid on, but in different ways - it's sorta a question of, for each reduce(left: T, right: T), whether you need the left side to actually be a T (which sumOption eliminates) or the right side to actually be a T (which append eliminates). You could imagine an appendAll which requires neither to be, but that feels even more invasive for the Aggregator to eliminate vs. the AppendMonoid like you proposed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To put it another way, if you ultimately think of this like a def foldLeft[B](z: B)(f: (B, A) => B): B on a Seq[A], and you have some Monoid[T], the question is do we have B == T (a constraint that sumOption gets rid of) or do we have A == T (a constraint that append gets rid of), or both, or neither, and what's the most efficient space to do this aggregation in.

new MonoidAggregator[F, T, P] {
def monoid: Monoid[T] = m
def prepare(input: F): T = appnd(m.zero, input)
def present(reduction: T): P = pres(reduction)

override def apply(inputs: TraversableOnce[F]): P = present(agg(inputs))

override def applyOption(inputs: TraversableOnce[F]): Option[P] =
if (inputs.isEmpty) None else Some(apply(inputs))

override def append(l: T, r: F): T = appnd(l, r)

override def appendAll(old: T, items: TraversableOnce[F]): T = reduce(old, agg(items))

override def appendAll(items: TraversableOnce[F]): T = agg(items)

private def agg(inputs: TraversableOnce[F]): T = inputs.aggregate(m.zero)(appnd, m.plus)
}

/**
* How many items satisfy a predicate
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,17 @@ class AlgebirdRDD[T](val rdd: RDD[T]) extends AnyVal {
* requires a commutative Semigroup. To generalize to non-commutative, we need a sorted partition for
* T.
*/
def aggregateOption[B: ClassTag, C](agg: Aggregator[T, B, C]): Option[C] =
(new AlgebirdRDD(rdd.map(agg.prepare)))
.sumOption(agg.semigroup, implicitly)
.map(agg.present)
def aggregateOption[B: ClassTag, C](agg: Aggregator[T, B, C]): Option[C] = {
val pr = rdd.mapPartitions({ data =>
if (data.isEmpty) Iterator.empty else {
val sg = agg.prepare(data.next)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we rename sg to be b here? It is of type b, right? sg sounds like semigroup to me.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point, I habitually confuse types with their type classes

Iterator(agg.appendAll(sg, data))
}
}, preservesPartitioning = true)
pr.coalesce(1, shuffle = true)
.mapPartitions(pr => Iterator(agg.semigroup.sumOption(pr)))
.collect.head.map(agg.present)
}

/**
* This will throw if you use a non-MonoidAggregator with an empty RDD
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package com.twitter.algebird

import org.scalatest._

class AppendAggregatorTest extends WordSpec with Matchers {
val data = Vector.fill(100) { scala.util.Random.nextInt(100) }
val mpty = Vector.empty[Int]

// test the methods that appendSemigroup method defines or overrides
def testMethodsSemigroup[E, M, P](
agg1: Aggregator[E, M, P],
agg2: Aggregator[E, M, P],
data: Seq[E],
empty: Seq[E]) {

val n = data.length
val (half1, half2) = data.splitAt(n / 2)
val lhs = agg1.appendAll(agg1.prepare(half1.head), half1.tail)

data.foreach { e =>
agg1.prepare(e) should be(agg2.prepare(e))
}

agg1.present(lhs) should be(agg2.present(lhs))

agg1(data) should be (agg2(data))

agg1.applyOption(data) should be(agg2.applyOption(data))
agg1.applyOption(empty) should be(agg2.applyOption(empty))

half2.foreach { e =>
agg1.append(lhs, e) should be(agg2.append(lhs, e))
}

agg1.appendAll(lhs, half2) should be(agg2.appendAll(lhs, half2))
}

// test the methods that appendMonoid method defines or overrides
def testMethodsMonoid[E, M, P](
agg1: MonoidAggregator[E, M, P],
agg2: MonoidAggregator[E, M, P],
data: Seq[E],
empty: Seq[E]) {

testMethodsSemigroup(agg1, agg2, data, empty)

agg1(empty) should be (agg2(empty))
agg1.appendAll(data) should be(agg2.appendAll(data))
}

"appendMonoid" should {
"be equivalent to integer monoid aggregator" in {
val agg1 = Aggregator.fromMonoid[Int]
val agg2 = Aggregator.appendMonoid((m: Int, e: Int) => m + e)
testMethodsMonoid(agg1, agg2, data, mpty)
}

"be equivalent to set monoid aggregator" in {
object setMonoid extends Monoid[Set[Int]] {
val zero = Set.empty[Int]
def plus(m1: Set[Int], m2: Set[Int]) = m1 ++ m2
}

val agg1 = Aggregator.prepareMonoid((e: Int) => Set(e))(setMonoid)
val agg2 = Aggregator.appendMonoid((m: Set[Int], e: Int) => m + e)(setMonoid)

testMethodsMonoid(agg1, agg2, data, mpty)
}
}

"appendSemigroup" should {
"be equivalent to integer semigroup aggregator" in {
val agg1 = Aggregator.fromSemigroup[Int]
val agg2 = Aggregator.appendSemigroup(identity[Int]_, (m: Int, e: Int) => m + e)
testMethodsSemigroup(agg1, agg2, data, mpty)
}

"be equivalent to set semigroup aggregator" in {
object setSemigroup extends Semigroup[Set[Int]] {
def plus(m1: Set[Int], m2: Set[Int]) = m1 ++ m2
}

val agg1 = Aggregator.prepareSemigroup((e: Int) => Set(e))(setSemigroup)
val agg2 = Aggregator.appendSemigroup((e: Int) => Set(e), (m: Set[Int], e: Int) => m + e)(setSemigroup)

testMethodsSemigroup(agg1, agg2, data, mpty)
}
}
}