From 531d9d75762bf04ebb5580f42ce24b178d721686 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Sat, 25 Jan 2014 16:32:44 -0800
Subject: [PATCH 1/2] Increase JUnit test verbosity under SBT.

Upgrade junit-interface plugin from 0.9 to 0.10.

I noticed that the JavaAPISuite tests didn't appear to display any output
locally or under Jenkins, making it difficult to know whether they were
running. This change increases the verbosity to more closely match the
ScalaTest tests.
---
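Note: junit-interface reads runner arguments from sbt's testOptions. As I
understand the plugin's documentation, "-v" enables verbose logging of test
names and test-run events, and "-a" shows stack traces for failed
assertions. For reference, a minimal standalone build.sbt sketch
(hypothetical project, not Spark's actual build definition) wiring up the
same settings:

    // Sketch only: junit-interface 0.10 as the JUnit runner under sbt.
    libraryDependencies += "com.novocode" % "junit-interface" % "0.10" % "test"

    // "-v": verbose test-event logging; "-a": stack traces for assertion failures.
    testOptions += Tests.Argument(TestFrameworks.JUnit, "-v", "-a")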
 pom.xml                  | 2 +-
 project/SparkBuild.scala | 3 ++-
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/pom.xml b/pom.xml
index 54072b053cb5e..1ac8f0fa079e0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -389,7 +389,7 @@
       <dependency>
         <groupId>com.novocode</groupId>
         <artifactId>junit-interface</artifactId>
-        <version>0.9</version>
+        <version>0.10</version>
         <scope>test</scope>
       </dependency>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 46a1c640e30ae..e33f230188fc7 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -212,12 +212,13 @@ object SparkBuild extends Build {
       "org.eclipse.jetty.orbit" % "javax.servlet" % "2.5.0.v201103041518" artifacts Artifact("javax.servlet", "jar", "jar"),
       "org.scalatest" %% "scalatest" % "1.9.1" % "test",
       "org.scalacheck" %% "scalacheck" % "1.10.0" % "test",
-      "com.novocode" % "junit-interface" % "0.9" % "test",
+      "com.novocode" % "junit-interface" % "0.10" % "test",
       "org.easymock" % "easymock" % "3.1" % "test",
       "org.mockito" % "mockito-all" % "1.8.5" % "test",
       "commons-io" % "commons-io" % "2.4" % "test"
     ),
 
+    testOptions += Tests.Argument(TestFrameworks.JUnit, "-v", "-a"),
     parallelExecution := true,
     /* Workaround for issue #206 (fixed after SBT 0.11.0) */
     watchTransitiveSources <<= Defaults.inDependencies[Task[Seq[File]]](watchSources.task,

From 740e865f40704dc9158a6cf635990580fb6adcac Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Sat, 25 Jan 2014 16:39:20 -0800
Subject: [PATCH 2/2] Fix ClassCastException in JavaPairRDD.collectAsMap() (SPARK-1040)

This fixes an issue where collectAsMap() could fail when called on a
JavaPairRDD that was derived by transforming a non-JavaPairRDD.

The root problem was that we were creating the JavaPairRDD's ClassTag by
casting a ClassTag[AnyRef] to a ClassTag[Tuple2[K2, V2]]. To fix this, I
cast a ClassTag[Tuple2[_, _]] instead, since this actually produces a
ClassTag of the appropriate type because ClassTags don't capture type
parameters:

    scala> implicitly[ClassTag[Tuple2[_, _]]] == implicitly[ClassTag[Tuple2[Int, Int]]]
    res8: Boolean = true

    scala> implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[Int, Int]]] == implicitly[ClassTag[Tuple2[Int, Int]]]
    res9: Boolean = false
---
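Note: a quick REPL illustration (mine, not part of the commit) of why the
runtime class behind the tag matters. collect() uses the ClassTag's runtime
class to allocate its result array, so the old cast produced an Object[]
where a Tuple2[] was expected; as far as I can tell, that mismatched array
is what failed the cast inside collectAsMap():

    scala> import scala.reflect.ClassTag
    import scala.reflect.ClassTag

    scala> implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[Int, Int]]].runtimeClass
    res0: Class[_] = class java.lang.Object

    scala> implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[Int, Int]]].runtimeClass
    res1: Class[_] = class scala.Tuple2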
 .../org/apache/spark/api/java/JavaRDDLike.scala |  4 ++--
 .../scala/org/apache/spark/JavaAPISuite.java    | 17 +++++++++++++++++
 .../streaming/api/java/JavaDStreamLike.scala    |  4 ++--
 .../streaming/api/java/JavaPairDStream.scala    |  2 +-
 4 files changed, 22 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 9680c6f3e1475..4db7339e6716b 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -88,7 +88,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * Return a new RDD by applying a function to all elements of this RDD.
    */
   def map[K2, V2](f: PairFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
-    def cm = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
+    def cm = implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
     new JavaPairRDD(rdd.map(f)(cm))(f.keyType(), f.valueType())
   }
 
@@ -119,7 +119,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   def flatMap[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
     import scala.collection.JavaConverters._
     def fn = (x: T) => f.apply(x).asScala
-    def cm = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
+    def cm = implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
     JavaPairRDD.fromRDD(rdd.flatMap(fn)(cm))(f.keyType(), f.valueType())
   }
 
diff --git a/core/src/test/scala/org/apache/spark/JavaAPISuite.java b/core/src/test/scala/org/apache/spark/JavaAPISuite.java
index 23ec6c3b311f0..8c573ac0d65e0 100644
--- a/core/src/test/scala/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/scala/org/apache/spark/JavaAPISuite.java
@@ -387,18 +387,21 @@ public Double call(Integer x) {
         return 1.0 * x;
       }
     }).cache();
+    doubles.collect();
 
     JavaPairRDD<Integer, Integer> pairs = rdd.map(new PairFunction<Integer, Integer, Integer>() {
       @Override
       public Tuple2<Integer, Integer> call(Integer x) {
         return new Tuple2<Integer, Integer>(x, x);
       }
     }).cache();
+    pairs.collect();
 
     JavaRDD<String> strings = rdd.map(new Function<Integer, String>() {
       @Override
       public String call(Integer x) {
         return x.toString();
       }
     }).cache();
+    strings.collect();
   }
@@ -962,4 +965,18 @@ public void countApproxDistinctByKey() {
     }
   }
+
+  @Test
+  public void collectAsMapWithIntArrayValues() {
+    // Regression test for SPARK-1040
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(new Integer[] { 1 }));
+    JavaPairRDD<Integer, int[]> pairRDD = rdd.map(new PairFunction<Integer, Integer, int[]>() {
+      @Override
+      public Tuple2<Integer, int[]> call(Integer x) throws Exception {
+        return new Tuple2<Integer, int[]>(x, new int[] { x });
+      }
+    });
+    pairRDD.collect();  // Works fine
+    Map<Integer, int[]> map = pairRDD.collectAsMap();  // Used to crash with ClassCastException
+  }
 
 }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index a493a8279f942..64fe204cdf7a5 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -138,7 +138,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
   /** Return a new DStream by applying a function to all elements of this DStream.
     */
   def map[K2, V2](f: PairFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {
-    def cm = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
+    def cm = implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
     new JavaPairDStream(dstream.map(f)(cm))(f.keyType(), f.valueType())
   }
 
@@ -159,7 +159,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
   def flatMap[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {
     import scala.collection.JavaConverters._
     def fn = (x: T) => f.apply(x).asScala
-    def cm = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
+    def cm = implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
     new JavaPairDStream(dstream.flatMap(fn)(cm))(f.keyType(), f.valueType())
   }
 
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index 79fa6a623d290..62cfa0a229db1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -745,7 +745,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
   }
 
   override val classTag: ClassTag[(K, V)] =
-    implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K, V]]]
+    implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K, V]]]
 }
 
 object JavaPairDStream {