Skip to content

Commit

Permalink
Remove another pattern matching in MappedValuesRDD and revert some ch…
Browse files Browse the repository at this point in the history
…anges in PairRDDFunctions
  • Loading branch information
sryza committed Jul 18, 2014
1 parent be10f8a commit a109828
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,6 @@ class MappedValuesRDD[K, V, U](prev: RDD[_ <: Product2[K, V]], f: V => U)
override val partitioner = firstParent[Product2[K, U]].partitioner

override def compute(split: Partition, context: TaskContext): Iterator[(K, U)] = {
firstParent[Product2[K, V]].iterator(split, context).map { case Product2(k ,v) => (k, f(v)) }
firstParent[Product2[K, V]].iterator(split, context).map { pair => (pair._1, f(pair._2)) }
}
}
17 changes: 14 additions & 3 deletions core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,12 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
throw new SparkException("Default partitioner cannot partition array keys.")
}
val cg = new CoGroupedRDD[K](Seq(self, other1, other2, other3), partitioner)
cg.mapValues { seq => seq.asInstanceOf[(Seq[V], Seq[W1], Seq[W2], Seq[W3])] }
cg.mapValues { case Seq(vs, w1s, w2s, w3s) =>
(vs.asInstanceOf[Seq[V]],
w1s.asInstanceOf[Seq[W1]],
w2s.asInstanceOf[Seq[W2]],
w3s.asInstanceOf[Seq[W3]])
}
}

/**
Expand All @@ -584,7 +589,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
throw new SparkException("Default partitioner cannot partition array keys.")
}
val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
cg.mapValues { pair => pair.asInstanceOf[(Seq[V], Seq[W])] }
cg.mapValues { case Seq(vs, w1s) =>
(vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W]])
}
}

/**
Expand All @@ -597,7 +604,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
throw new SparkException("Default partitioner cannot partition array keys.")
}
val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner)
cg.mapValues { seq => seq.asInstanceOf[(Seq[V], Seq[W1], Seq[W2])] }
cg.mapValues { case Seq(vs, w1s, w2s) =>
(vs.asInstanceOf[Seq[V]],
w1s.asInstanceOf[Seq[W1]],
w2s.asInstanceOf[Seq[W2]])
}
}

/**
Expand Down

0 comments on commit a109828

Please sign in to comment.