From 669818600c0f9b6a7d5fbb4f645ed55e6a21634d Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 25 Mar 2014 17:22:20 -0700 Subject: [PATCH] Revert "I think this might be a bad rabbit hole. Started work to make CoGroupedRDD use iterator and then went crazy" This reverts commit df9afbec7e9fb558cf75d4e8dc94d8f44f101301. --- .../org/apache/spark/rdd/CoGroupedRDD.scala | 27 +++++++------------ .../collection/ExternalAppendOnlyMap.scala | 24 +++++------------ 2 files changed, 17 insertions(+), 34 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala index 6f22486465a8b..9aa454a5c8b88 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala @@ -23,7 +23,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.{InterruptibleIterator, Partition, Partitioner, SparkEnv, TaskContext} import org.apache.spark.{Dependency, OneToOneDependency, ShuffleDependency} -import org.apache.spark.util.collection.{FlexibleExternalAppendOnlyMap, AppendOnlyMap} +import org.apache.spark.util.collection.{ExternalAppendOnlyMap, AppendOnlyMap} import org.apache.spark.serializer.Serializer private[spark] sealed trait CoGroupSplitDep extends Serializable @@ -58,14 +58,14 @@ private[spark] class CoGroupPartition(idx: Int, val deps: Array[CoGroupSplitDep] * @param part partitioner used to partition the shuffle output. */ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: Partitioner) - extends RDD[(K, Seq[Iterator[_]])](rdds.head.context, Nil) { + extends RDD[(K, Seq[Seq[_]])](rdds.head.context, Nil) { // For example, `(k, a) cogroup (k, b)` produces k -> Seq(ArrayBuffer as, ArrayBuffer bs). // Each ArrayBuffer is represented as a CoGroup, and the resulting Seq as a CoGroupCombiner. // CoGroupValue is the intermediate state of each value before being merged in compute. private type CoGroup = ArrayBuffer[Any] private type CoGroupValue = (Any, Int) // Int is dependency number - private type CoGroupCombiner = Array[CoGroup] + private type CoGroupCombiner = Seq[CoGroup] private var serializer: Serializer = null @@ -105,7 +105,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: override val partitioner: Some[Partitioner] = Some(part) - override def compute(s: Partition, context: TaskContext): Iterator[(K, Iterator[CoGroup])] = { + override def compute(s: Partition, context: TaskContext): Iterator[(K, CoGroupCombiner)] = { val sparkConf = SparkEnv.get.conf val externalSorting = sparkConf.getBoolean("spark.shuffle.spill", true) val split = s.asInstanceOf[CoGroupPartition] @@ -141,12 +141,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: getCombiner(kv._1)(depNum) += kv._2 } } - // Convert to iterators - val finalMap = new AppendOnlyMap[K, Iterator[CoGroup]](math.max(map.size, 64)) - map.foreach { case (it, k) => - finalMap.update(it, k.iterator) - } - new InterruptibleIterator(context, finalMap.iterator) + new InterruptibleIterator(context, map.iterator) } else { val map = createExternalMap(numRdds) rddIterators.foreach { case (it, depNum) => @@ -162,7 +157,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: } private def createExternalMap(numRdds: Int) - : FlexibleExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner, Iterator[CoGroup]] = { + : ExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner] = { val createCombiner: (CoGroupValue => CoGroupCombiner) = value => { val newCombiner = Array.fill(numRdds)(new CoGroup) @@ -174,14 +169,12 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: value match { case (v, depNum) => combiner(depNum) += v } combiner } - val mergeCombiners: (CoGroupCombiner, Iterator[CoGroup]) => Iterator[CoGroup] = + val mergeCombiners: (CoGroupCombiner, CoGroupCombiner) => CoGroupCombiner = (combiner1, combiner2) => { - combiner1.toIterator.zip(combiner2).map { case (v1, v2) => v1 ++ v2 } + combiner1.zip(combiner2).map { case (v1, v2) => v1 ++ v2 } } - val returnCombiner: (CoGroupCombiner) => Iterator[CoGroup] = - (combiner) => combiner.toIterator - new FlexibleExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner, Iterator[CoGroup]]( - createCombiner, mergeValue, mergeCombiners, returnCombiner) + new ExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner]( + createCombiner, mergeValue, mergeCombiners) } override def clearDependencies() { diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index 868351b5a2458..caa06d5b445b4 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -55,25 +55,16 @@ import org.apache.spark.storage.{BlockId, BlockManager} * `spark.shuffle.safetyFraction` specifies an additional margin of safety as a fraction of * this threshold, in case map size estimation is not sufficiently accurate. */ + private[spark] class ExternalAppendOnlyMap[K, V, C]( createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, serializer: Serializer = SparkEnv.get.serializer, blockManager: BlockManager = SparkEnv.get.blockManager) - extends FlexibleExternalAppendOnlyMap[K, V, C, C](createCombiner, mergeValue, mergeCombiners, (x => x), - serializer, blockManager) { -} -private[spark] class FlexibleExternalAppendOnlyMap[K, V, C, T]( - createCombiner: V => C, - mergeValue: (C, V) => C, - mergeCombiners: (C, T) => T, - returnCombiner: C => T, - serializer: Serializer = SparkEnv.get.serializer, - blockManager: BlockManager = SparkEnv.get.blockManager) - extends Iterable[(K, T)] with Serializable with Logging { + extends Iterable[(K, C)] with Serializable with Logging { - import FlexibleExternalAppendOnlyMap._ + import ExternalAppendOnlyMap._ private var currentMap = new SizeTrackingAppendOnlyMap[K, C] private val spilledMaps = new ArrayBuffer[DiskMapIterator] @@ -272,13 +263,13 @@ private[spark] class FlexibleExternalAppendOnlyMap[K, V, C, T]( * If the given buffer contains a value for the given key, merge that value into * baseCombiner and remove the corresponding (K, C) pair from the buffer. */ - private def mergeIfKeyExists(key: K, baseCombiner: T, buffer: StreamBuffer): T = { + private def mergeIfKeyExists(key: K, baseCombiner: C, buffer: StreamBuffer): C = { var i = 0 while (i < buffer.pairs.length) { val (k, c) = buffer.pairs(i) if (k == key) { buffer.pairs.remove(i) - return mergeCombiners(c, baseCombiner) + return mergeCombiners(baseCombiner, c) } i += 1 } @@ -301,8 +292,7 @@ private[spark] class FlexibleExternalAppendOnlyMap[K, V, C, T]( // Select a key from the StreamBuffer that holds the lowest key hash val minBuffer = mergeHeap.dequeue() val (minPairs, minHash) = (minBuffer.pairs, minBuffer.minKeyHash) - var (minKey, minCombinerC) = minPairs.remove(0) - var minCombiner = returnCombiner(minCombinerC) + var (minKey, minCombiner) = minPairs.remove(0) assert(minKey.hashCode() == minHash) // For all other streams that may have this key (i.e. have the same minimum key hash), @@ -428,7 +418,7 @@ private[spark] class FlexibleExternalAppendOnlyMap[K, V, C, T]( } } -private[spark] object FlexibleExternalAppendOnlyMap { +private[spark] object ExternalAppendOnlyMap { private class KCComparator[K, C] extends Comparator[(K, C)] { def compare(kc1: (K, C), kc2: (K, C)): Int = { kc1._1.hashCode().compareTo(kc2._1.hashCode())