Skip to content

Commit

Permalink
Merge pull request apache#511 from JoshRosen/SPARK-1040
Browse files Browse the repository at this point in the history
Fix ClassCastException in JavaPairRDD.collectAsMap() (SPARK-1040)

This fixes [SPARK-1040](https://spark-project.atlassian.net/browse/SPARK-1040), an issue where JavaPairRDD.collectAsMap() could sometimes fail with ClassCastException.  I applied the same fix to the Spark Streaming Java APIs.  The commit message describes the fix in more detail.

I also increased the verbosity of JUnit test output under SBT to make it easier to verify that the Java tests are actually running.
  • Loading branch information
rxin committed Jan 26, 2014
2 parents 05be704 + 740e865 commit c66a2ef
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Return a new RDD by applying a function to all elements of this RDD.
*/
def map[K2, V2](f: PairFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
def cm = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
def cm = implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
new JavaPairRDD(rdd.map(f)(cm))(f.keyType(), f.valueType())
}

Expand Down Expand Up @@ -119,7 +119,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def flatMap[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
import scala.collection.JavaConverters._
def fn = (x: T) => f.apply(x).asScala
def cm = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
def cm = implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
JavaPairRDD.fromRDD(rdd.flatMap(fn)(cm))(f.keyType(), f.valueType())
}

Expand Down
17 changes: 17 additions & 0 deletions core/src/test/scala/org/apache/spark/JavaAPISuite.java
Original file line number Diff line number Diff line change
Expand Up @@ -387,18 +387,21 @@ public Double call(Integer x) {
return 1.0 * x;
}
}).cache();
doubles.collect();
JavaPairRDD<Integer, Integer> pairs = rdd.map(new PairFunction<Integer, Integer, Integer>() {
@Override
public Tuple2<Integer, Integer> call(Integer x) {
return new Tuple2<Integer, Integer>(x, x);
}
}).cache();
pairs.collect();
JavaRDD<String> strings = rdd.map(new Function<Integer, String>() {
@Override
public String call(Integer x) {
return x.toString();
}
}).cache();
strings.collect();
}

@Test
Expand Down Expand Up @@ -962,4 +965,18 @@ public void countApproxDistinctByKey() {
}

}

@Test
public void collectAsMapWithIntArrayValues() {
// Regression test for SPARK-1040
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(new Integer[] { 1 }));
JavaPairRDD<Integer, int[]> pairRDD = rdd.map(new PairFunction<Integer, Integer, int[]>() {
@Override
public Tuple2<Integer, int[]> call(Integer x) throws Exception {
return new Tuple2<Integer, int[]>(x, new int[] { x });
}
});
pairRDD.collect(); // Works fine
Map<Integer, int[]> map = pairRDD.collectAsMap(); // Used to crash with ClassCastException
}
}
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@
<dependency>
<groupId>com.novocode</groupId>
<artifactId>junit-interface</artifactId>
<version>0.9</version>
<version>0.10</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down
3 changes: 2 additions & 1 deletion project/SparkBuild.scala
Original file line number Diff line number Diff line change
Expand Up @@ -212,12 +212,13 @@ object SparkBuild extends Build {
"org.eclipse.jetty.orbit" % "javax.servlet" % "2.5.0.v201103041518" artifacts Artifact("javax.servlet", "jar", "jar"),
"org.scalatest" %% "scalatest" % "1.9.1" % "test",
"org.scalacheck" %% "scalacheck" % "1.10.0" % "test",
"com.novocode" % "junit-interface" % "0.9" % "test",
"com.novocode" % "junit-interface" % "0.10" % "test",
"org.easymock" % "easymock" % "3.1" % "test",
"org.mockito" % "mockito-all" % "1.8.5" % "test",
"commons-io" % "commons-io" % "2.4" % "test"
),

testOptions += Tests.Argument(TestFrameworks.JUnit, "-v", "-a"),
parallelExecution := true,
/* Workaround for issue #206 (fixed after SBT 0.11.0) */
watchTransitiveSources <<= Defaults.inDependencies[Task[Seq[File]]](watchSources.task,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T

/** Return a new DStream by applying a function to all elements of this DStream. */
def map[K2, V2](f: PairFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {
def cm = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
def cm = implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
new JavaPairDStream(dstream.map(f)(cm))(f.keyType(), f.valueType())
}

Expand All @@ -159,7 +159,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
def flatMap[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {
import scala.collection.JavaConverters._
def fn = (x: T) => f.apply(x).asScala
def cm = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
def cm = implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
new JavaPairDStream(dstream.flatMap(fn)(cm))(f.keyType(), f.valueType())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -745,7 +745,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}

override val classTag: ClassTag[(K, V)] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K, V]]]
implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K, V]]]
}

object JavaPairDStream {
Expand Down

0 comments on commit c66a2ef

Please sign in to comment.