Skip to content

Commit

Permalink
Fix ShuffledRDD sometimes not returning Tuple2s.
Browse files Browse the repository at this point in the history
This fix isn't great because we sometimes allocate another Tuple2, but
the only other easy option is to disable sending mutable pairs, which
would be expensive.
  • Loading branch information
mateiz committed Jul 30, 2014
1 parent f617432 commit 62c56c8
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 3 deletions.
6 changes: 4 additions & 2 deletions core/src/main/scala/org/apache/spark/Aggregator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,12 @@ case class Aggregator[K, V, C] (
}

@deprecated("use combineCombinersByKey with TaskContext argument", "0.9.0")
def combineCombinersByKey(iter: Iterator[(K, C)]) : Iterator[(K, C)] =
def combineCombinersByKey(iter: Iterator[_ <: Product2[K, C]]) : Iterator[(K, C)] =
combineCombinersByKey(iter, null)

def combineCombinersByKey(iter: Iterator[(K, C)], context: TaskContext) : Iterator[(K, C)] = {
def combineCombinersByKey(iter: Iterator[_ <: Product2[K, C]], context: TaskContext)
: Iterator[(K, C)] =
{
if (!externalSorting) {
val combiners = new AppendOnlyMap[K,C]
var kc: Product2[K, C] = null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ private[spark] class HashShuffleReader[K, C](
} else if (dep.aggregator.isEmpty && dep.mapSideCombine) {
throw new IllegalStateException("Aggregator is empty for map-side combine")
} else {
iter
// Convert the Product2s to pairs since this is what downstream RDDs currently expect
iter.asInstanceOf[Iterator[Product2[K, C]]].map(pair => (pair._1, pair._2))
}

// Sort the output if there is a sort ordering defined.
Expand Down

0 comments on commit 62c56c8

Please sign in to comment.