-
Notifications
You must be signed in to change notification settings - Fork 347
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement an appendMonoid Aggregator factory which yields aggregators… #501
Changes from 4 commits
eaaa054
fd9b866
c9f2e06
ba9df9f
f0dc4f2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -35,6 +35,11 @@ object Aggregator extends java.io.Serializable { | |
/**
 * Build a [[MonoidAggregator]] from an implicit [[Monoid]] and an implicit preparation function.
 * Delegates to `prepareMonoid` with that preparation function and monoid.
 */
def fromMonoid[F, T](implicit mon: Monoid[T], prep: F => T): MonoidAggregator[F, T, T] = {
  val prepareFn: F => T = prep
  prepareMonoid(prepareFn)(mon)
}
|
||
/**
 * Build an [[Aggregator]] whose reduction type is also its output type.
 * @param prep maps a single input element to the aggregating type
 * @param sg   the [[Semigroup]] exposed as this aggregator's `semigroup`
 */
def prepareSemigroup[F, T](prep: F => T)(implicit sg: Semigroup[T]): Aggregator[F, T, T] =
  new Aggregator[F, T, T] {
    def semigroup: Semigroup[T] = sg
    def prepare(input: F): T = prep(input)
    // The reduction is returned as-is.
    def present(reduction: T): T = reduction
  }
def prepareMonoid[F, T](prep: F => T)(implicit m: Monoid[T]): MonoidAggregator[F, T, T] = new MonoidAggregator[F, T, T] { | ||
def prepare(input: F) = prep(input) | ||
def monoid = m | ||
|
@@ -47,6 +52,86 @@ object Aggregator extends java.io.Serializable { | |
def present(reduction: T) = reduction | ||
} | ||
|
||
/**
 * Obtain an [[Aggregator]] that uses an efficient append operation for faster aggregation,
 * presenting the reduction as-is.
 * Equivalent to {{{ appendSemigroup(prep, appnd, identity[T]_)(sg) }}}
 */
def appendSemigroup[F, T](prep: F => T, appnd: (T, F) => T)(implicit sg: Semigroup[T]): Aggregator[F, T, T] = {
  val unchanged: T => T = identity[T]
  appendSemigroup(prep, appnd, unchanged)(sg)
}
|
||
/**
 * Obtain an [[Aggregator]] that uses an efficient append operation for faster aggregation.
 *
 * The overridden `apply`, `applyOption` and `appendAll` fold the inputs element by element with
 * `appnd` rather than bulk-summing prepared values. A [[Semigroup]] with an optimized bulk sum
 * may therefore be slower through these methods; benchmark when performance is a concern.
 *
 * @tparam F Data input type
 * @tparam T Aggregating [[Semigroup]] type
 * @tparam P Presentation (output) type
 * @param prep The preparation function. Expected to construct an instance of type T from a single data element.
 * @param appnd Function that appends one data element to an aggregated value. Defines the [[append]]
 * method for this aggregator. Analogous to the 'seqop' function in Scala's sequence 'aggregate' method
 * @param pres The presentation function
 * @param sg The [[Semigroup]] type class
 * @note The functions 'appnd' and 'prep' are expected to obey the law: {{{ appnd(t, f) == sg.plus(t, prep(f)) }}}
 */
def appendSemigroup[F, T, P](prep: F => T, appnd: (T, F) => T, pres: T => P)(implicit sg: Semigroup[T]): Aggregator[F, T, P] =
  new Aggregator[F, T, P] {
    def semigroup: Semigroup[T] = sg
    def prepare(input: F): T = prep(input)
    def present(reduction: T): P = pres(reduction)

    // Empty input has no result: `.get` on None throws NoSuchElementException.
    override def apply(inputs: TraversableOnce[F]): P = applyOption(inputs).get

    override def applyOption(inputs: TraversableOnce[F]): Option[P] = foldInputs(inputs).map(pres)

    override def append(l: T, r: F): T = appnd(l, r)

    override def appendAll(old: T, items: TraversableOnce[F]): T =
      foldInputs(items) match {
        case Some(appended) => reduce(old, appended)
        case None => old
      }

    // Prepare the first element, then append every remaining element onto it; None when empty.
    private def foldInputs(inputs: TraversableOnce[F]): Option[T] =
      if (inputs.isEmpty) None
      else {
        val remaining = inputs.toIterator
        val seed = prepare(remaining.next())
        Some(remaining.foldLeft(seed)(appnd))
      }
  }
|
||
/**
 * Obtain a [[MonoidAggregator]] that uses an efficient append operation for faster aggregation,
 * presenting the reduction as-is.
 * Equivalent to {{{ appendMonoid(appnd, identity[T]_)(m) }}}
 */
def appendMonoid[F, T](appnd: (T, F) => T)(implicit m: Monoid[T]): MonoidAggregator[F, T, T] = {
  val unchanged: T => T = identity[T]
  appendMonoid(appnd, unchanged)(m)
}
|
||
/** | ||
* Obtain a [[MonoidAggregator]] that uses an efficient append operation for faster aggregation | ||
* @tparam F Data input type | ||
* @tparam T Aggregating [[Monoid]] type | ||
* @tparam P Presentation (output) type | ||
* @param appnd Function that appends the [[Monoid]]. Defines the [[append]] method for this aggregator. | ||
* Analogous to the 'seqop' function in Scala's sequence 'aggregate' method | ||
* @param pres The presentation function | ||
* @param m The [[Monoid]] type class | ||
* @note The function 'appnd' is expected to obey the law: {{{ appnd(t, f) == m.plus(t, appnd(m.zero, f)) }}} | ||
*/ | ||
def appendMonoid[F, T, P](appnd: (T, F) => T, pres: T => P)(implicit m: Monoid[T]): MonoidAggregator[F, T, P] = | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. so, appnd(appnd(Monoid.zero, f1), f2) == Monoid.plus(appnd(Monoid.zero, f1), appnd(Monoid.zero, f2)) Otherwise […]. This is addressing one performance issue, which might be a very important one, but the case of dealing with a bulk set of items (Monoid.sum/Semigroup.sumOption) is also an important optimization. Since we expose the monoid/semigroup of an Aggregator, scalding and spark use that to get the sumOption speedup. For those cases, they will not get the benefit of an optimized […]. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I may be missing some angle here, but that seems like an opportunity for Scalding/Spark to take better advantage of […]. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @erikerlandson I think the question is whether the […] There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. To put it another way, if you ultimately think of this like a […] |
||
new MonoidAggregator[F, T, P] {
  def monoid: Monoid[T] = m
  // Preparing a single input means appending it to the monoid's zero.
  def prepare(input: F): T = appnd(m.zero, input)
  def present(reduction: T): P = pres(reduction)

  override def apply(inputs: TraversableOnce[F]): P = present(foldFromZero(inputs))

  override def applyOption(inputs: TraversableOnce[F]): Option[P] =
    if (inputs.nonEmpty) Some(apply(inputs)) else None

  override def append(l: T, r: F): T = appnd(l, r)

  override def appendAll(old: T, items: TraversableOnce[F]): T = reduce(old, foldFromZero(items))

  override def appendAll(items: TraversableOnce[F]): T = foldFromZero(items)

  // Append every input onto the monoid's zero, left to right; empty input yields zero.
  private def foldFromZero(inputs: TraversableOnce[F]): T = inputs.foldLeft(m.zero)(appnd)
}
|
||
/** | ||
* How many items satisfy a predicate | ||
*/ | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,10 +18,17 @@ class AlgebirdRDD[T](val rdd: RDD[T]) extends AnyVal { | |
* requires a commutative Semigroup. To generalize to non-commutative, we need a sorted partition for | ||
* T. | ||
*/ | ||
def aggregateOption[B: ClassTag, C](agg: Aggregator[T, B, C]): Option[C] = {
  // Prepare every element, sum the prepared values with the aggregator's semigroup,
  // then present the result if there was one.
  val prepared = new AlgebirdRDD(rdd.map(agg.prepare))
  val reduced = prepared.sumOption(agg.semigroup, implicitly)
  reduced.map(agg.present)
}
def aggregateOption[B: ClassTag, C](agg: Aggregator[T, B, C]): Option[C] = { | ||
val pr = rdd.mapPartitions({ data => | ||
if (data.isEmpty) Iterator.empty else { | ||
val sg = agg.prepare(data.next) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. can we rename `sg`? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. good point, I habitually confuse types with their type classes |
||
Iterator(agg.appendAll(sg, data)) | ||
} | ||
}, preservesPartitioning = true) | ||
pr.coalesce(1, shuffle = true) | ||
.mapPartitions(pr => Iterator(agg.semigroup.sumOption(pr))) | ||
.collect.head.map(agg.present) | ||
} | ||
|
||
/** | ||
* This will throw if you use a non-MonoidAggregator with an empty RDD | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,89 @@ | ||
package com.twitter.algebird | ||
|
||
import org.scalatest._ | ||
|
||
/**
 * Checks that aggregators built with `Aggregator.appendMonoid` and `Aggregator.appendSemigroup`
 * give the same results as the equivalent aggregators built without an append function.
 */
class AppendAggregatorTest extends WordSpec with Matchers {
  // 100 random integers in [0, 100); regenerated on every run.
  val data = Vector.fill(100) { scala.util.Random.nextInt(100) }
  val mpty = Vector.empty[Int]

  /**
   * Test the methods that the appendSemigroup method defines or overrides.
   * @param agg1  reference aggregator
   * @param agg2  aggregator under test, expected to agree with agg1 on every method
   * @param data  non-empty input; it is split in half and the first half must be non-empty
   * @param empty empty input, used to compare applyOption on no data
   */
  def testMethodsSemigroup[E, M, P](
    agg1: Aggregator[E, M, P],
    agg2: Aggregator[E, M, P],
    data: Seq[E],
    empty: Seq[E]): Unit = {

    val n = data.length
    val (half1, half2) = data.splitAt(n / 2)
    // Aggregate of the first half, used as the left-hand side for the append checks below.
    val lhs = agg1.appendAll(agg1.prepare(half1.head), half1.tail)

    data.foreach { e =>
      agg1.prepare(e) should be(agg2.prepare(e))
    }

    agg1.present(lhs) should be(agg2.present(lhs))

    agg1(data) should be (agg2(data))

    agg1.applyOption(data) should be(agg2.applyOption(data))
    agg1.applyOption(empty) should be(agg2.applyOption(empty))

    half2.foreach { e =>
      agg1.append(lhs, e) should be(agg2.append(lhs, e))
    }

    agg1.appendAll(lhs, half2) should be(agg2.appendAll(lhs, half2))
  }

  /**
   * Test the methods that the appendMonoid method defines or overrides.
   * Runs every semigroup check, then the monoid-only ones: apply on empty input and
   * the single-argument appendAll.
   */
  def testMethodsMonoid[E, M, P](
    agg1: MonoidAggregator[E, M, P],
    agg2: MonoidAggregator[E, M, P],
    data: Seq[E],
    empty: Seq[E]): Unit = {

    testMethodsSemigroup(agg1, agg2, data, empty)

    agg1(empty) should be (agg2(empty))
    agg1.appendAll(data) should be(agg2.appendAll(data))
  }

  "appendMonoid" should {
    "be equivalent to integer monoid aggregator" in {
      val agg1 = Aggregator.fromMonoid[Int]
      val agg2 = Aggregator.appendMonoid((m: Int, e: Int) => m + e)
      testMethodsMonoid(agg1, agg2, data, mpty)
    }

    "be equivalent to set monoid aggregator" in {
      object setMonoid extends Monoid[Set[Int]] {
        val zero = Set.empty[Int]
        def plus(m1: Set[Int], m2: Set[Int]) = m1 ++ m2
      }

      val agg1 = Aggregator.prepareMonoid((e: Int) => Set(e))(setMonoid)
      val agg2 = Aggregator.appendMonoid((m: Set[Int], e: Int) => m + e)(setMonoid)

      testMethodsMonoid(agg1, agg2, data, mpty)
    }
  }

  "appendSemigroup" should {
    "be equivalent to integer semigroup aggregator" in {
      val agg1 = Aggregator.fromSemigroup[Int]
      val agg2 = Aggregator.appendSemigroup(identity[Int]_, (m: Int, e: Int) => m + e)
      testMethodsSemigroup(agg1, agg2, data, mpty)
    }

    "be equivalent to set semigroup aggregator" in {
      object setSemigroup extends Semigroup[Set[Int]] {
        def plus(m1: Set[Int], m2: Set[Int]) = m1 ++ m2
      }

      val agg1 = Aggregator.prepareSemigroup((e: Int) => Set(e))(setSemigroup)
      val agg2 = Aggregator.appendSemigroup((e: Int) => Set(e), (m: Set[Int], e: Int) => m + e)(setSemigroup)

      testMethodsSemigroup(agg1, agg2, data, mpty)
    }
  }
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm still not crazy about this documentation leaving out that this is disabling the sumOption optimization, that would have previously been there.
Have you tried a benchmark using a
Map[K, V]
value or HyperLogLog? Those two have been optimized a fair bit with sumOption (and many others). In some cases, the savings of doing your optimization here will be greater, but it won't necessarily be faster. In the HLL case, you might imagine that this would be a win, but there is an optimized prepare that allocates a sparse object, and then sumOption uses a mutable Array. Here, with append, you could skip the allocation of the sparse HLL object, but you are still going to reallocate a new immutable HLL on each plus operation, and my guess is that will be significantly slower.
We have the benchmarks subproject here, which we try to use when we are submitting performance optimizations. If nothing else we need to add a comment so this does not confuse new users more (basically, when performance is a concern, they have to benchmark).