Address @mateiz style comments
MLnick committed Jun 3, 2014
1 parent 01e0813 commit 1a4a1d6
Showing 3 changed files with 28 additions and 25 deletions.
core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
@@ -23,15 +23,13 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io._


-/**
- * Utilities for working with Python objects -> Hadoop-related objects
- */
+/** Utilities for working with Python objects -> Hadoop-related objects */
private[python] object PythonHadoopUtil {

/**
-   * Convert a Map of properties to a [[org.apache.hadoop.conf.Configuration]]
+   * Convert a [[java.util.Map]] of properties to a [[org.apache.hadoop.conf.Configuration]]
*/
-  def mapToConf(map: java.util.Map[String, String]) = {
+  def mapToConf(map: java.util.Map[String, String]): Configuration = {
import collection.JavaConversions._
val conf = new Configuration()
map.foreach{ case (k, v) => conf.set(k, v) }
@@ -42,7 +40,7 @@ private[python] object PythonHadoopUtil {
* Merges two configurations, returns a copy of left with keys from right overwriting
* any matching keys in left
*/
-  def mergeConfs(left: Configuration, right: Configuration) = {
+  def mergeConfs(left: Configuration, right: Configuration): Configuration = {
import collection.JavaConversions._
val copy = new Configuration(left)
right.iterator().foreach(entry => copy.set(entry.getKey, entry.getValue))
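For readers skimming the diff, here is a minimal self-contained sketch of how the two helpers behave together; since PythonHadoopUtil is private[python], the same logic is re-implemented inline here, and the property key and values are invented for illustration:

import java.util.{HashMap => JHashMap}
import org.apache.hadoop.conf.Configuration
import scala.collection.JavaConversions._

object ConfHelpersSketch {
  // Same logic as mapToConf above: copy every map entry into a fresh Configuration
  def mapToConf(map: java.util.Map[String, String]): Configuration = {
    val conf = new Configuration()
    map.foreach { case (k, v) => conf.set(k, v) }
    conf
  }

  // Same logic as mergeConfs above: keys from the right overwrite matching keys in left
  def mergeConfs(left: Configuration, right: Configuration): Configuration = {
    val copy = new Configuration(left)
    right.iterator().foreach(entry => copy.set(entry.getKey, entry.getValue))
    copy
  }

  def main(args: Array[String]): Unit = {
    val props = new JHashMap[String, String]()
    props.put("example.key", "from-python") // hypothetical property handed over from PySpark
    val base = new Configuration()
    base.set("example.key", "default")
    val merged = mergeConfs(base, mapToConf(props))
    println(merged.get("example.key")) // prints "from-python": the right-hand side wins
  }
}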
32 changes: 16 additions & 16 deletions core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
@@ -24,11 +24,8 @@ import scala.util.Success
import scala.util.Failure
import net.razorvine.pickle.Pickler

-/**
- * Utilities for serialization / deserialization between Python and Java, using MsgPack.
- * Also contains utilities for converting [[org.apache.hadoop.io.Writable]] ->
- * Scala objects and primitives
- */
+/** Utilities for serialization / deserialization between Python and Java, using Pickle. */
private[python] object SerDeUtil extends Logging {

/**
@@ -37,12 +34,12 @@ private[python] object SerDeUtil extends Logging {
* representation is serialized
*/
def rddToPython[K, V](rdd: RDD[(K, V)]): RDD[Array[Byte]] = {
-    rdd.mapPartitions{ iter =>
+    rdd.mapPartitions { iter =>
val pickle = new Pickler
var keyFailed = false
var valueFailed = false
var firstRecord = true
-      iter.map{ case (k, v) =>
+      iter.map { case (k, v) =>
if (firstRecord) {
Try {
pickle.dumps(Array(k, v))
@@ -57,29 +54,32 @@
}
(kt, vt) match {
case (Failure(kf), Failure(vf)) =>
log.warn(s"""Failed to pickle Java object as key: ${k.getClass.getSimpleName};
logWarning(s"""Failed to pickle Java object as key: ${k.getClass.getSimpleName};
Error: ${kf.getMessage}""")
log.warn(s"""Failed to pickle Java object as value: ${v.getClass.getSimpleName};
logWarning(s"""Failed to pickle Java object as value: ${v.getClass.getSimpleName};
Error: ${vf.getMessage}""")
keyFailed = true
valueFailed = true
case (Failure(kf), _) =>
log.warn(s"""Failed to pickle Java object as key: ${k.getClass.getSimpleName};
logWarning(s"""Failed to pickle Java object as key: ${k.getClass.getSimpleName};
Error: ${kf.getMessage}""")
keyFailed = true
case (_, Failure(vf)) =>
log.warn(s"""Failed to pickle Java object as value: ${v.getClass.getSimpleName};
logWarning(s"""Failed to pickle Java object as value: ${v.getClass.getSimpleName};
Error: ${vf.getMessage}""")
valueFailed = true
}
}
firstRecord = false
}
-        (keyFailed, valueFailed) match {
-          case (true, true) => pickle.dumps(Array(k.toString, v.toString))
-          case (true, false) => pickle.dumps(Array(k.toString, v))
-          case (false, true) => pickle.dumps(Array(k, v.toString))
-          case (false, false) => pickle.dumps(Array(k, v))
+        if (keyFailed && valueFailed) {
+          pickle.dumps(Array(k.toString, v.toString))
+        } else if (keyFailed) {
+          pickle.dumps(Array(k.toString, v))
+        } else if (!keyFailed && valueFailed) {
+          pickle.dumps(Array(k, v.toString))
+        } else {
+          pickle.dumps(Array(k, v))
}
}
}
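Beyond the log.warn -> logWarning swap, the substance of this file is the first-record probe: each partition tries to pickle its first (key, value) pair, and whichever side fails is downgraded to its toString form for the rest of the partition. Below is a standalone sketch of that probe-and-downgrade step using the same net.razorvine.pickle Pickler as the diff; the OpaqueKey class is invented, and whether it actually fails to pickle depends on pyrolite's fallback behaviour:

import scala.util.Try
import net.razorvine.pickle.Pickler

object PickleFallbackSketch {
  // Stand-in for an arbitrary JVM object coming out of a Hadoop InputFormat
  class OpaqueKey(val id: Int) {
    override def toString: String = s"OpaqueKey($id)"
  }

  def main(args: Array[String]): Unit = {
    val pickle = new Pickler
    val k = new OpaqueKey(42)
    val v = "some value"

    // Probe each side once, as rddToPython does for the first record of a partition
    val keyFailed = Try(pickle.dumps(k)).isFailure
    val valueFailed = Try(pickle.dumps(v)).isFailure

    // Downgrade only the failing side(s) to strings, mirroring the if/else added above
    val bytes: Array[Byte] =
      if (keyFailed && valueFailed) pickle.dumps(Array(k.toString, v.toString))
      else if (keyFailed) pickle.dumps(Array(k.toString, v))
      else if (valueFailed) pickle.dumps(Array(k, v.toString))
      else pickle.dumps(Array(k, v))

    println(s"keyFailed=$keyFailed, valueFailed=$valueFailed -> ${bytes.length} pickled bytes")
  }
}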
core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
@@ -58,9 +58,15 @@ case class TestWritable(var str: String, var int: Int, var double: Double) extends Writable
* This object contains method to generate SequenceFile test data and write it to a
* given directory (probably a temp directory)
*/
-object WriteInputFormatTestDataGenerator extends App {
+object WriteInputFormatTestDataGenerator {
import SparkContext._

+  def main(args: Array[String]) {
+    val path = args(0)
+    val sc = new JavaSparkContext("local[4]", "test-writables")
+    generateData(path, sc)
+  }
+
def generateData(path: String, jsc: JavaSparkContext) {
val sc = jsc.sc

@@ -99,8 +105,7 @@ object WriteInputFormatTestDataGenerator extends App {
sc.parallelize(data, numSlices = 2)
.map{ case (k, v) =>
(new IntWritable(k), new ArrayWritable(classOf[DoubleWritable], v.map(new DoubleWritable(_))))
-      }
-      .saveAsNewAPIHadoopFile[SequenceFileOutputFormat[IntWritable, ArrayWritable]](arrPath)
+      }.saveAsNewAPIHadoopFile[SequenceFileOutputFormat[IntWritable, ArrayWritable]](arrPath)

// Create test data for MapWritable, with keys DoubleWritable and values Text
val mapData = Seq(
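Dropping extends App for an explicit main is what makes generateData reusable: a test can now pass in its own JavaSparkContext instead of relying on the App trait's delayed initialization. A usage sketch, assuming the object is reachable on the classpath; the output path and master string are illustrative:

import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.api.python.WriteInputFormatTestDataGenerator

object GenerateTestDataSketch {
  def main(args: Array[String]): Unit = {
    // Programmatic route: the caller owns the context and its lifecycle
    val jsc = new JavaSparkContext("local[4]", "test-writables")
    try {
      WriteInputFormatTestDataGenerator.generateData("/tmp/sequencefile-test-data", jsc)
    } finally {
      jsc.stop()
    }
    // Command-line route, equivalent to the new main() above:
    //   WriteInputFormatTestDataGenerator.main(Array("/tmp/sequencefile-test-data"))
  }
}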
