
Implement an appendMonoid Aggregator factory which yields aggregators… #501

Merged: 5 commits, Dec 4, 2015

Conversation

erikerlandson (Contributor)

… that can take advantage of an efficient append method for faster aggregations.

This is a PR in response to:
http://erikerlandson.github.io/blog/2015/11/24/the-prepare-operation-considered-harmful-in-algebird/

The rationale for a solution based on a factory function is that the append method and prepare method need to remain logically consistent with each other. The factory function enforces that logical constraint, and in addition allows a few other efficient overrides.

Working through this issue has left me still feeling like an Aggregator class explicitly based on a Monoid, augmented with append, is desirable. The main problem is that it would not be backward compatible with the current design that requires a definition of prepare.

Related but tangential, I'm still feeling like a Monoid[T] subclass AppendMonoid[T, E], having zero, plus and append, is a possibly useful thing. The type laws for such an object would include all the Monoid type laws, and also probably something like:

mon.append(t, e) == mon.plus(t, mon.append(mon.zero, e))
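For concreteness, a minimal sketch of what such an AppendMonoid might look like (the trait and the Set-based instance below are hypothetical illustrations of the idea, not algebird code):

```scala
// Hypothetical sketch of the proposed AppendMonoid[T, E]; not algebird API.
trait Monoid[T] {
  def zero: T
  def plus(l: T, r: T): T
}

trait AppendMonoid[T, E] extends Monoid[T] {
  // incorporate a raw element E into an accumulator T
  def append(t: T, e: E): T
}

// Example instance: accumulating Ints into a Set[Int]
object SetAppendMonoid extends AppendMonoid[Set[Int], Int] {
  def zero: Set[Int] = Set.empty[Int]
  def plus(l: Set[Int], r: Set[Int]): Set[Int] = l ++ r
  def append(t: Set[Int], e: Int): Set[Int] = t + e
}
```

The proposed law holds for this instance, since `t + e` is the same set as `t ++ Set(e)`.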

… that can take advantage of an efficient append method for faster aggregations
/**
 * @param appnd The append function
 * @param pres The presentation function
 * @param m The [[Monoid]] type class
 */
def appendMonoid[F, T, P](appnd: (T, F) => T, pres: T => P)(implicit m: Monoid[T]): MonoidAggregator[F, T, P] =
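A simplified, self-contained sketch of the factory idea (this is not the PR's actual implementation; the SimpleAggregator type and the derivation of prepare from appnd are illustrative stand-ins for algebird's MonoidAggregator):

```scala
// Illustrative only: a pared-down aggregator type standing in for
// algebird's MonoidAggregator, to show how a factory can derive
// prepare from appnd so the two stay logically consistent.
trait Monoid[T] {
  def zero: T
  def plus(l: T, r: T): T
}

case class SimpleAggregator[F, T, P](
    prepare: F => T,
    monoid: Monoid[T],
    present: T => P) {
  def apply(in: Seq[F]): P =
    present(in.foldLeft(monoid.zero)((t, f) => monoid.plus(t, prepare(f))))
}

def appendMonoid[F, T, P](appnd: (T, F) => T, pres: T => P)(
    implicit m: Monoid[T]): SimpleAggregator[F, T, P] =
  SimpleAggregator(f => appnd(m.zero, f), m, pres)
```

Because prepare is defined as `appnd(m.zero, _)`, the two operations cannot drift out of sync, which is the rationale given above for the factory-function design.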
Collaborator:

so, appnd must satisfy the law:

appnd(appnd(Monoid.zero, f1), f2) == Monoid.plus(appnd(Monoid.zero, f1), appnd(Monoid.zero, f2))

Otherwise prepare doesn't mean what we expect. If you agree, we should add this law to the comment.

This addresses one performance issue, which may be a very important one, but the case of dealing with a bulk set of items (Monoid.sum/Semigroup.sumOption) is also an important optimization.

Since we expose the monoid/semigroup of an Aggregator, scalding and spark use that to get the sumOption speedup. In those cases they will not get the benefit of an optimized append, and will in fact call prepare, which may be even slower. So I think the claim of "faster aggregation" is more complex, and we need to point out that this could be slower than the standard approach depending on the Monoid and appnd function.
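The proposed law can be spot-checked for a concrete appnd; here is a minimal sketch with a hand-rolled Set monoid (these are stand-ins for the example, not algebird's Monoid):

```scala
// Stand-in monoid for checking the law; not algebird's Monoid.
object SetMonoid {
  val zero: Set[Int] = Set.empty[Int]
  def plus(l: Set[Int], r: Set[Int]): Set[Int] = l ++ r
}

val appnd: (Set[Int], Int) => Set[Int] = (t, f) => t + f

// the law: appnd(appnd(zero, f1), f2) ==
//          plus(appnd(zero, f1), appnd(zero, f2))
def lawHolds(f1: Int, f2: Int): Boolean =
  appnd(appnd(SetMonoid.zero, f1), f2) ==
    SetMonoid.plus(appnd(SetMonoid.zero, f1), appnd(SetMonoid.zero, f2))
```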

Contributor (Author):

I may be missing some angle here, but that seems like an opportunity for Scalding/Spark to take better advantage of traversableOnce.aggregate(...) over individual partitions.

Contributor:

@erikerlandson I think the question is whether the append() optimization is more effective than whatever optimization is in sumOption. They're both about eliminating unnecessary intermediate values of the type we have a Monoid on, but in different ways - it's sorta a question of, for each reduce(left: T, right: T), whether you need the left side to actually be a T (which sumOption eliminates) or the right side to actually be a T (which append eliminates). You could imagine an appendAll which requires neither to be, but that feels even more invasive for the Aggregator to eliminate vs. the AppendMonoid like you proposed.

Contributor:

To put it another way, if you ultimately think of this like a def foldLeft[B](z: B)(f: (B, A) => B): B on a Seq[A], and you have some Monoid[T], the question is do we have B == T (a constraint that sumOption gets rid of) or do we have A == T (a constraint that append gets rid of), or both, or neither, and what's the most efficient space to do this aggregation in.
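That framing can be sketched directly (illustrative helper functions, not algebird code): the append-style fold keeps B == T and never builds a T per element, while a sumOption-style fold keeps A == T but is free to use a non-T accumulator internally.

```scala
// append-style: accumulator is the monoid type T, elements stay raw (A free)
def aggregateAppend[T, A](zero: T, items: Seq[A])(append: (T, A) => T): T =
  items.foldLeft(zero)(append)

// sumOption-style: elements are already T (here Set[A]), but the
// accumulator B is a private mutable buffer rather than a T
def aggregateBuffered[A](items: Seq[Set[A]]): Set[A] = {
  val buf = scala.collection.mutable.Set.empty[A]
  items.foreach(buf ++= _)
  buf.toSet
}
```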

@erikerlandson (Contributor, Author)

The rub seems to be that there are some possibly-incompatible cases. If you have a zero, and if append is efficient, then it is advantageous to use Scala's aggregate formulation. But that may not be true, and assuming it is can make things worse.

My desire to always have my cake and eat it too makes me wonder if there is any advantage to adding some method like aggregateIsFast which defaults to false, and which can be tested by various methods (and in spark and scalding, hopefully) to be smart about how to do aggregations depending on the characteristics of the objects in question.

@erikerlandson (Contributor, Author)

Last night I spent some time trying to work through what I'm "really" after, and how that relates to algebraic concerns and algebird's type system.

Firstly, I've been coming at this from the perspective that I want a principled algebraic way to capture the functionality of Scala's traversableOnce.aggregate(z)(seqop, combop), while leveraging as much efficiency as possible. Starting from that goal, in no particular order:

  1. Right off the bat, algebird's Aggregator is not exactly that, although it can clearly embody that functionality. Aggregator is a type for representing a map-reduce computation, and so (with respect to my particular goals) it comes with some extra baggage. My unconscious tendency of equating it with Scala sequence aggregation is misleading.
  2. Scala's aggregate requires an identity element. If I have a Monoid to work with, then all is well. I also have to provide seqop (aka append), but that does not appear to be the limiting factor in any cases that I've ever seen.
  3. However, it is also desirable to support Semigroups, and so in that case aggregate has to be replaced with reduceLeft, and furthermore there is now a need for some way to instantiate Semigroup objects from data elements. In algebird Aggregator, the prepare (aka "map") phase is drafted into this task.
  4. So we have two not-directly-compatible cases of enhancing "normal" algebraic objects: Monoid + append and Semigroup + prepare
  5. With Algebird Aggregator, the first case was made a subclass of the second, I presume by analogy to the fact that Monoid is a subclass of Semigroup.
  6. However, I'm now wondering if they should actually be sibling classes that inherit from an abstract trait that expresses the idea of "things that can aggregate data elements":
trait ElementAggregator[A, E] {
  def aggregate(data: TraversableOnce[E]): A
}

trait MonoidElementAggregator[M, E] extends ElementAggregator[M, E] {
  def monoid: M
  def append(m: M, e: E): M
}

trait SemigroupElementAggregator[S, E] extends ElementAggregator[S, E] {
  def semigroup: S
  def construct(e: E): S
}
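To make the proposed split concrete, here is a hypothetical instance of the Monoid-backed flavor (simplified to Seq rather than TraversableOnce; none of this is algebird code):

```scala
// Simplified version of the sibling-trait sketch above, with one
// concrete Monoid-backed instance for Set[Int].
trait ElementAggregator[A, E] {
  def aggregate(data: Seq[E]): A
}

object SetMonoidAggregator extends ElementAggregator[Set[Int], Int] {
  val zero: Set[Int] = Set.empty[Int]
  def append(m: Set[Int], e: Int): Set[Int] = m + e
  // having a zero means empty input is handled without a prepare/construct
  def aggregate(data: Seq[Int]): Set[Int] = data.foldLeft(zero)(append)
}
```

The Semigroup flavor cannot fold from a zero; it must construct its first value from the first element, which is exactly the prepare/construct cost discussed below.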

@erikerlandson (Contributor, Author)

From my original blog post, the main cost with prepare/construct for working with the Semigroup case is actually instantiating Semigroup objects for every single data element (in the cases where that is nontrivial). And it isn't avoidable. Working with Monoids versus Semigroups comes with nontrivial advantages, from that point of view. I bring this up because some of Algebird's aggregating objects like CountMinSketch are semigroups. Based on discussions from #495, there was a desire to keep free parameters out of the identity element, which led me to discard the identity element and make t-digest a semigroup. I'm now wondering if that's the best solution.

@avibryant (Contributor)

I'm not sure I understand why "it isn't avoidable". That only seems to be true if you want to treat every element of your input identically. But why is that necessary? Can't you do the equivalent of list.tail.foldLeft(prepare(list.head))(append) ?

@erikerlandson (Contributor, Author)

@avibryant good point. Using aggregate can still be substantially faster than foldLeft, but it does require specifically invoking some parallelism:

scala>  val data = Vector.fill(1000000) { scala.util.Random.nextInt(100) }
data: scala.collection.immutable.Vector[Int] = Vector(73, 26, 81, 92, 90, 73, ...

scala> val zero = Set.empty[Int]
zero: scala.collection.immutable.Set[Int] = Set()

scala> val plus = (s1: Set[Int], s2: Set[Int]) => s1 ++ s2
plus: (Set[Int], Set[Int]) => scala.collection.immutable.Set[Int] = <function2>

scala> val append = (s: Set[Int], e: Int) => s + e
append: (Set[Int], Int) => scala.collection.immutable.Set[Int] = <function2>

scala> val construct = (e: Int) => Set(e)
construct: Int => scala.collection.immutable.Set[Int] = <function1>

// This is still the worst:
scala> benchmark(10) { data.map(construct).reduceLeft(plus) }
res11: Double = 0.4000747779

// Using one construction followed by foldLeft is quite a bit faster
scala> benchmark(10) { data.tail.foldLeft(construct(data.head))(append) }
res12: Double = 0.040250169200000005

// without parallelism, aggregate performs same as foldLeft above
scala> benchmark(10) { data.aggregate(zero)(append, plus) }
res13: Double = 0.038395296800000006

// with parallelism, aggregate will beat foldLeft:
scala> benchmark(10) { data.par.aggregate(zero)(append, plus) }
res14: Double = 0.011873102699999999

scala> 
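For reference, the benchmark helper used in the session above is not defined in the thread; a minimal reconstruction consistent with its usage (mean wall-clock seconds over a number of trials; the definition is an assumption) could be:

```scala
// Hypothetical reconstruction of the REPL session's `benchmark` helper:
// run `body` `trials` times and return the mean elapsed seconds.
def benchmark[A](trials: Int)(body: => A): Double = {
  val times = (1 to trials).map { _ =>
    val t0 = System.nanoTime()
    body
    (System.nanoTime() - t0) / 1e9
  }
  times.sum / trials
}
```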

@avibryant (Contributor)

Interesting. I don't have time to do this right now, but it seems like you could add a scala.collection.parallel.Task similar to Aggregate but that works with semigroups? The Aggregate implementation, for reference, is here: https://github.com/scala/scala/blob/v2.11.7/src/library/scala/collection/parallel/ParIterableLike.scala#L1005

@erikerlandson (Contributor, Author)

Seeing that the foldLeft formulation of aggregation -- data.tail.foldLeft(construct(data.head))(append) -- is as fast as the aggregate version (and can work without a monoid identity element) made me realize that the AlgebirdRDD aggregate method could be modified to use it and achieve the corresponding speed increase in cases with an appropriately optimized appendAll method. And it will not suffer any speed decrease in the default case. I updated the branch with those improvements.

@avibryant (Contributor)

This looks good. As a slightly separate thing (but that might be worth including in this PR anyway), I think we should have the default appendAll make use of sumOption; that would make it likely that the AlgebirdRDD was making use of an optimized method wherever possible.

@erikerlandson (Contributor, Author)

@avibryant I think appendAll already uses sumOption - it calls reduce, but that defaults to a thin wrapper around sumOption:
https://github.com/twitter/algebird/blob/develop/algebird-core/src/main/scala/com/twitter/algebird/Aggregator.scala#L194

@avibryant (Contributor)

Aha, you're right, I hadn't realized that. Thanks.

This LGTM, then.

def aggregateOption[B: ClassTag, C](agg: Aggregator[T, B, C]): Option[C] = {
val pr = rdd.mapPartitions({ data =>
if (data.isEmpty) Iterator.empty else {
val sg = agg.prepare(data.next)
Collaborator:

can we rename sg to be b here? It is of type B, right? sg sounds like semigroup to me.

Contributor (Author):

good point, I habitually confuse types with their type classes

appendSemigroup(prep, appnd, identity[T]_)(sg)

/**
* Obtain an [[Aggregator]] that uses an efficient append operation for faster aggregation
Collaborator:

I'm still not crazy about this documentation leaving out that this is disabling the sumOption optimization, that would have previously been there.

Have you tried a benchmark using a Map[K, V] value or HyperLogLog? Those two have been optimized a fair bit with sumOption (as have many others). In some cases the savings of doing your optimization here will be greater, but it won't necessarily be faster.

In the HLL case, you might imagine that this would be a win, but there is an optimized prepare that allocates a sparse object, and then sumOption uses a mutable Array. Here, with append, you could skip the allocation of the sparse HLL object, but you are still going to reallocate a new immutable HLL on each plus operation, and my guess is that will be significantly slower.

We have the benchmarks subproject here, which we try to use when submitting performance optimizations. If nothing else, we need to add a comment so this does not confuse new users further (basically, when performance is a concern, they have to benchmark).

@avibryant (Contributor)

This has addressed @johnynek's comments and LGTM, I'm going to merge. Thanks @erikerlandson !

avibryant added a commit that referenced this pull request Dec 4, 2015
Implement an appendMonoid Aggregator factory which yields aggregators…
@avibryant avibryant merged commit 938798e into twitter:develop Dec 4, 2015