From 3ddc8bbdcdacfe9fee9a9a8801c2da62250cb469 Mon Sep 17 00:00:00 2001
From: Prashant Sharma
Date: Fri, 14 Feb 2014 13:11:04 +0530
Subject: [PATCH 1/3] Deprecated *With functions in Scala and added a few
 missing Java APIs

---
 .../scala/org/apache/spark/SparkContext.scala |  3 ++-
 .../org/apache/spark/api/java/JavaRDD.scala   |  2 ++
 .../spark/api/java/JavaSparkContext.scala     | 18 ++++++++++++++++++
 .../main/scala/org/apache/spark/rdd/RDD.scala | 12 ++++++++----
 4 files changed, 30 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index a24f07e9a6e9a..1f5334f3dbb40 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -240,6 +240,7 @@ class SparkContext(
     localProperties.set(props)
   }
 
+  @deprecated("Properties no longer need to be explicitly initialized.", "1.0.0")
   def initLocalProperties() {
     localProperties.set(new Properties())
   }
@@ -308,7 +309,7 @@ class SparkContext(
   private val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler, this)
   private val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager, this)
 
-  def initDriverMetrics() {
+  private def initDriverMetrics() {
     SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource)
     SparkEnv.get.metricsSystem.registerSource(blockManagerSource)
   }
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
index 0055c98844ded..816157b0cd85e 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
@@ -126,6 +126,8 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T])
   def subtract(other: JavaRDD[T], p: Partitioner): JavaRDD[T] =
     wrapRDD(rdd.subtract(other, p))
 
+  def generator = rdd.generator
+
   override def toString = rdd.toString
 
   /** Assign a name to this RDD */
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index dc26b7f621fee..084ec5a8e319b 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -92,6 +92,24 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
 
   private[spark] val env = sc.env
 
+  def isLocal = sc.isLocal
+
+  def sparkUser = sc.sparkUser
+
+  def master = sc.master
+
+  def appName = sc.appName
+
+  def jars = JavaConversions.seqAsJavaList(sc.jars)
+
+  def startTime = sc.startTime
+
+  /** Default level of parallelism to use when not given by user (e.g. parallelize and makeRDD). */
+  def defaultParallelism = sc.defaultParallelism
+
+  /** Default min number of partitions for Hadoop RDDs when not given by user */
+  def defaultMinSplits = sc.defaultMinSplits
+
   /** Distribute a local Scala collection to form an RDD. */
   def parallelize[T](list: java.util.List[T], numSlices: Int): JavaRDD[T] = {
     implicit val ctag: ClassTag[T] = fakeClassTag
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 50320f40350cd..3fe56963e0008 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -543,7 +543,8 @@ abstract class RDD[T: ClassTag](
    * additional parameter is produced by constructA, which is called in each
    * partition with the index of that partition.
    */
-  def mapWith[A: ClassTag, U: ClassTag]
+  @deprecated("use mapPartitionsWithIndex", "1.0.0")
+  def mapWith[A, U: ClassTag]
       (constructA: Int => A, preservesPartitioning: Boolean = false)
       (f: (T, A) => U): RDD[U] = {
     mapPartitionsWithIndex((index, iter) => {
@@ -557,7 +558,8 @@ abstract class RDD[T: ClassTag](
    * additional parameter is produced by constructA, which is called in each
    * partition with the index of that partition.
    */
-  def flatMapWith[A: ClassTag, U: ClassTag]
+  @deprecated("use mapPartitionsWithIndex and flatMap", "1.0.0")
+  def flatMapWith[A, U: ClassTag]
       (constructA: Int => A, preservesPartitioning: Boolean = false)
       (f: (T, A) => Seq[U]): RDD[U] = {
     mapPartitionsWithIndex((index, iter) => {
@@ -571,7 +573,8 @@ abstract class RDD[T: ClassTag](
    * This additional parameter is produced by constructA, which is called in each
    * partition with the index of that partition.
    */
-  def foreachWith[A: ClassTag](constructA: Int => A)(f: (T, A) => Unit) {
+  @deprecated("use mapPartitionsWithIndex and foreach", "1.0.0")
+  def foreachWith[A](constructA: Int => A)(f: (T, A) => Unit) {
     mapPartitionsWithIndex { (index, iter) =>
       val a = constructA(index)
       iter.map(t => {f(t, a); t})
@@ -583,7 +586,8 @@ abstract class RDD[T: ClassTag](
    * additional parameter is produced by constructA, which is called in each
    * partition with the index of that partition.
    */
-  def filterWith[A: ClassTag](constructA: Int => A)(p: (T, A) => Boolean): RDD[T] = {
+  @deprecated("use mapPartitionsWithIndex and filter", "1.0.0")
+  def filterWith[A](constructA: Int => A)(p: (T, A) => Boolean): RDD[T] = {
     mapPartitionsWithIndex((index, iter) => {
       val a = constructA(index)
       iter.filter(t => p(t, a))
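
Reviewer note (not part of the patch): the @deprecated messages above point callers at mapPartitionsWithIndex. Below is a minimal, self-contained migration sketch; the local master, object name, and the per-partition random-number use case are illustrative assumptions, not taken from the patch.

    import scala.util.Random
    import org.apache.spark.{SparkConf, SparkContext}

    object MapWithMigration {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("mapWith-migration"))
        val data = sc.parallelize(1 to 100, 4)

        // Deprecated by this patch: one Random per partition, seeded with the partition index.
        val before = data.mapWith(index => new Random(index))((x, rng) => x + rng.nextInt(10))

        // Equivalent form using mapPartitionsWithIndex, as suggested by the deprecation message.
        val after = data.mapPartitionsWithIndex { (index, iter) =>
          val rng = new Random(index)
          iter.map(x => x + rng.nextInt(10))
        }

        println(s"before=${before.count()} after=${after.count()}")
        sc.stop()
      }
    }

The same pattern covers flatMapWith, foreachWith, and filterWith: build the per-partition value once at the top of the mapPartitionsWithIndex closure, then flatMap, foreach, or filter over the iterator.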
From 737819a16627517c6d6f42a8c681e434a685f292 Mon Sep 17 00:00:00 2001
From: Prashant Sharma
Date: Mon, 17 Feb 2014 10:39:40 +0530
Subject: [PATCH 2/3] SPARK-1095 add explicit return types to APIs.

---
 .../org/apache/spark/api/java/JavaRDD.scala     |  2 +-
 .../org/apache/spark/api/java/JavaRDDLike.scala |  2 +-
 .../spark/api/java/JavaSparkContext.scala       | 17 +++++++++--------
 3 files changed, 11 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
index 816157b0cd85e..d7ce8fdfc23f4 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
@@ -126,7 +126,7 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T])
   def subtract(other: JavaRDD[T], p: Partitioner): JavaRDD[T] =
     wrapRDD(rdd.subtract(other, p))
 
-  def generator = rdd.generator
+  def generator: String = rdd.generator
 
   override def toString = rdd.toString
 
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 24a9925dbd22c..a3157b5bfb472 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -74,7 +74,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * of the original partition.
    */
   def mapPartitionsWithIndex[R: ClassTag](
-      f: JFunction2[Int, java.util.Iterator[T], java.util.Iterator[R]],
+      f: JFunction2[Integer, java.util.Iterator[T], java.util.Iterator[R]],
       preservesPartitioning: Boolean = false): JavaRDD[R] =
     new JavaRDD(rdd.mapPartitionsWithIndex(((a,b) => f(a,asJavaIterator(b))),
       preservesPartitioning))
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 084ec5a8e319b..916259191baa8 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.api.java
 
+import java.util
 import java.util.{Map => JMap}
 
 import scala.collection.JavaConversions
@@ -92,23 +93,23 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
 
   private[spark] val env = sc.env
 
-  def isLocal = sc.isLocal
+  def isLocal: java.lang.Boolean = sc.isLocal
 
-  def sparkUser = sc.sparkUser
+  def sparkUser: String = sc.sparkUser
 
-  def master = sc.master
+  def master: String = sc.master
 
-  def appName = sc.appName
+  def appName: String = sc.appName
 
-  def jars = JavaConversions.seqAsJavaList(sc.jars)
+  def jars: util.List[String] = sc.jars
 
-  def startTime = sc.startTime
+  def startTime: java.lang.Long = sc.startTime
 
   /** Default level of parallelism to use when not given by user (e.g. parallelize and makeRDD). */
-  def defaultParallelism = sc.defaultParallelism
+  def defaultParallelism: Integer = sc.defaultParallelism
 
   /** Default min number of partitions for Hadoop RDDs when not given by user */
-  def defaultMinSplits = sc.defaultMinSplits
+  def defaultMinSplits: Integer = sc.defaultMinSplits
 
   /** Distribute a local Scala collection to form an RDD. */
   def parallelize[T](list: java.util.List[T], numSlices: Int): JavaRDD[T] = {
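
Reviewer note (not part of the patch): the reason for the explicit annotations is that type inference would otherwise expose the Scala-side types (Int, Boolean, Seq[String]), which are awkward to consume from Java. Writing out the Java types makes scalac apply its standard boxing conversions (e.g. Predef.int2Integer) at the definition site. The class below is a hypothetical, cut-down analogue of the getters touched here, not code from the patch.

    import java.util.{List => JList}
    import scala.collection.JavaConversions.seqAsJavaList

    // Hypothetical stand-in for the JavaSparkContext getters; names mirror the patch.
    class JavaContextSketch(masterUrl: String, jarSeq: Seq[String], parallelism: Int) {
      // Without the annotation this would be inferred as Int; annotating with
      // java.lang.Integer makes scalac box the value via Predef.int2Integer.
      def defaultParallelism: java.lang.Integer = parallelism

      // Seq[String] is clumsy from Java; expose java.util.List instead.
      def jars: JList[String] = seqAsJavaList(jarSeq)

      def isLocal: java.lang.Boolean = masterUrl == "local" || masterUrl.startsWith("local[")
    }

    object JavaContextSketchDemo extends App {
      val ctx = new JavaContextSketch("local[2]", Seq("app.jar"), 2)
      println(ctx.defaultParallelism.getClass) // class java.lang.Integer
      println(ctx.jars)                        // [app.jar]
      println(ctx.isLocal)                     // true
    }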
From 11d0c2b66b35614d8730c96033eccc67d5eeb466 Mon Sep 17 00:00:00 2001
From: Prashant Sharma
Date: Fri, 21 Feb 2014 12:06:05 +0530
Subject: [PATCH 3/3] Integer -> java.lang.Integer

---
 .../main/scala/org/apache/spark/api/java/JavaRDDLike.scala      | 2 +-
 .../scala/org/apache/spark/api/java/JavaSparkContext.scala      | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index a3157b5bfb472..729668fb679b4 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -74,7 +74,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * of the original partition.
    */
   def mapPartitionsWithIndex[R: ClassTag](
-      f: JFunction2[Integer, java.util.Iterator[T], java.util.Iterator[R]],
+      f: JFunction2[java.lang.Integer, java.util.Iterator[T], java.util.Iterator[R]],
       preservesPartitioning: Boolean = false): JavaRDD[R] =
     new JavaRDD(rdd.mapPartitionsWithIndex(((a,b) => f(a,asJavaIterator(b))),
       preservesPartitioning))
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 916259191baa8..8e0eab56a3dcf 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -106,10 +106,10 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
   def startTime: java.lang.Long = sc.startTime
 
   /** Default level of parallelism to use when not given by user (e.g. parallelize and makeRDD). */
-  def defaultParallelism: Integer = sc.defaultParallelism
+  def defaultParallelism: java.lang.Integer = sc.defaultParallelism
 
   /** Default min number of partitions for Hadoop RDDs when not given by user */
-  def defaultMinSplits: Integer = sc.defaultMinSplits
+  def defaultMinSplits: java.lang.Integer = sc.defaultMinSplits
 
   /** Distribute a local Scala collection to form an RDD. */
   def parallelize[T](list: java.util.List[T], numSlices: Int): JavaRDD[T] = {
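
Reviewer note (not part of the patch series): a quick Scala smoke test of the accessors this series adds and annotates on JavaSparkContext. The object name and the local master are illustrative assumptions; the getters themselves (isLocal, master, appName, jars, defaultParallelism) and their Java return types come from the diffs above.

    import org.apache.spark.api.java.JavaSparkContext

    object JavaApiSmokeTest {
      def main(args: Array[String]): Unit = {
        val jsc = new JavaSparkContext("local[2]", "java-api-smoke-test")

        // The explicit return types make these directly usable from Java as well.
        val parallelism: java.lang.Integer = jsc.defaultParallelism
        val local: java.lang.Boolean = jsc.isLocal
        val jars: java.util.List[String] = jsc.jars

        println(s"master=${jsc.master} appName=${jsc.appName} " +
          s"parallelism=$parallelism isLocal=$local jars=$jars")

        val rdd = jsc.parallelize(java.util.Arrays.asList(1, 2, 3, 4), 2)
        println(s"count=${rdd.count()}")
        jsc.stop()
      }
    }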