From e31c8ffca65e0e5cd5f1a6229f3d654a24b7b18c Mon Sep 17 00:00:00 2001 From: CodingCat Date: Fri, 18 Apr 2014 10:01:16 -0700 Subject: [PATCH 1/9] SPARK-1483: Rename minSplits to minPartitions in public APIs https://issues.apache.org/jira/browse/SPARK-1483 From the original JIRA: " The parameter name is part of the public API in Scala and Python, since you can pass named parameters to a method, so we should name it to this more descriptive term. Everywhere else we refer to "splits" as partitions." - @mateiz Author: CodingCat Closes #430 from CodingCat/SPARK-1483 and squashes the following commits: 4b60541 [CodingCat] deprecate defaultMinSplits ba2c663 [CodingCat] Rename minSplits to minPartitions in public APIs --- .../scala/org/apache/spark/SparkContext.scala | 47 ++++++++++--------- .../spark/api/java/JavaSparkContext.scala | 37 +++++++++------ .../input/WholeTextFileInputFormat.scala | 7 +-- .../org/apache/spark/rdd/HadoopRDD.scala | 10 ++-- .../org/apache/spark/rdd/NewHadoopRDD.scala | 4 +- .../org/apache/spark/mllib/util/MLUtils.scala | 12 ++--- python/pyspark/context.py | 6 +-- .../apache/spark/sql/hive/TableReader.scala | 2 +- 8 files changed, 70 insertions(+), 55 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 3ddc0d5eeefb8..ee5637371fdca 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -427,9 +427,9 @@ class SparkContext(config: SparkConf) extends Logging { * Read a text file from HDFS, a local file system (available on all nodes), or any * Hadoop-supported file system URI, and return it as an RDD of Strings. */ - def textFile(path: String, minSplits: Int = defaultMinSplits): RDD[String] = { + def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = { hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], - minSplits).map(pair => pair._2.toString) + minPartitions).map(pair => pair._2.toString) } /** @@ -457,9 +457,10 @@ class SparkContext(config: SparkConf) extends Logging { * * @note Small files are preferred, large file is also allowable, but may cause bad performance. * - * @param minSplits A suggestion value of the minimal splitting number for input data. + * @param minPartitions A suggestion value of the minimal splitting number for input data. */ - def wholeTextFiles(path: String, minSplits: Int = defaultMinSplits): RDD[(String, String)] = { + def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): + RDD[(String, String)] = { val job = new NewHadoopJob(hadoopConfiguration) NewFileInputFormat.addInputPath(job, new Path(path)) val updateConf = job.getConfiguration @@ -469,7 +470,7 @@ class SparkContext(config: SparkConf) extends Logging { classOf[String], classOf[String], updateConf, - minSplits) + minPartitions) } /** @@ -481,7 +482,7 @@ class SparkContext(config: SparkConf) extends Logging { * @param inputFormatClass Class of the InputFormat * @param keyClass Class of the keys * @param valueClass Class of the values - * @param minSplits Minimum number of Hadoop Splits to generate. + * @param minPartitions Minimum number of Hadoop Splits to generate. * * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each * record, directly caching the returned RDD will create many references to the same object. 
@@ -493,11 +494,11 @@ class SparkContext(config: SparkConf) extends Logging { inputFormatClass: Class[_ <: InputFormat[K, V]], keyClass: Class[K], valueClass: Class[V], - minSplits: Int = defaultMinSplits + minPartitions: Int = defaultMinPartitions ): RDD[(K, V)] = { // Add necessary security credentials to the JobConf before broadcasting it. SparkHadoopUtil.get.addCredentials(conf) - new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits) + new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minPartitions) } /** Get an RDD for a Hadoop file with an arbitrary InputFormat @@ -512,7 +513,7 @@ class SparkContext(config: SparkConf) extends Logging { inputFormatClass: Class[_ <: InputFormat[K, V]], keyClass: Class[K], valueClass: Class[V], - minSplits: Int = defaultMinSplits + minPartitions: Int = defaultMinPartitions ): RDD[(K, V)] = { // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it. val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration)) @@ -524,7 +525,7 @@ class SparkContext(config: SparkConf) extends Logging { inputFormatClass, keyClass, valueClass, - minSplits) + minPartitions) } /** @@ -532,7 +533,7 @@ class SparkContext(config: SparkConf) extends Logging { * values and the InputFormat so that users don't need to pass them directly. Instead, callers * can just write, for example, * {{{ - * val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path, minSplits) + * val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path, minPartitions) * }}} * * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each @@ -541,13 +542,13 @@ class SparkContext(config: SparkConf) extends Logging { * a `map` function. */ def hadoopFile[K, V, F <: InputFormat[K, V]] - (path: String, minSplits: Int) + (path: String, minPartitions: Int) (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = { hadoopFile(path, fm.runtimeClass.asInstanceOf[Class[F]], km.runtimeClass.asInstanceOf[Class[K]], vm.runtimeClass.asInstanceOf[Class[V]], - minSplits) + minPartitions) } /** @@ -565,7 +566,7 @@ class SparkContext(config: SparkConf) extends Logging { */ def hadoopFile[K, V, F <: InputFormat[K, V]](path: String) (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = - hadoopFile[K, V, F](path, defaultMinSplits) + hadoopFile[K, V, F](path, defaultMinPartitions) /** Get an RDD for a Hadoop file with an arbitrary new API InputFormat. */ def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]] @@ -626,10 +627,10 @@ class SparkContext(config: SparkConf) extends Logging { def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V], - minSplits: Int + minPartitions: Int ): RDD[(K, V)] = { val inputFormatClass = classOf[SequenceFileInputFormat[K, V]] - hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits) + hadoopFile(path, inputFormatClass, keyClass, valueClass, minPartitions) } /** Get an RDD for a Hadoop SequenceFile with given key and value types. 
@@ -641,7 +642,7 @@ class SparkContext(config: SparkConf) extends Logging { * */ def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V] ): RDD[(K, V)] = - sequenceFile(path, keyClass, valueClass, defaultMinSplits) + sequenceFile(path, keyClass, valueClass, defaultMinPartitions) /** * Version of sequenceFile() for types implicitly convertible to Writables through a @@ -665,7 +666,7 @@ class SparkContext(config: SparkConf) extends Logging { * a `map` function. */ def sequenceFile[K, V] - (path: String, minSplits: Int = defaultMinSplits) + (path: String, minPartitions: Int = defaultMinPartitions) (implicit km: ClassTag[K], vm: ClassTag[V], kcf: () => WritableConverter[K], vcf: () => WritableConverter[V]) : RDD[(K, V)] = { @@ -674,7 +675,7 @@ class SparkContext(config: SparkConf) extends Logging { val format = classOf[SequenceFileInputFormat[Writable, Writable]] val writables = hadoopFile(path, format, kc.writableClass(km).asInstanceOf[Class[Writable]], - vc.writableClass(vm).asInstanceOf[Class[Writable]], minSplits) + vc.writableClass(vm).asInstanceOf[Class[Writable]], minPartitions) writables.map { case (k, v) => (kc.convert(k), vc.convert(v)) } } @@ -688,9 +689,9 @@ class SparkContext(config: SparkConf) extends Logging { */ def objectFile[T: ClassTag]( path: String, - minSplits: Int = defaultMinSplits + minPartitions: Int = defaultMinPartitions ): RDD[T] = { - sequenceFile(path, classOf[NullWritable], classOf[BytesWritable], minSplits) + sequenceFile(path, classOf[NullWritable], classOf[BytesWritable], minPartitions) .flatMap(x => Utils.deserialize[Array[T]](x._2.getBytes)) } @@ -1183,8 +1184,12 @@ class SparkContext(config: SparkConf) extends Logging { def defaultParallelism: Int = taskScheduler.defaultParallelism /** Default min number of partitions for Hadoop RDDs when not given by user */ + @deprecated("use defaultMinPartitions", "1.0.0") def defaultMinSplits: Int = math.min(defaultParallelism, 2) + /** Default min number of partitions for Hadoop RDDs when not given by user */ + def defaultMinPartitions: Int = math.min(defaultParallelism, 2) + private val nextShuffleId = new AtomicInteger(0) private[spark] def newShuffleId(): Int = nextShuffleId.getAndIncrement() diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala index e6a3f06b0ea42..cf30523ab523e 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala @@ -109,9 +109,17 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork /** Default level of parallelism to use when not given by user (e.g. parallelize and makeRDD). */ def defaultParallelism: java.lang.Integer = sc.defaultParallelism - /** Default min number of partitions for Hadoop RDDs when not given by user */ + /** + * Default min number of partitions for Hadoop RDDs when not given by user. + * @deprecated As of Spark 1.0.0, defaultMinSplits is deprecated, use + * {@link #defaultMinPartitions()} instead + */ + @Deprecated def defaultMinSplits: java.lang.Integer = sc.defaultMinSplits + /** Default min number of partitions for Hadoop RDDs when not given by user */ + def defaultMinPartitions: java.lang.Integer = sc.defaultMinPartitions + /** Distribute a local Scala collection to form an RDD. 
*/ def parallelize[T](list: java.util.List[T], numSlices: Int): JavaRDD[T] = { implicit val ctag: ClassTag[T] = fakeClassTag @@ -153,7 +161,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork * Read a text file from HDFS, a local file system (available on all nodes), or any * Hadoop-supported file system URI, and return it as an RDD of Strings. */ - def textFile(path: String, minSplits: Int): JavaRDD[String] = sc.textFile(path, minSplits) + def textFile(path: String, minPartitions: Int): JavaRDD[String] = + sc.textFile(path, minPartitions) /** * Read a directory of text files from HDFS, a local file system (available on all nodes), or any @@ -180,17 +189,17 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork * * @note Small files are preferred, large file is also allowable, but may cause bad performance. * - * @param minSplits A suggestion value of the minimal splitting number for input data. + * @param minPartitions A suggestion value of the minimal splitting number for input data. */ - def wholeTextFiles(path: String, minSplits: Int): JavaPairRDD[String, String] = - new JavaPairRDD(sc.wholeTextFiles(path, minSplits)) + def wholeTextFiles(path: String, minPartitions: Int): JavaPairRDD[String, String] = + new JavaPairRDD(sc.wholeTextFiles(path, minPartitions)) /** * Read a directory of text files from HDFS, a local file system (available on all nodes), or any * Hadoop-supported file system URI. Each file is read as a single record and returned in a * key-value pair, where the key is the path of each file, the value is the content of each file. * - * @see `wholeTextFiles(path: String, minSplits: Int)`. + * @see `wholeTextFiles(path: String, minPartitions: Int)`. */ def wholeTextFiles(path: String): JavaPairRDD[String, String] = new JavaPairRDD(sc.wholeTextFiles(path)) @@ -205,11 +214,11 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V], - minSplits: Int + minPartitions: Int ): JavaPairRDD[K, V] = { implicit val ctagK: ClassTag[K] = ClassTag(keyClass) implicit val ctagV: ClassTag[V] = ClassTag(valueClass) - new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass, minSplits)) + new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass, minPartitions)) } /** Get an RDD for a Hadoop SequenceFile. @@ -233,9 +242,9 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork * slow if you use the default serializer (Java serialization), though the nice thing about it is * that there's very little effort required to save arbitrary objects. 
*/ - def objectFile[T](path: String, minSplits: Int): JavaRDD[T] = { + def objectFile[T](path: String, minPartitions: Int): JavaRDD[T] = { implicit val ctag: ClassTag[T] = fakeClassTag - sc.objectFile(path, minSplits)(ctag) + sc.objectFile(path, minPartitions)(ctag) } /** @@ -265,11 +274,11 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork inputFormatClass: Class[F], keyClass: Class[K], valueClass: Class[V], - minSplits: Int + minPartitions: Int ): JavaPairRDD[K, V] = { implicit val ctagK: ClassTag[K] = ClassTag(keyClass) implicit val ctagV: ClassTag[V] = ClassTag(valueClass) - new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass, minSplits)) + new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass, minPartitions)) } /** @@ -304,11 +313,11 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork inputFormatClass: Class[F], keyClass: Class[K], valueClass: Class[V], - minSplits: Int + minPartitions: Int ): JavaPairRDD[K, V] = { implicit val ctagK: ClassTag[K] = ClassTag(keyClass) implicit val ctagV: ClassTag[V] = ClassTag(valueClass) - new JavaPairRDD(sc.hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits)) + new JavaPairRDD(sc.hadoopFile(path, inputFormatClass, keyClass, valueClass, minPartitions)) } /** Get an RDD for a Hadoop file with an arbitrary InputFormat diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala index 80d055a89573b..4cb450577796a 100644 --- a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala +++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala @@ -48,14 +48,15 @@ private[spark] class WholeTextFileInputFormat extends CombineFileInputFormat[Str } /** - * Allow minSplits set by end-user in order to keep compatibility with old Hadoop API. + * Allow minPartitions set by end-user in order to keep compatibility with old Hadoop API. */ - def setMaxSplitSize(context: JobContext, minSplits: Int) { + def setMaxSplitSize(context: JobContext, minPartitions: Int) { val files = listStatus(context) val totalLen = files.map { file => if (file.isDir) 0L else file.getLen }.sum - val maxSplitSize = Math.ceil(totalLen * 1.0 / (if (minSplits == 0) 1 else minSplits)).toLong + val maxSplitSize = Math.ceil(totalLen * 1.0 / + (if (minPartitions == 0) 1 else minPartitions)).toLong super.setMaxSplitSize(maxSplitSize) } } diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 6811e1abb8b70..6547755764dcf 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -87,7 +87,7 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp * @param inputFormatClass Storage format of the data to be read. * @param keyClass Class of the key associated with the inputFormatClass. * @param valueClass Class of the value associated with the inputFormatClass. - * @param minSplits Minimum number of Hadoop Splits (HadoopRDD partitions) to generate. + * @param minPartitions Minimum number of HadoopRDD partitions (Hadoop Splits) to generate. 
*/ @DeveloperApi class HadoopRDD[K, V]( @@ -97,7 +97,7 @@ class HadoopRDD[K, V]( inputFormatClass: Class[_ <: InputFormat[K, V]], keyClass: Class[K], valueClass: Class[V], - minSplits: Int) + minPartitions: Int) extends RDD[(K, V)](sc, Nil) with Logging { def this( @@ -106,7 +106,7 @@ class HadoopRDD[K, V]( inputFormatClass: Class[_ <: InputFormat[K, V]], keyClass: Class[K], valueClass: Class[V], - minSplits: Int) = { + minPartitions: Int) = { this( sc, sc.broadcast(new SerializableWritable(conf)) @@ -115,7 +115,7 @@ class HadoopRDD[K, V]( inputFormatClass, keyClass, valueClass, - minSplits) + minPartitions) } protected val jobConfCacheKey = "rdd_%d_job_conf".format(id) @@ -169,7 +169,7 @@ class HadoopRDD[K, V]( if (inputFormat.isInstanceOf[Configurable]) { inputFormat.asInstanceOf[Configurable].setConf(jobConf) } - val inputSplits = inputFormat.getSplits(jobConf, minSplits) + val inputSplits = inputFormat.getSplits(jobConf, minPartitions) val array = new Array[Partition](inputSplits.size) for (i <- 0 until inputSplits.size) { array(i) = new HadoopPartition(id, i, inputSplits(i)) diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala index 8684b645bc361..ac1ccc06f238a 100644 --- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala @@ -158,7 +158,7 @@ private[spark] class WholeTextFileRDD( keyClass: Class[String], valueClass: Class[String], @transient conf: Configuration, - minSplits: Int) + minPartitions: Int) extends NewHadoopRDD[String, String](sc, inputFormatClass, keyClass, valueClass, conf) { override def getPartitions: Array[Partition] = { @@ -169,7 +169,7 @@ private[spark] class WholeTextFileRDD( case _ => } val jobContext = newJobContext(conf, jobId) - inputFormat.setMaxSplitSize(jobContext, minSplits) + inputFormat.setMaxSplitSize(jobContext, minPartitions) val rawSplits = inputFormat.getSplits(jobContext).toArray val result = new Array[Partition](rawSplits.size) for (i <- 0 until rawSplits.size) { diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala index 2f3ac10397515..3d6e7e0d5c953 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala @@ -57,7 +57,7 @@ object MLUtils { * @param labelParser parser for labels, default: 1.0 if label > 0.5 or 0.0 otherwise * @param numFeatures number of features, which will be determined from the input data if a * negative value is given. The default value is -1. - * @param minSplits min number of partitions, default: sc.defaultMinSplits + * @param minPartitions min number of partitions, default: sc.defaultMinPartitions * @return labeled data stored as an RDD[LabeledPoint] */ def loadLibSVMData( @@ -65,8 +65,8 @@ object MLUtils { path: String, labelParser: LabelParser, numFeatures: Int, - minSplits: Int): RDD[LabeledPoint] = { - val parsed = sc.textFile(path, minSplits) + minPartitions: Int): RDD[LabeledPoint] = { + val parsed = sc.textFile(path, minPartitions) .map(_.trim) .filter(!_.isEmpty) .map(_.split(' ')) @@ -101,7 +101,7 @@ object MLUtils { * with number of features determined automatically and the default number of partitions. 
*/ def loadLibSVMData(sc: SparkContext, path: String): RDD[LabeledPoint] = - loadLibSVMData(sc, path, BinaryLabelParser, -1, sc.defaultMinSplits) + loadLibSVMData(sc, path, BinaryLabelParser, -1, sc.defaultMinPartitions) /** * Loads labeled data in the LIBSVM format into an RDD[LabeledPoint], @@ -112,7 +112,7 @@ object MLUtils { sc: SparkContext, path: String, labelParser: LabelParser): RDD[LabeledPoint] = - loadLibSVMData(sc, path, labelParser, -1, sc.defaultMinSplits) + loadLibSVMData(sc, path, labelParser, -1, sc.defaultMinPartitions) /** * Loads labeled data in the LIBSVM format into an RDD[LabeledPoint], @@ -124,7 +124,7 @@ object MLUtils { path: String, labelParser: LabelParser, numFeatures: Int): RDD[LabeledPoint] = - loadLibSVMData(sc, path, labelParser, numFeatures, sc.defaultMinSplits) + loadLibSVMData(sc, path, labelParser, numFeatures, sc.defaultMinPartitions) /** * :: Experimental :: diff --git a/python/pyspark/context.py b/python/pyspark/context.py index d8667e84fedff..f63cc4a55fb98 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -248,14 +248,14 @@ def parallelize(self, c, numSlices=None): jrdd = readRDDFromFile(self._jsc, tempFile.name, numSlices) return RDD(jrdd, self, serializer) - def textFile(self, name, minSplits=None): + def textFile(self, name, minPartitions=None): """ Read a text file from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI, and return it as an RDD of Strings. """ - minSplits = minSplits or min(self.defaultParallelism, 2) - return RDD(self._jsc.textFile(name, minSplits), self, + minPartitions = minPartitions or min(self.defaultParallelism, 2) + return RDD(self._jsc.textFile(name, minPartitions), self, UTF8Deserializer()) def wholeTextFiles(self, path): diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index 0da5eb754cb3f..8cfde46186ca4 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -52,7 +52,7 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient sc: HiveCon // Choose the minimum number of splits. If mapred.map.tasks is set, then use that unless // it is smaller than what Spark suggests. private val _minSplitsPerRDD = math.max( - sc.hiveconf.getInt("mapred.map.tasks", 1), sc.sparkContext.defaultMinSplits) + sc.hiveconf.getInt("mapred.map.tasks", 1), sc.sparkContext.defaultMinPartitions) // TODO: set aws s3 credentials. 
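The rename also changes how call sites read: because Scala and Python support named arguments, the parameter name is effectively part of the public API, and minPartitions now describes a partition hint rather than a Hadoop "split" count. A minimal call-site sketch, assuming a Spark 1.0-era local context (the input path and the counts below are illustrative, not part of the patch):

    import org.apache.spark.{SparkConf, SparkContext}

    object MinPartitionsExample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("min-partitions-example").setMaster("local[2]")
        val sc = new SparkContext(conf)

        // Passing the argument by name is exactly what the rename is about:
        // the hint now reads as a number of partitions, not Hadoop splits.
        val lines = sc.textFile("data/input.txt", minPartitions = 4)

        // defaultMinSplits still compiles but is @deprecated as of 1.0.0;
        // defaultMinPartitions is the drop-in replacement.
        println(s"default hint = ${sc.defaultMinPartitions}, partitions = ${lines.partitions.length}")

        sc.stop()
      }
    }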
From 89f47434e2a6c2f8b80c44d08f866d3a8b8e85c3 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Fri, 18 Apr 2014 10:02:27 -0700 Subject: [PATCH 2/9] Reuses Row object in ExistingRdd.productToRowRdd() Author: Cheng Lian Closes #432 from liancheng/reuseRow and squashes the following commits: 9e6d083 [Cheng Lian] Simplified code with BufferedIterator 52acec9 [Cheng Lian] Reuses Row object in ExistingRdd.productToRowRdd() --- .../spark/sql/execution/basicOperators.scala | 21 ++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index ab2e62463764a..eedcc7dda02d7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -27,7 +27,6 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.physical.{OrderedDistribution, UnspecifiedDistribution} import org.apache.spark.util.MutablePair - case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends UnaryNode { override def output = projectList.map(_.toAttribute) @@ -143,8 +142,24 @@ object ExistingRdd { } def productToRowRdd[A <: Product](data: RDD[A]): RDD[Row] = { - // TODO: Reuse the row, don't use map on the product iterator. Maybe code gen? - data.map(r => new GenericRow(r.productIterator.map(convertToCatalyst).toArray): Row) + data.mapPartitions { iterator => + if (iterator.isEmpty) { + Iterator.empty + } else { + val bufferedIterator = iterator.buffered + val mutableRow = new GenericMutableRow(bufferedIterator.head.productArity) + + bufferedIterator.map { r => + var i = 0 + while (i < mutableRow.length) { + mutableRow(i) = r.productElement(i) + i += 1 + } + + mutableRow + } + } + } } def fromProductRdd[A <: Product : TypeTag](productRdd: RDD[A]) = { From aa17f022c59af02b04b977da9017671ef14d664a Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Fri, 18 Apr 2014 10:03:15 -0700 Subject: [PATCH 3/9] [SPARK-1520] remove fastutil from dependencies A quick fix for https://issues.apache.org/jira/browse/SPARK-1520 By excluding fastutil, we bring the number of files in the assembly jar back under 65536, so Java 7 won't create the assembly jar in zip64 format, which cannot be read by Java 6. With this change, the assembly jar now has about 60000 entries (58000 files), tested with both sbt and maven. Author: Xiangrui Meng Closes #437 from mengxr/remove-fastutil and squashes the following commits: 00f9beb [Xiangrui Meng] remove fastutil from dependencies --- pom.xml | 7 +++++++ project/SparkBuild.scala | 4 ++-- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index cd204376de5db..4ff18afa227be 100644 --- a/pom.xml +++ b/pom.xml @@ -263,6 +263,13 @@ com.clearspring.analytics stream 2.5.1 + + + + it.unimi.dsi + fastutil + +
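PATCH 3/9 shows only the Maven side of the fastutil exclusion; the SparkBuild.scala half of the change is truncated above. Purely as a generic illustration of how a transitive dependency is excluded in sbt (an assumption for illustration, not the actual SparkBuild.scala diff), an equivalent build.sbt line would look roughly like:

    // Illustrative only: excludes the transitive fastutil dependency pulled in by
    // clearspring's stream-lib, mirroring the Maven <exclusion> element shown above.
    libraryDependencies += "com.clearspring.analytics" % "stream" % "2.5.1" exclude("it.unimi.dsi", "fastutil")

Dropping fastutil keeps the assembly jar under the 65536-entry limit, so Java 7 writes a plain zip that Java 6 can still read.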
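Stepping back to PATCH 2/9: the productToRowRdd() rewrite avoids allocating a fresh row per record by having mapPartitions size one mutable row from the first element (peeked via BufferedIterator.head) and then overwrite it for every record. A framework-free sketch of the same reuse pattern, using a plain Array[Any] instead of Spark SQL's row classes (the names here are illustrative assumptions, not Spark API):

    // Reuse one mutable buffer across an iterator of Product records.
    // BufferedIterator.head peeks at the first element to size the buffer
    // without consuming it; each next() then overwrites the same array.
    def productsToReusedRows[A <: Product](records: Iterator[A]): Iterator[Array[Any]] = {
      if (records.isEmpty) {
        Iterator.empty
      } else {
        val buffered = records.buffered
        val row = new Array[Any](buffered.head.productArity)
        buffered.map { record =>
          var i = 0
          while (i < row.length) {
            row(i) = record.productElement(i)
            i += 1
          }
          row // callers must copy if they hold the value past the next() call
        }
      }
    }

The same caveat as Hadoop's reused Writable objects applies: caching or collecting the returned values without copying them would leave many references to a single object.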