From 95d59d697a7f2fabd877dac864e9dff8d1011d5a Mon Sep 17 00:00:00 2001 From: zsxwing Date: Thu, 11 Dec 2014 10:42:02 +0800 Subject: [PATCH 1/2] Add 'iterator' to reduce memory consumed by join --- .../org/apache/spark/rdd/PairRDDFunctions.scala | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index c43e1f2fe135e..fddd690c71be9 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -480,7 +480,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) */ def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = { this.cogroup(other, partitioner).flatMapValues( pair => - for (v <- pair._1; w <- pair._2) yield (v, w) + for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w) ) } @@ -493,9 +493,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = { this.cogroup(other, partitioner).flatMapValues { pair => if (pair._2.isEmpty) { - pair._1.map(v => (v, None)) + pair._1.iterator.map(v => (v, None): (V, Option[W])) } else { - for (v <- pair._1; w <- pair._2) yield (v, Some(w)) + for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, Some(w)) } } } @@ -510,9 +510,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) : RDD[(K, (Option[V], W))] = { this.cogroup(other, partitioner).flatMapValues { pair => if (pair._1.isEmpty) { - pair._2.map(w => (None, w)) + pair._2.iterator.map(w => (None, w): (Option[V], W)) } else { - for (v <- pair._1; w <- pair._2) yield (Some(v), w) + for (v <- pair._1.iterator; w <- pair._2.iterator) yield (Some(v), w) } } } @@ -528,9 +528,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner) : RDD[(K, (Option[V], Option[W]))] = { this.cogroup(other, partitioner).flatMapValues { - case (vs, Seq()) => vs.map(v => (Some(v), None)) - case (Seq(), ws) => ws.map(w => (None, Some(w))) - case (vs, ws) => for (v <- vs; w <- ws) yield (Some(v), Some(w)) + case (vs, Seq()) => vs.iterator.map(v => (Some(v), None): (Option[V], Option[W])) + case (Seq(), ws) => ws.iterator.map(w => (None, Some(w)): (Option[V], Option[W])) + case (vs, ws) => for (v <- vs.iterator; w <- ws.iterator) yield (Some(v), Some(w)) } } From 48ee7b9c9aee147f0dc5bfec85a7ce4ea1319c78 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Fri, 12 Dec 2014 21:20:40 +0800 Subject: [PATCH 2/2] Remove the explicit types --- .../scala/org/apache/spark/rdd/PairRDDFunctions.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index fddd690c71be9..4a09931779b17 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -493,7 +493,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = { this.cogroup(other, partitioner).flatMapValues { pair => if (pair._2.isEmpty) { - pair._1.iterator.map(v => (v, None): (V, Option[W])) + pair._1.iterator.map(v => (v, None)) } else { for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, Some(w)) } @@ -510,7 +510,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) : RDD[(K, (Option[V], W))] = { this.cogroup(other, partitioner).flatMapValues { pair => if (pair._1.isEmpty) { - pair._2.iterator.map(w => (None, w): (Option[V], W)) + pair._2.iterator.map(w => (None, w)) } else { for (v <- pair._1.iterator; w <- pair._2.iterator) yield (Some(v), w) } @@ -528,8 +528,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner) : RDD[(K, (Option[V], Option[W]))] = { this.cogroup(other, partitioner).flatMapValues { - case (vs, Seq()) => vs.iterator.map(v => (Some(v), None): (Option[V], Option[W])) - case (Seq(), ws) => ws.iterator.map(w => (None, Some(w)): (Option[V], Option[W])) + case (vs, Seq()) => vs.iterator.map(v => (Some(v), None)) + case (Seq(), ws) => ws.iterator.map(w => (None, Some(w))) case (vs, ws) => for (v <- vs.iterator; w <- ws.iterator) yield (Some(v), Some(w)) } }