[SPARK-2024] refactoring to get method params below 10
kanzhang committed Jul 29, 2014
1 parent 57a7a5e commit d998ad6
Showing 2 changed files with 46 additions and 25 deletions.
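
The refactoring splits the single JVM entry point `PythonRDD.saveAsHadoopFile`, which previously served both Hadoop output APIs behind a `useNewAPI` flag, into two dedicated methods with shorter parameter lists. A minimal sketch of the calling shape as seen from the PySpark side via py4j; `jvm`, `jrdd`, and the abbreviated argument names are placeholders for the real handles threaded through rdd.py (see the diffs below):

    # Sketch only -- names are placeholders, not the literal PySpark variables.

    # Before: one entry point with 11 parameters, dispatching on a trailing flag.
    jvm.PythonRDD.saveAsHadoopFile(jrdd, batched, path, fmt, keyClass, valueClass,
                                   keyConv, valueConv, jconf, codec, useNewAPI)

    # After: one method per Hadoop API.
    jvm.PythonRDD.saveAsHadoopFile(jrdd, batched, path, fmt, keyClass, valueClass,
                                   keyConv, valueConv, jconf, codec)   # mapred (old)
    jvm.PythonRDD.saveAsNewAPIHadoopFile(jrdd, batched, path, fmt, keyClass,
                                         valueClass, keyConv, valueConv, jconf)  # mapreduce (new)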
60 changes: 41 additions & 19 deletions core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -612,20 +612,19 @@ private[spark] object PythonRDD extends Logging {
       compressionCodecClass: String) = {
     saveAsHadoopFile(
       pyRDD, batchSerialized, path, "org.apache.hadoop.mapred.SequenceFileOutputFormat",
-      null, null, null, null, new java.util.HashMap(), compressionCodecClass, false)
+      null, null, null, null, new java.util.HashMap(), compressionCodecClass)
   }
 
   /**
-   * Output a Python RDD of key-value pairs to any Hadoop file system, using either old
-   * (mapred package) or new (mapreduce package) Hadoop `OutputFormat`. Keys and values are
-   * converted to suitable output types using either user specified converters or, if not
-   * specified, [[org.apache.spark.api.python.JavaToWritableConverter]]. Post-conversion types
+   * Output a Python RDD of key-value pairs to any Hadoop file system, using old Hadoop
+   * `OutputFormat` in mapred package. Keys and values are converted to suitable output
+   * types using either user specified converters or, if not specified,
+   * [[org.apache.spark.api.python.JavaToWritableConverter]]. Post-conversion types
    * `keyClass` and `valueClass` are automatically inferred if not specified. The passed-in
    * `confAsMap` is merged with the default Hadoop conf associated with the SparkContext of
-   * this RDD. Compression codec `codecClass` is only effective with the old format.
+   * this RDD.
    */
-  def saveAsHadoopFile[K, V, F <: OutputFormat[_, _], G <: NewOutputFormat[_, _],
-      C <: CompressionCodec](
+  def saveAsHadoopFile[K, V, F <: OutputFormat[_, _], C <: CompressionCodec](
       pyRDD: JavaRDD[Array[Byte]],
       batchSerialized: Boolean,
       path: String,
@@ -635,22 +634,45 @@ private[spark] object PythonRDD extends Logging {
       keyConverterClass: String,
       valueConverterClass: String,
       confAsMap: java.util.HashMap[String, String],
-      compressionCodecClass: String,
-      useNewAPI: Boolean) = {
+      compressionCodecClass: String) = {
     val rdd = SerDeUtil.pythonToPairRDD(pyRDD, batchSerialized)
     val (kc, vc) = getKeyValueTypes(keyClass, valueClass).getOrElse(
       inferKeyValueTypes(rdd, keyConverterClass, valueConverterClass))
     val mergedConf = getMergedConf(confAsMap, pyRDD.context.hadoopConfiguration)
     val codec = Option(compressionCodecClass).map(Class.forName(_).asInstanceOf[Class[C]])
     val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
       new JavaToWritableConverter)
-    if (useNewAPI) {
-      val fc = Class.forName(outputFormatClass).asInstanceOf[Class[G]]
-      converted.saveAsNewAPIHadoopFile(path, kc, vc, fc, mergedConf)
-    } else {
-      val fc = Class.forName(outputFormatClass).asInstanceOf[Class[F]]
-      converted.saveAsHadoopFile(path, kc, vc, fc, new JobConf(mergedConf), codec=codec)
-    }
+    val fc = Class.forName(outputFormatClass).asInstanceOf[Class[F]]
+    converted.saveAsHadoopFile(path, kc, vc, fc, new JobConf(mergedConf), codec=codec)
   }
 
+  /**
+   * Output a Python RDD of key-value pairs to any Hadoop file system, using new Hadoop
+   * `OutputFormat` in mapreduce package. Keys and values are converted to suitable output
+   * types using either user specified converters or, if not specified,
+   * [[org.apache.spark.api.python.JavaToWritableConverter]]. Post-conversion types
+   * `keyClass` and `valueClass` are automatically inferred if not specified. The passed-in
+   * `confAsMap` is merged with the default Hadoop conf associated with the SparkContext of
+   * this RDD.
+   */
+  def saveAsNewAPIHadoopFile[K, V, F <: NewOutputFormat[_, _]](
+      pyRDD: JavaRDD[Array[Byte]],
+      batchSerialized: Boolean,
+      path: String,
+      outputFormatClass: String,
+      keyClass: String,
+      valueClass: String,
+      keyConverterClass: String,
+      valueConverterClass: String,
+      confAsMap: java.util.HashMap[String, String]) = {
+    val rdd = SerDeUtil.pythonToPairRDD(pyRDD, batchSerialized)
+    val (kc, vc) = getKeyValueTypes(keyClass, valueClass).getOrElse(
+      inferKeyValueTypes(rdd, keyConverterClass, valueConverterClass))
+    val mergedConf = getMergedConf(confAsMap, pyRDD.context.hadoopConfiguration)
+    val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
+      new JavaToWritableConverter)
+    val fc = Class.forName(outputFormatClass).asInstanceOf[Class[F]]
+    converted.saveAsNewAPIHadoopFile(path, kc, vc, fc, mergedConf)
+  }
+
   /**
@@ -665,9 +687,9 @@ private[spark] object PythonRDD extends Logging {
       pyRDD: JavaRDD[Array[Byte]],
       batchSerialized: Boolean,
       confAsMap: java.util.HashMap[String, String],
-      useNewAPI: Boolean,
       keyConverterClass: String,
-      valueConverterClass: String) = {
+      valueConverterClass: String,
+      useNewAPI: Boolean) = {
     val conf = PythonHadoopUtil.mapToConf(confAsMap)
     val converted = convertRDD(SerDeUtil.pythonToPairRDD(pyRDD, batchSerialized),
       keyConverterClass, valueConverterClass, new JavaToWritableConverter)
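
Note that the compression codec parameter survives only on the old-API method, which is why the Scaladoc caveat "Compression codec `codecClass` is only effective with the old format" could be dropped. A hedged PySpark-side sketch of exercising that path (assumes a live SparkContext `sc`; the output path is illustrative, the codec class is a standard Hadoop one):

    pairs = sc.parallelize([(1, "a"), (2, "b")])
    pairs.saveAsHadoopFile(
        "/tmp/out-gzip",                                      # hypothetical path
        "org.apache.hadoop.mapred.SequenceFileOutputFormat",  # old (mapred) format
        compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec")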
11 changes: 5 additions & 6 deletions python/pyspark/rdd.py
@@ -1052,7 +1052,7 @@ def saveAsNewAPIHadoopDataset(self, conf, keyConverter=None, valueConverter=None
         pickled = self._toPickleSerialization()
         batched = isinstance(pickled._jrdd_deserializer, BatchedSerializer)
         self.ctx._jvm.PythonRDD.saveAsHadoopDataset(pickled._jrdd, batched, jconf,
-                                                    True, keyConverter, valueConverter)
+                                                    keyConverter, valueConverter, True)
 
     def saveAsNewAPIHadoopFile(self, path, outputFormatClass, keyClass=None, valueClass=None,
                                keyConverter=None, valueConverter=None, conf=None):
@@ -1078,9 +1078,8 @@ def saveAsNewAPIHadoopFile(self, path, outputFormatClass, keyClass=None, valueCl
         jconf = self.ctx._dictToJavaMap(conf)
         pickled = self._toPickleSerialization()
         batched = isinstance(pickled._jrdd_deserializer, BatchedSerializer)
-        self.ctx._jvm.PythonRDD.saveAsHadoopFile(pickled._jrdd, batched, path,
-            outputFormatClass, keyClass, valueClass, keyConverter, valueConverter,
-            jconf, None, True)
+        self.ctx._jvm.PythonRDD.saveAsNewAPIHadoopFile(pickled._jrdd, batched, path,
+            outputFormatClass, keyClass, valueClass, keyConverter, valueConverter, jconf)
 
     def saveAsHadoopDataset(self, conf, keyConverter=None, valueConverter=None):
         """
@@ -1097,7 +1096,7 @@ def saveAsHadoopDataset(self, conf, keyConverter=None, valueConverter=None):
         pickled = self._toPickleSerialization()
         batched = isinstance(pickled._jrdd_deserializer, BatchedSerializer)
         self.ctx._jvm.PythonRDD.saveAsHadoopDataset(pickled._jrdd, batched, jconf,
-                                                    False, keyConverter, valueConverter)
+                                                    keyConverter, valueConverter, False)
 
     def saveAsHadoopFile(self, path, outputFormatClass, keyClass=None, valueClass=None,
                          keyConverter=None, valueConverter=None, conf=None, compressionCodecClass=None):
@@ -1126,7 +1125,7 @@ def saveAsHadoopFile(self, path, outputFormatClass, keyClass=None, valueClass=No
         batched = isinstance(pickled._jrdd_deserializer, BatchedSerializer)
         self.ctx._jvm.PythonRDD.saveAsHadoopFile(pickled._jrdd, batched,
             path, outputFormatClass, keyClass, valueClass, keyConverter, valueConverter,
-            jconf, compressionCodecClass, False)
+            jconf, compressionCodecClass)
 
     def saveAsSequenceFile(self, path, compressionCodecClass=None):
         """
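
Tying the two files together: the public PySpark API is unchanged, but `saveAsNewAPIHadoopFile` now reaches its own JVM method rather than passing `useNewAPI=True` through a shared one. A short usage sketch under the same assumptions as above (live SparkContext `sc`, illustrative paths, standard Hadoop format classes):

    pairs = sc.parallelize([(1, "a"), (2, "b")])

    # Old (mapred) API -- backed by PythonRDD.saveAsHadoopFile on the JVM side.
    pairs.saveAsHadoopFile(
        "/tmp/out-old",
        "org.apache.hadoop.mapred.SequenceFileOutputFormat")

    # New (mapreduce) API -- backed by PythonRDD.saveAsNewAPIHadoopFile.
    pairs.saveAsNewAPIHadoopFile(
        "/tmp/out-new",
        "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat")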
