
Refactor from iterator to iterable
holdenk committed Apr 8, 2014
1 parent d052c07 commit 4ed579b
Showing 14 changed files with 109 additions and 107 deletions.
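In effect, every grouping API touched here (groupByKey, groupBy, cogroup, groupWith) now returns Iterable[V] (java.lang.Iterable on the Java side) rather than Iterator[V], so a group's values can be traversed more than once. A minimal sketch of the behavioral difference, assuming a SparkContext named sc and made-up data:

    val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))
    val grouped = pairs.groupByKey()   // RDD[(String, Iterable[Int])] after this commit

    grouped.mapValues { vs =>
      // Both traversals see every value; with the old Iterator result the
      // second pass would have found the cursor already exhausted.
      (vs.size, vs.sum)
    }.collect()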
5 changes: 3 additions & 2 deletions bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
@@ -220,13 +220,14 @@ object Bagel extends Logging {
*/
private def comp[K: Manifest, V <: Vertex, M <: Message[K], C](
sc: SparkContext,
-     grouped: RDD[(K, (Iterator[C], Iterator[V]))],
+     grouped: RDD[(K, (Iterable[C], Iterable[V]))],
compute: (V, Option[C]) => (V, Array[M]),
storageLevel: StorageLevel
): (RDD[(K, (V, Array[M]))], Int, Int) = {
var numMsgs = sc.accumulator(0)
var numActiveVerts = sc.accumulator(0)
-     val processed = grouped.flatMapValues {
+     val processed = grouped.mapValues(x => (x._1.iterator, x._2.iterator))
+       .flatMapValues {
case (_, vs) if !vs.hasNext => None
case (c, vs) => {
val (newVert, newMsgs) =
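Bagel's compute path still consumes each group exactly once through hasNext/next, so instead of rewriting the match below, the new Iterables are adapted back to iterators up front. The adapter pattern in isolation, as a sketch using the names above:

    // One-time Iterable -> Iterator adaptation, applied before matching on hasNext:
    val asIterators = grouped.mapValues { case (cs, vs) => (cs.iterator, vs.iterator) }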
39 changes: 20 additions & 19 deletions core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -17,7 +17,8 @@

package org.apache.spark.api.java

- import java.util.{Comparator, List => JList, Iterator => JIterator}
+ import java.util.{Comparator, List => JList}
+ import java.lang.{Iterable => JIterable}

import scala.collection.JavaConversions._
import scala.reflect.ClassTag
@@ -250,14 +251,14 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
* Group the values for each key in the RDD into a single sequence. Allows controlling the
* partitioning of the resulting key-value pair RDD by passing a Partitioner.
*/
-   def groupByKey(partitioner: Partitioner): JavaPairRDD[K, JIterator[V]] =
+   def groupByKey(partitioner: Partitioner): JavaPairRDD[K, JIterable[V]] =
fromRDD(groupByResultToJava(rdd.groupByKey(partitioner)))

/**
* Group the values for each key in the RDD into a single sequence. Hash-partitions the
* resulting RDD into `numPartitions` partitions.
*/
-   def groupByKey(numPartitions: Int): JavaPairRDD[K, JIterator[V]] =
+   def groupByKey(numPartitions: Int): JavaPairRDD[K, JIterable[V]] =
fromRDD(groupByResultToJava(rdd.groupByKey(numPartitions)))

/**
@@ -367,7 +368,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
* Group the values for each key in the RDD into a single sequence. Hash-partitions the
* resulting RDD with the existing partitioner/parallelism level.
*/
-   def groupByKey(): JavaPairRDD[K, JIterator[V]] =
+   def groupByKey(): JavaPairRDD[K, JIterable[V]] =
fromRDD(groupByResultToJava(rdd.groupByKey()))

/**
@@ -462,55 +463,55 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
* list of values for that key in `this` as well as `other`.
*/
def cogroup[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
-   : JavaPairRDD[K, (JIterator[V], JIterator[W])] =
+   : JavaPairRDD[K, (JIterable[V], JIterable[W])] =
fromRDD(cogroupResultToJava(rdd.cogroup(other, partitioner)))

/**
* For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
* tuple with the list of values for that key in `this`, `other1` and `other2`.
*/
def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2],
-       partitioner: Partitioner): JavaPairRDD[K, (JIterator[V], JIterator[W1], JIterator[W2])] =
+       partitioner: Partitioner): JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])] =
fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, partitioner)))

/**
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
* list of values for that key in `this` as well as `other`.
*/
-   def cogroup[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JIterator[V], JIterator[W])] =
+   def cogroup[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JIterable[V], JIterable[W])] =
fromRDD(cogroupResultToJava(rdd.cogroup(other)))

/**
* For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
* tuple with the list of values for that key in `this`, `other1` and `other2`.
*/
def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2])
-   : JavaPairRDD[K, (JIterator[V], JIterator[W1], JIterator[W2])] =
+   : JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])] =
fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2)))

/**
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
* list of values for that key in `this` as well as `other`.
*/
def cogroup[W](other: JavaPairRDD[K, W], numPartitions: Int)
-   : JavaPairRDD[K, (JIterator[V], JIterator[W])] =
+   : JavaPairRDD[K, (JIterable[V], JIterable[W])] =
fromRDD(cogroupResultToJava(rdd.cogroup(other, numPartitions)))

/**
* For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
* tuple with the list of values for that key in `this`, `other1` and `other2`.
*/
def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2], numPartitions: Int)
-   : JavaPairRDD[K, (JIterator[V], JIterator[W1], JIterator[W2])] =
+   : JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])] =
fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, numPartitions)))

/** Alias for cogroup. */
-   def groupWith[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JIterator[V], JIterator[W])] =
+   def groupWith[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JIterable[V], JIterable[W])] =
fromRDD(cogroupResultToJava(rdd.groupWith(other)))

/** Alias for cogroup. */
def groupWith[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2])
-   : JavaPairRDD[K, (JIterator[V], JIterator[W1], JIterator[W2])] =
+   : JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])] =
fromRDD(cogroupResult2ToJava(rdd.groupWith(other1, other2)))

/**
@@ -695,22 +696,22 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])

object JavaPairRDD {
private[spark]
-   def groupByResultToJava[K: ClassTag, T](rdd: RDD[(K, Iterator[T])]): RDD[(K, JIterator[T])] = {
-     rddToPairRDDFunctions(rdd).mapValues(asJavaIterator)
+   def groupByResultToJava[K: ClassTag, T](rdd: RDD[(K, Iterable[T])]): RDD[(K, JIterable[T])] = {
+     rddToPairRDDFunctions(rdd).mapValues(asJavaIterable)
}

private[spark]
def cogroupResultToJava[K: ClassTag, V, W](
-       rdd: RDD[(K, (Iterator[V], Iterator[W]))]): RDD[(K, (JIterator[V], JIterator[W]))] = {
-     rddToPairRDDFunctions(rdd).mapValues(x => (asJavaIterator(x._1), asJavaIterator(x._2)))
+       rdd: RDD[(K, (Iterable[V], Iterable[W]))]): RDD[(K, (JIterable[V], JIterable[W]))] = {
+     rddToPairRDDFunctions(rdd).mapValues(x => (asJavaIterable(x._1), asJavaIterable(x._2)))
}

private[spark]
def cogroupResult2ToJava[K: ClassTag, V, W1, W2](
-       rdd: RDD[(K, (Iterator[V], Iterator[W1], Iterator[W2]))])
-       : RDD[(K, (JIterator[V], JIterator[W1], JIterator[W2]))] = {
+       rdd: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))])
+       : RDD[(K, (JIterable[V], JIterable[W1], JIterable[W2]))] = {
rddToPairRDDFunctions(rdd)
-       .mapValues(x => (asJavaIterator(x._1), asJavaIterator(x._2), asJavaIterator(x._3)))
+       .mapValues(x => (asJavaIterable(x._1), asJavaIterable(x._2), asJavaIterable(x._3)))
}

def fromRDD[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]): JavaPairRDD[K, V] = {
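For Java callers the practical gain is that java.lang.Iterable can be used directly in an enhanced for loop, which java.util.Iterator cannot. The helpers above lean on scala.collection.JavaConversions; a standalone sketch of the wrapping they perform:

    import java.lang.{Iterable => JIterable}
    import scala.collection.JavaConversions.asJavaIterable

    val values: Iterable[Int] = Seq(1, 2, 3)
    val jValues: JIterable[Int] = asJavaIterable(values)  // wraps in place, no copy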
core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -17,7 +17,7 @@

package org.apache.spark.api.java

- import java.util.{Comparator, Iterator => JIterator, List => JList}
+ import java.util.{Comparator, List => JList}
import java.lang.{Iterable => JIterable}

import scala.collection.JavaConversions._
@@ -204,7 +204,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
* mapping to that key.
*/
-   def groupBy[K](f: JFunction[T, K]): JavaPairRDD[K, JIterator[T]] = {
+   def groupBy[K](f: JFunction[T, K]): JavaPairRDD[K, JIterable[T]] = {
implicit val ctagK: ClassTag[K] = fakeClassTag
implicit val ctagV: ClassTag[JList[T]] = fakeClassTag
JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f)(fakeClassTag)))
@@ -214,7 +214,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
* mapping to that key.
*/
-   def groupBy[K](f: JFunction[T, K], numPartitions: Int): JavaPairRDD[K, JIterator[T]] = {
+   def groupBy[K](f: JFunction[T, K], numPartitions: Int): JavaPairRDD[K, JIterable[T]] = {
implicit val ctagK: ClassTag[K] = fakeClassTag
implicit val ctagV: ClassTag[JList[T]] = fakeClassTag
JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f, numPartitions)(fakeClassTag[K])))
32 changes: 16 additions & 16 deletions core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -261,7 +261,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
* Group the values for each key in the RDD into a single sequence. Allows controlling the
* partitioning of the resulting key-value pair RDD by passing a Partitioner.
*/
-   def groupByKey(partitioner: Partitioner): RDD[(K, Iterator[V])] = {
+   def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = {
// groupByKey shouldn't use map side combine because map side combine does not
// reduce the amount of data shuffled and requires all map side data be inserted
// into a hash table, leading to more objects in the old gen.
@@ -270,14 +270,14 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
def mergeCombiners(c1: ArrayBuffer[V], c2: ArrayBuffer[V]) = c1 ++ c2
val bufs = combineByKey[ArrayBuffer[V]](
createCombiner _, mergeValue _, mergeCombiners _, partitioner, mapSideCombine=false)
-     bufs.mapValues(_.toIterator)
+     bufs.mapValues(_.toIterable)
}

/**
* Group the values for each key in the RDD into a single sequence. Hash-partitions the
* resulting RDD into `numPartitions` partitions.
*/
-   def groupByKey(numPartitions: Int): RDD[(K, Iterator[V])] = {
+   def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] = {
groupByKey(new HashPartitioner(numPartitions))
}

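As the first hunk shows, groupByKey is combineByKey collecting values into per-key ArrayBuffers, with map-side combine disabled because pre-aggregating buffers does not reduce the data shuffled. A simplified local-collection sketch of the same shape (illustrative only, not the Spark implementation):

    import scala.collection.mutable.ArrayBuffer

    def groupValues[K, V](pairs: Seq[(K, V)]): Map[K, Iterable[V]] =
      pairs.foldLeft(Map.empty[K, ArrayBuffer[V]]) { case (acc, (k, v)) =>
        acc.updated(k, acc.getOrElse(k, ArrayBuffer.empty[V]) += v)  // createCombiner / mergeValue
      }.mapValues(_.toIterable)                                      // expose buffers as Iterable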
@@ -361,7 +361,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
* Group the values for each key in the RDD into a single sequence. Hash-partitions the
* resulting RDD with the existing partitioner/parallelism level.
*/
-   def groupByKey(): RDD[(K, Iterator[V])] = {
+   def groupByKey(): RDD[(K, Iterable[V])] = {
groupByKey(defaultPartitioner(self))
}

@@ -457,13 +457,13 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
* list of values for that key in `this` as well as `other`.
*/
def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)
-       : RDD[(K, (Iterator[V], Iterator[W]))] = {
+       : RDD[(K, (Iterable[V], Iterable[W]))] = {
if (partitioner.isInstanceOf[HashPartitioner] && getKeyClass().isArray) {
throw new SparkException("Default partitioner cannot partition array keys.")
}
val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
cg.mapValues { case Seq(vs, ws) =>
-       (vs.asInstanceOf[Seq[V]].iterator, ws.asInstanceOf[Seq[W]].iterator)
+       (vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]])
}
}

@@ -472,23 +472,23 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
* tuple with the list of values for that key in `this`, `other1` and `other2`.
*/
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner)
-       : RDD[(K, (Iterator[V], Iterator[W1], Iterator[W2]))] = {
+       : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = {
if (partitioner.isInstanceOf[HashPartitioner] && getKeyClass().isArray) {
throw new SparkException("Default partitioner cannot partition array keys.")
}
val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner)
cg.mapValues { case Seq(vs, w1s, w2s) =>
-       (vs.asInstanceOf[Seq[V]].iterator,
-        w1s.asInstanceOf[Seq[W1]].iterator,
-        w2s.asInstanceOf[Seq[W2]].iterator)
+       (vs.asInstanceOf[Seq[V]],
+        w1s.asInstanceOf[Seq[W1]],
+        w2s.asInstanceOf[Seq[W2]])
}
}

/**
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
* list of values for that key in `this` as well as `other`.
*/
-   def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterator[V], Iterator[W]))] = {
+   def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = {
cogroup(other, defaultPartitioner(self, other))
}

@@ -497,15 +497,15 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
* tuple with the list of values for that key in `this`, `other1` and `other2`.
*/
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
-       : RDD[(K, (Iterator[V], Iterator[W1], Iterator[W2]))] = {
+       : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = {
cogroup(other1, other2, defaultPartitioner(self, other1, other2))
}

/**
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
* list of values for that key in `this` as well as `other`.
*/
-   def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterator[V], Iterator[W]))] = {
+   def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))] = {
cogroup(other, new HashPartitioner(numPartitions))
}

@@ -514,18 +514,18 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
* tuple with the list of values for that key in `this`, `other1` and `other2`.
*/
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int)
-       : RDD[(K, (Iterator[V], Iterator[W1], Iterator[W2]))] = {
+       : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = {
cogroup(other1, other2, new HashPartitioner(numPartitions))
}

/** Alias for cogroup. */
-   def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Iterator[V], Iterator[W]))] = {
+   def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = {
cogroup(other, defaultPartitioner(self, other))
}

/** Alias for cogroup. */
def groupWith[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
-       : RDD[(K, (Iterator[V], Iterator[W1], Iterator[W2]))] = {
+       : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = {
cogroup(other1, other2, defaultPartitioner(self, other1, other2))
}

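Because cogroup now yields Iterables on both sides, operations built on it can re-walk either side; a join, for instance, is a nested traversal. A minimal sketch, assuming two small pair RDDs rdd1 and rdd2:

    val joined = rdd1.cogroup(rdd2).flatMapValues { case (vs, ws) =>
      // ws is re-traversed once per v; an Iterator would be exhausted after the first v.
      for (v <- vs; w <- ws) yield (v, w)
    }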
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -438,20 +438,20 @@ abstract class RDD[T: ClassTag](
/**
* Return an RDD of grouped items.
*/
-   def groupBy[K: ClassTag](f: T => K): RDD[(K, Iterator[T])] =
+   def groupBy[K: ClassTag](f: T => K): RDD[(K, Iterable[T])] =
groupBy[K](f, defaultPartitioner(this))

/**
* Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
* mapping to that key.
*/
-   def groupBy[K: ClassTag](f: T => K, numPartitions: Int): RDD[(K, Iterator[T])] =
+   def groupBy[K: ClassTag](f: T => K, numPartitions: Int): RDD[(K, Iterable[T])] =
groupBy(f, new HashPartitioner(numPartitions))

/**
* Return an RDD of grouped items.
*/
-   def groupBy[K: ClassTag](f: T => K, p: Partitioner): RDD[(K, Iterator[T])] = {
+   def groupBy[K: ClassTag](f: T => K, p: Partitioner): RDD[(K, Iterable[T])] = {
val cleanF = sc.clean(f)
this.map(t => (cleanF(t), t)).groupByKey(p)
}
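The last overload makes the relationship explicit: groupBy is a keying map followed by groupByKey. As a sketch, for any rdd: RDD[T] and key function f: T => K, these are equivalent and both now yield RDD[(K, Iterable[T])]:

    val a = rdd.groupBy(f)
    val b = rdd.map(t => (f(t), t)).groupByKey()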
22 changes: 12 additions & 10 deletions examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
@@ -67,7 +67,7 @@ public static void main(String[] args) throws Exception {
JavaRDD<String> lines = ctx.textFile(args[1], 1);

// Loads all URLs from input file and initialize their neighbors.
-     JavaPairRDD<String, Iterator<String>> links = lines.mapToPair(new PairFunction<String, String, String>() {
+     JavaPairRDD<String, Iterable<String>> links = lines.mapToPair(new PairFunction<String, String, String>() {
@Override
public Tuple2<String, String> call(String s) {
String[] parts = SPACES.split(s);
@@ -76,9 +76,9 @@ public Tuple2<String, String> call(String s) {
}).distinct().groupByKey().cache();

// Loads all URLs with other URL(s) link to from input file and initialize ranks of them to one.
-     JavaPairRDD<String, Double> ranks = links.mapValues(new Function<Iterator<String>, Double>() {
+     JavaPairRDD<String, Double> ranks = links.mapValues(new Function<Iterable<String>, Double>() {
@Override
-       public Double call(Iterator<String> rs) {
+       public Double call(Iterable<String> rs) {
return 1.0;
}
});
@@ -87,16 +87,18 @@ public Double call(Iterator<String> rs) {
for (int current = 0; current < Integer.parseInt(args[2]); current++) {
// Calculates URL contributions to the rank of other URLs.
JavaPairRDD<String, Double> contribs = links.join(ranks).values()
-         .flatMapToPair(new PairFlatMapFunction<Tuple2<Iterator<String>, Double>, String, Double>() {
+         .flatMapToPair(new PairFlatMapFunction<Tuple2<Iterable<String>, Double>, String, Double>() {
@Override
-           public Iterable<Tuple2<String, Double>> call(Tuple2<Iterator<String>, Double> s) {
-             List<String> urls = new ArrayList<String>();
-             while (s._1.hasNext()) {
-               urls.add(s._1.next());
+           public Iterable<Tuple2<String, Double>> call(Tuple2<Iterable<String>, Double> s) {
+             int urlCount = 0;
+             Iterator<String> urls = s._1.iterator();
+             while (urls.hasNext()) {
+               urls.next();
+               urlCount++;
}
List<Tuple2<String, Double>> results = new ArrayList<Tuple2<String, Double>>();
-             for (String n : urls) {
-               results.add(new Tuple2<String, Double>(n, s._2() / urls.size()));
+             for (String n : s._1) {
+               results.add(new Tuple2<String, Double>(n, s._2() / urlCount));
}
return results;
}
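The rewritten Java body makes two passes over a page's neighbor list (one to count, one to emit contributions), which is precisely what the move to Iterable permits. The same computation as a Scala sketch, assuming links: RDD[(String, Iterable[String])] and ranks: RDD[(String, Double)]:

    val contribs = links.join(ranks).values.flatMap { case (urls, rank) =>
      val urlCount = urls.size                  // first traversal
      urls.map(url => (url, rank / urlCount))   // second traversal
    }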
examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala
@@ -115,7 +115,9 @@ object WikipediaPageRankStandalone {
var ranks = links.mapValues { edges => defaultRank }
for (i <- 1 to numIterations) {
val contribs = links.groupWith(ranks).flatMap {
-         case (id, (linksWrapper, rankWrapper)) =>
+         case (id, (linksWrapperIterable, rankWrapperIterable)) =>
+           val linksWrapper = linksWrapperIterable.iterator
+           val rankWrapper = rankWrapperIterable.iterator
if (linksWrapper.hasNext) {
val linksWrapperHead = linksWrapper.next
if (rankWrapper.hasNext) {
mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
@@ -421,7 +421,7 @@ class ALS private (
* Compute the new feature vectors for a block of the users matrix given the list of factors
* it received from each product and its InLinkBlock.
*/
-   private def updateBlock(messages: Iterator[(Int, Array[Array[Double]])], inLinkBlock: InLinkBlock,
+   private def updateBlock(messages: Iterable[(Int, Array[Array[Double]])], inLinkBlock: InLinkBlock,
rank: Int, lambda: Double, alpha: Double, YtY: Option[Broadcast[DoubleMatrix]])
: Array[Array[Double]] =
{
mllib/src/main/scala/org/apache/spark/mllib/util/LAUtils.scala
@@ -38,8 +38,9 @@ object LAUtils {
case (i, cols) =>
val rowArray = Array.ofDim[Double](n)
var j = 0
-         while (cols.hasNext) {
-           val element = cols.next
+         val colsItr = cols.iterator
+         while (colsItr.hasNext) {
+           val element = colsItr.next
rowArray(element._1) = element._2
j += 1
}

