Implement an appendMonoid Aggregator factory which yields aggregators… #501
Conversation
… that can take advantage of an efficient append method for faster aggregations
 * @param pres The presentation function
 * @param m The [[Monoid]] type class
 */
def appendMonoid[F, T, P](appnd: (T, F) => T, pres: T => P)(implicit m: Monoid[T]): MonoidAggregator[F, T, P] =
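For orientation, here is a usage sketch of this factory (assuming it is exposed on the Aggregator companion object, as the diff suggests; the Set example mirrors the benchmark later in this thread):

import com.twitter.algebird.{Aggregator, MonoidAggregator}

// Collect Ints into a Set[Int] and present its size. Using append (s + e)
// avoids building a singleton Set(e) per element, as prepare/plus would.
val uniques: MonoidAggregator[Int, Set[Int], Int] =
  Aggregator.appendMonoid((s: Set[Int], e: Int) => s + e, (s: Set[Int]) => s.size)

uniques(List(1, 2, 2, 3))  // == 3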
so, appnd must satisfy the law:

appnd(appnd(Monoid.zero, f1), f2) == Monoid.plus(appnd(Monoid.zero, f1), appnd(Monoid.zero, f2))

Otherwise prepare doesn't mean what we expect. If you agree, we should add this law to the comment.
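To make the law concrete, a small check for a Set-based appnd (a hypothetical helper, not part of this PR):

import com.twitter.algebird.Monoid

def checkAppendLaw[F, T](appnd: (T, F) => T)(f1: F, f2: F)(implicit m: Monoid[T]): Boolean =
  appnd(appnd(m.zero, f1), f2) == m.plus(appnd(m.zero, f1), appnd(m.zero, f2))

// Holds for Set append: both sides evaluate to Set(1, 2)
checkAppendLaw[Int, Set[Int]]((s, e) => s + e)(1, 2)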
This is addressing one performance issue, which might be a very important one, but the case of dealing with a bulk set of items (Monoid.sum/Semigroup.sumOption) is also an important optimization. Since we expose the monoid/semigroup of an Aggregator, scalding and spark use that to get the sumOption speedup. For those cases, they will not get the benefit of an optimized append and in fact will call prepare, which may be even slower. So, I think the comment "faster aggregation" is more complex, and we need to point out that this could be slower than the standard approach depending on the Monoid and appnd function.
I may be missing some angle here, but that seems like an opportunity for Scalding/Spark to take better advantage of traversableOnce.aggregate(...) over individual partitions.
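For concreteness, the per-partition shape being suggested might look like this (a hypothetical sketch, not Scalding/Spark code; appnd and the Monoid[T] would come from the aggregator):

import com.twitter.algebird.Monoid

// Fold raw elements F directly into T with appnd inside each partition,
// then merge the per-partition Ts with the Monoid's plus.
def aggregatePartitioned[F, T](parts: Seq[Seq[F]])(appnd: (T, F) => T)(implicit m: Monoid[T]): T =
  parts.map(_.foldLeft(m.zero)(appnd)).foldLeft(m.zero)(m.plus)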
@erikerlandson I think the question is whether the append() optimization is more effective than whatever optimization is in sumOption. They're both about eliminating unnecessary intermediate values of the type we have a Monoid on, but in different ways - it's sorta a question of, for each reduce(left: T, right: T), whether you need the left side to actually be a T (which sumOption eliminates) or the right side to actually be a T (which append eliminates). You could imagine an appendAll which requires neither to be, but that feels even more invasive for the Aggregator to eliminate vs. the AppendMonoid like you proposed.
To put it another way, if you ultimately think of this like a def foldLeft[B](z: B)(f: (B, A) => B): B on a Seq[A], and you have some Monoid[T], the question is do we have B == T (a constraint that sumOption gets rid of) or do we have A == T (a constraint that append gets rid of), or both, or neither, and what's the most efficient space to do this aggregation in.
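A sketch to make those constraints explicit (hypothetical signatures, not algebird API):

import com.twitter.algebird.Monoid

def plainSum[T](ts: Seq[T])(implicit m: Monoid[T]): T =
  ts.foldLeft(m.zero)(m.plus)     // B == T and A == T: every reduce sees two Ts

def viaAppend[A, T](as: Seq[A])(appnd: (T, A) => T)(implicit m: Monoid[T]): T =
  as.foldLeft(m.zero)(appnd)      // B == T, but A stays raw: append frees the right side

// sumOption-style frees the left side instead: A == T, but the internal
// accumulator B can be something cheaper (e.g. a mutable buffer) that is
// converted back to a T only once at the end.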
The rub seems to be that there are some possibly-incompatible cases. If you have a …

My desire to always have my cake and eat it too makes me wonder if there is any advantage to adding some method like …
Last night I spent some time trying to work through what I'm "really" after, and how that relates to algebraic concerns and algebird's type system. Firstly, I've been coming at this from the perspective that I want a principled algebraic way to capture the functionality of Scala's aggregate:
import com.twitter.algebird.{Monoid, Semigroup}

trait ElementAggregator[A, E] {
  def aggregate(data: TraversableOnce[E]): A
}

trait MonoidElementAggregator[M, E] extends ElementAggregator[M, E] {
  def monoid: Monoid[M]  // the type class, not a value of M
  def append(m: M, e: E): M
}

trait SemigroupElementAggregator[S, E] extends ElementAggregator[S, E] {
  def semigroup: Semigroup[S]
  def construct(e: E): S
}
From my original blog post, the main cost with prepare …
I'm not sure I understand why "it isn't avoidable". That only seems to be true if you want to treat every element of your input identically. But why is that necessary? Can't you do the equivalent of …
@avibryant good point. Using …

scala> val data = Vector.fill(1000000) { scala.util.Random.nextInt(100) }
data: scala.collection.immutable.Vector[Int] = Vector(73, 26, 81, 92, 90, 73, ...
scala> val zero = Set.empty[Int]
zero: scala.collection.immutable.Set[Int] = Set()
scala> val plus = (s1: Set[Int], s2: Set[Int]) => s1 ++ s2
plus: (Set[Int], Set[Int]) => scala.collection.immutable.Set[Int] = <function2>
scala> val append = (s: Set[Int], e: Int) => s + e
append: (Set[Int], Int) => scala.collection.immutable.Set[Int] = <function2>
scala> val construct = (e: Int) => Set(e)
construct: Int => scala.collection.immutable.Set[Int] = <function1>
// This is still the worst:
scala> benchmark(10) { data.map(construct).reduceLeft(plus) }
res11: Double = 0.4000747779
// Using one construction followed by foldLeft is quite a bit faster
scala> benchmark(10) { data.tail.foldLeft(construct(data.head))(append) }
res12: Double = 0.040250169200000005
// without parallelism, aggregate performs same as foldLeft above
scala> benchmark(10) { data.aggregate(zero)(append, plus) }
res13: Double = 0.038395296800000006
// with parallelism, aggregate will beat foldLeft:
scala> benchmark(10) { data.par.aggregate(zero)(append, plus) }
res14: Double = 0.011873102699999999
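The benchmark helper used above isn't shown in the transcript; a minimal sketch consistent with its usage (returning mean seconds per run over n runs) might be:

def benchmark[A](n: Int)(body: => A): Double = {
  val t0 = System.nanoTime
  var i = 0
  while (i < n) { body; i += 1 }
  (System.nanoTime - t0) / (n * 1e9)  // mean seconds per run
}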
Interesting. I don't have time to do this right now but it seems like you could add a …
…faster appendAll methods for Aggregator objects
Seeing that the …
This looks good. As a slightly separate thing (but that might be worth including in this PR anyway), I think we should have the default …
@avibryant I think …
Aha, you're right, I hadn't realized that. Thanks. This LGTM, then.
def aggregateOption[B: ClassTag, C](agg: Aggregator[T, B, C]): Option[C] = {
  val pr = rdd.mapPartitions({ data =>
    if (data.isEmpty) Iterator.empty else {
      val sg = agg.prepare(data.next)
can we rename sg to be b here? It is of type B, right? sg sounds like semigroup to me.
good point, I habitually confuse types with their type classes
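For readers without the full diff, the aggregateOption body shown above plausibly continues along these lines — a hedged reconstruction, not the verbatim PR code, assuming the Aggregator.append method this PR introduces:

  val pr = rdd.mapPartitions({ data =>
    if (data.isEmpty) Iterator.empty else {
      val b = agg.prepare(data.next)
      // fold the rest of the partition directly into b via append
      Iterator(data.foldLeft(b)(agg.append))
    }
  })
  val results = pr.collect
  if (results.isEmpty) None
  else Some(agg.present(results.reduce(agg.reduce)))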
appendSemigroup(prep, appnd, identity[T]_)(sg)

/**
* Obtain an [[Aggregator]] that uses an efficient append operation for faster aggregation |
I'm still not crazy about this documentation leaving out that this is disabling the sumOption optimization that would have previously been there.

Have you tried a benchmark using a Map[K, V] value or HyperLogLog? Those two have been optimized a fair bit with sumOption (and many others). In some cases, the savings of doing your optimization here will be greater, but it won't necessarily be faster.

In the HLL case, you might imagine that this would be a win, but there is an optimized prepare that allocates a sparse object, and then sumOption uses a mutable Array. Here, with append, you could skip the allocation of the sparse HLL object, but you are still going to reallocate a new immutable HLL on each plus operation, and my guess is that will be significantly slower.

We have the benchmarks subproject here, which we try to use when we are submitting performance optimizations. If nothing else, we need to add a comment so this does not confuse new users more (basically, when performance is a concern, they have to benchmark).
This has addressed @johnynek's comments and LGTM, I'm going to merge. Thanks @erikerlandson !
Implement an appendMonoid Aggregator factory which yields aggregators… that can take advantage of an efficient append method for faster aggregations.

This is a PR in response to:
http://erikerlandson.github.io/blog/2015/11/24/the-prepare-operation-considered-harmful-in-algebird/

The rationale for a solution based on a factory function is that the append method and prepare method need to remain logically consistent with each other. The factory function enforces that logical constraint, and in addition allows a few other efficient overrides.

Working through this issue has left me still feeling like an Aggregator class explicitly based on a Monoid, augmented with append, is desirable. The main problem is that it would not be backward compatible with the current design, which requires a definition of prepare.

Related but tangential, I'm still feeling like a Monoid[T] subclass AppendMonoid[T, E], having zero, plus and append, is a possibly useful thing. The type laws for such an object would include all the Monoid type laws, and also probably something like:

mon.append(t, e) == mon.plus(t, mon.append(mon.zero, e))