From 98ab4112255d4e0fdb6e084bd3fe65807c5b209b Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Sun, 20 Jul 2014 01:24:32 -0700 Subject: [PATCH] SPARK-2519 part 2. Remove pattern matching on Tuple2 in critical section... ...s of CoGroupedRDD and PairRDDFunctions This also removes an unnecessary tuple creation in cogroup. Author: Sandy Ryza Closes #1447 from sryza/sandy-spark-2519-2 and squashes the following commits: b6d9699 [Sandy Ryza] Remove missed Tuple2 match in CoGroupedRDD a109828 [Sandy Ryza] Remove another pattern matching in MappedValuesRDD and revert some changes in PairRDDFunctions be10f8a [Sandy Ryza] SPARK-2519 part 2. Remove pattern matching on Tuple2 in critical sections of CoGroupedRDD and PairRDDFunctions --- .../org/apache/spark/rdd/CoGroupedRDD.scala | 4 +- .../apache/spark/rdd/MappedValuesRDD.scala | 2 +- .../apache/spark/rdd/PairRDDFunctions.scala | 60 +++++++++---------- 3 files changed, 33 insertions(+), 33 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala index 5366c1a1cc1bd..aca235a62a6a8 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala @@ -170,12 +170,12 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: val createCombiner: (CoGroupValue => CoGroupCombiner) = value => { val newCombiner = Array.fill(numRdds)(new CoGroup) - value match { case (v, depNum) => newCombiner(depNum) += v } + newCombiner(value._2) += value._1 newCombiner } val mergeValue: (CoGroupCombiner, CoGroupValue) => CoGroupCombiner = (combiner, value) => { - value match { case (v, depNum) => combiner(depNum) += v } + combiner(value._2) += value._1 combiner } val mergeCombiners: (CoGroupCombiner, CoGroupCombiner) => CoGroupCombiner = diff --git a/core/src/main/scala/org/apache/spark/rdd/MappedValuesRDD.scala b/core/src/main/scala/org/apache/spark/rdd/MappedValuesRDD.scala index 2bc47eb9fcd74..a60952eee5901 100644 --- a/core/src/main/scala/org/apache/spark/rdd/MappedValuesRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/MappedValuesRDD.scala @@ -28,6 +28,6 @@ class MappedValuesRDD[K, V, U](prev: RDD[_ <: Product2[K, V]], f: V => U) override val partitioner = firstParent[Product2[K, U]].partitioner override def compute(split: Partition, context: TaskContext): Iterator[(K, U)] = { - firstParent[Product2[K, V]].iterator(split, context).map { case Product2(k ,v) => (k, f(v)) } + firstParent[Product2[K, V]].iterator(split, context).map { pair => (pair._1, f(pair._2)) } } } diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 29038b0359ccd..a6b920467283e 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -216,17 +216,17 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) val reducePartition = (iter: Iterator[(K, V)]) => { val map = new JHashMap[K, V] - iter.foreach { case (k, v) => - val old = map.get(k) - map.put(k, if (old == null) v else func(old, v)) + iter.foreach { pair => + val old = map.get(pair._1) + map.put(pair._1, if (old == null) pair._2 else func(old, pair._2)) } Iterator(map) } : Iterator[JHashMap[K, V]] val mergeMaps = (m1: JHashMap[K, V], m2: JHashMap[K, V]) => { - m2.foreach { case (k, v) => - val old = m1.get(k) - m1.put(k, if (old == null) v else func(old, v)) + m2.foreach { pair => + val old = m1.get(pair._1) + m1.put(pair._1, if (old == null) pair._2 else func(old, pair._2)) } m1 } : JHashMap[K, V] @@ -401,9 +401,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD. */ def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = { - this.cogroup(other, partitioner).flatMapValues { case (vs, ws) => - for (v <- vs; w <- ws) yield (v, w) - } + this.cogroup(other, partitioner).flatMapValues( pair => + for (v <- pair._1; w <- pair._2) yield (v, w) + ) } /** @@ -413,11 +413,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * partition the output RDD. */ def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = { - this.cogroup(other, partitioner).flatMapValues { case (vs, ws) => - if (ws.isEmpty) { - vs.map(v => (v, None)) + this.cogroup(other, partitioner).flatMapValues { pair => + if (pair._2.isEmpty) { + pair._1.map(v => (v, None)) } else { - for (v <- vs; w <- ws) yield (v, Some(w)) + for (v <- pair._1; w <- pair._2) yield (v, Some(w)) } } } @@ -430,11 +430,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) */ def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner) : RDD[(K, (Option[V], W))] = { - this.cogroup(other, partitioner).flatMapValues { case (vs, ws) => - if (vs.isEmpty) { - ws.map(w => (None, w)) + this.cogroup(other, partitioner).flatMapValues { pair => + if (pair._1.isEmpty) { + pair._2.map(w => (None, w)) } else { - for (v <- vs; w <- ws) yield (Some(v), w) + for (v <- pair._1; w <- pair._2) yield (Some(v), w) } } } @@ -535,7 +535,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) val data = self.collect() val map = new mutable.HashMap[K, V] map.sizeHint(data.length) - data.foreach { case (k, v) => map.put(k, v) } + data.foreach { pair => map.put(pair._1, pair._2) } map } @@ -572,10 +572,10 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) } val cg = new CoGroupedRDD[K](Seq(self, other1, other2, other3), partitioner) cg.mapValues { case Seq(vs, w1s, w2s, w3s) => - (vs.asInstanceOf[Seq[V]], - w1s.asInstanceOf[Seq[W1]], - w2s.asInstanceOf[Seq[W2]], - w3s.asInstanceOf[Seq[W3]]) + (vs.asInstanceOf[Seq[V]], + w1s.asInstanceOf[Seq[W1]], + w2s.asInstanceOf[Seq[W2]], + w3s.asInstanceOf[Seq[W3]]) } } @@ -589,8 +589,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) throw new SparkException("Default partitioner cannot partition array keys.") } val cg = new CoGroupedRDD[K](Seq(self, other), partitioner) - cg.mapValues { case Seq(vs, ws) => - (vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]]) + cg.mapValues { case Seq(vs, w1s) => + (vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W]]) } } @@ -606,8 +606,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner) cg.mapValues { case Seq(vs, w1s, w2s) => (vs.asInstanceOf[Seq[V]], - w1s.asInstanceOf[Seq[W1]], - w2s.asInstanceOf[Seq[W2]]) + w1s.asInstanceOf[Seq[W1]], + w2s.asInstanceOf[Seq[W2]]) } } @@ -712,8 +712,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) val index = p.getPartition(key) val process = (it: Iterator[(K, V)]) => { val buf = new ArrayBuffer[V] - for ((k, v) <- it if k == key) { - buf += v + for (pair <- it if pair._1 == key) { + buf += pair._2 } buf } : Seq[V] @@ -858,8 +858,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]] try { while (iter.hasNext) { - val (k, v) = iter.next() - writer.write(k, v) + val pair = iter.next() + writer.write(pair._1, pair._2) } } finally { writer.close(hadoopContext)