diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
index 452794d8e6379..4fe5800df917e 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
@@ -23,15 +23,13 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io._
 
-/**
- * Utilities for working with Python objects -> Hadoop-related objects
- */
+/** Utilities for working with Python objects -> Hadoop-related objects */
 private[python] object PythonHadoopUtil {
 
   /**
-   * Convert a Map of properties to a [[org.apache.hadoop.conf.Configuration]]
+   * Convert a [[java.util.Map]] of properties to a [[org.apache.hadoop.conf.Configuration]]
    */
-  def mapToConf(map: java.util.Map[String, String]) = {
+  def mapToConf(map: java.util.Map[String, String]): Configuration = {
     import collection.JavaConversions._
     val conf = new Configuration()
     map.foreach{ case (k, v) => conf.set(k, v) }
@@ -42,7 +40,7 @@ private[python] object PythonHadoopUtil {
    * Merges two configurations, returns a copy of left with keys from right overwriting
    * any matching keys in left
    */
-  def mergeConfs(left: Configuration, right: Configuration) = {
+  def mergeConfs(left: Configuration, right: Configuration): Configuration = {
     import collection.JavaConversions._
     val copy = new Configuration(left)
     right.iterator().foreach(entry => copy.set(entry.getKey, entry.getValue))
diff --git a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
index ed265008a8c2b..4585435c21313 100644
--- a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
@@ -24,11 +24,8 @@ import scala.util.Success
 import scala.util.Failure
 import net.razorvine.pickle.Pickler
 
-/**
- * Utilities for serialization / deserialization between Python and Java, using MsgPack.
- * Also contains utilities for converting [[org.apache.hadoop.io.Writable]] ->
- * Scala objects and primitives
- */
+
+/** Utilities for serialization / deserialization between Python and Java, using Pickle. */
 private[python] object SerDeUtil extends Logging {
 
   /**
@@ -37,12 +34,12 @@ private[python] object SerDeUtil extends Logging {
    * representation is serialized
    */
   def rddToPython[K, V](rdd: RDD[(K, V)]): RDD[Array[Byte]] = {
-    rdd.mapPartitions{ iter =>
+    rdd.mapPartitions { iter =>
       val pickle = new Pickler
       var keyFailed = false
       var valueFailed = false
       var firstRecord = true
-      iter.map{ case (k, v) =>
+      iter.map { case (k, v) =>
         if (firstRecord) {
           Try {
             pickle.dumps(Array(k, v))
@@ -57,29 +54,32 @@ private[python] object SerDeUtil extends Logging {
           }
           (kt, vt) match {
             case (Failure(kf), Failure(vf)) =>
-              log.warn(s"""Failed to pickle Java object as key: ${k.getClass.getSimpleName};
+              logWarning(s"""Failed to pickle Java object as key: ${k.getClass.getSimpleName};
                 Error: ${kf.getMessage}""")
-              log.warn(s"""Failed to pickle Java object as value: ${v.getClass.getSimpleName};
+              logWarning(s"""Failed to pickle Java object as value: ${v.getClass.getSimpleName};
                 Error: ${vf.getMessage}""")
               keyFailed = true
               valueFailed = true
             case (Failure(kf), _) =>
-              log.warn(s"""Failed to pickle Java object as key: ${k.getClass.getSimpleName};
+              logWarning(s"""Failed to pickle Java object as key: ${k.getClass.getSimpleName};
                 Error: ${kf.getMessage}""")
               keyFailed = true
             case (_, Failure(vf)) =>
-              log.warn(s"""Failed to pickle Java object as value: ${v.getClass.getSimpleName};
+              logWarning(s"""Failed to pickle Java object as value: ${v.getClass.getSimpleName};
                 Error: ${vf.getMessage}""")
               valueFailed = true
           }
         }
         firstRecord = false
-        (keyFailed, valueFailed) match {
-          case (true, true) => pickle.dumps(Array(k.toString, v.toString))
-          case (true, false) => pickle.dumps(Array(k.toString, v))
-          case (false, true) => pickle.dumps(Array(k, v.toString))
-          case (false, false) => pickle.dumps(Array(k, v))
+        if (keyFailed && valueFailed) {
+          pickle.dumps(Array(k.toString, v.toString))
+        } else if (keyFailed) {
+          pickle.dumps(Array(k.toString, v))
+        } else if (!keyFailed && valueFailed) {
+          pickle.dumps(Array(k, v.toString))
+        } else {
+          pickle.dumps(Array(k, v))
         }
       }
     }
diff --git a/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala b/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
index b4890f4e259ae..707c01d32ed0f 100644
--- a/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
@@ -58,9 +58,15 @@ case class TestWritable(var str: String, var int: Int, var double: Double) exten
  * This object contains method to generate SequenceFile test data and write it to a
  * given directory (probably a temp directory)
  */
-object WriteInputFormatTestDataGenerator extends App {
+object WriteInputFormatTestDataGenerator {
   import SparkContext._
 
+  def main(args: Array[String]) {
+    val path = args(0)
+    val sc = new JavaSparkContext("local[4]", "test-writables")
+    generateData(path, sc)
+  }
+
   def generateData(path: String, jsc: JavaSparkContext) {
     val sc = jsc.sc
 
@@ -99,8 +105,7 @@ object WriteInputFormatTestDataGenerator extends App {
     sc.parallelize(data, numSlices = 2)
       .map{ case (k, v) =>
         (new IntWritable(k), new ArrayWritable(classOf[DoubleWritable], v.map(new DoubleWritable(_))))
-      }
-      .saveAsNewAPIHadoopFile[SequenceFileOutputFormat[IntWritable, ArrayWritable]](arrPath)
+      }.saveAsNewAPIHadoopFile[SequenceFileOutputFormat[IntWritable, ArrayWritable]](arrPath)
 
     // Create test data for MapWritable, with keys DoubleWritable and values Text
     val mapData = Seq(