Skip to content

Commit

Permalink
Back out unrelated Exchange changes, which are slated for a separate …
Browse files Browse the repository at this point in the history
…patch
  • Loading branch information
JoshRosen committed Jul 15, 2015
1 parent 473d814 commit 9d9b092
Showing 1 changed file with 4 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -158,16 +158,16 @@ case class Exchange(
val rdd = if (needToCopyObjectsBeforeShuffle(part, serializer)) {
child.execute().mapPartitions { iter =>
val hashExpressions = newMutableProjection(expressions, child.output)()
iter.map(r => (hashExpressions(r).hashCode, r.copy()))
iter.map(r => (hashExpressions(r).copy, r.copy()))
}
} else {
child.execute().mapPartitions { iter =>
val hashExpressions = newMutableProjection(expressions, child.output)()
val mutablePair = new MutablePair[Int, InternalRow]()
iter.map(r => mutablePair.update(hashExpressions(r).hashCode, r))
val mutablePair = new MutablePair[InternalRow, InternalRow]()
iter.map(r => mutablePair.update(hashExpressions(r), r))
}
}
val shuffled = new ShuffledRDD[Int, InternalRow, InternalRow](rdd, part)
val shuffled = new ShuffledRDD[InternalRow, InternalRow, InternalRow](rdd, part)
shuffled.setSerializer(serializer)
shuffled.map(_._2)

Expand Down

0 comments on commit 9d9b092

Please sign in to comment.