
Commit

Scalastyle fixes
MLnick committed Apr 21, 2014
1 parent fc5099e commit 31a2fff
Showing 4 changed files with 80 additions and 61 deletions.
core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
@@ -31,7 +31,10 @@ private[python] object PythonHadoopUtil {
conf
}

/** Merges two configurations, returns a copy of left with keys from right overwriting any matching keys in left */
/**
* Merges two configurations, returns a copy of left with keys from right overwriting
* any matching keys in left
*/
def mergeConfs(left: Configuration, right: Configuration) = {
import collection.JavaConversions._
val copy = new Configuration(left)
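The reflowed Scaladoc above describes the merge semantics, but the diff cuts the method off right after the copy is made. The sketch below is a standalone illustration of those semantics, not the actual Spark implementation: the iteration over the right-hand configuration and the example keys are assumptions based on the comment.

import org.apache.hadoop.conf.Configuration

// Illustration only: a copy of `left` takes every key from `right`,
// so `right` wins on conflicts and `left` itself is never mutated.
object MergeConfsSketch {
  def mergeConfs(left: Configuration, right: Configuration): Configuration = {
    val copy = new Configuration(left)
    val it = right.iterator()              // entries are java.util.Map.Entry[String, String]
    while (it.hasNext) {
      val entry = it.next()
      copy.set(entry.getKey, entry.getValue)
    }
    copy
  }

  def main(args: Array[String]): Unit = {
    val left = new Configuration(false)
    left.set("io.file.buffer.size", "4096")
    val right = new Configuration(false)
    right.set("io.file.buffer.size", "65536")

    val merged = mergeConfs(left, right)
    println(merged.get("io.file.buffer.size"))   // 65536 -- the right-hand value wins
    println(left.get("io.file.buffer.size"))     // 4096  -- left is unchanged
  }
}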
102 changes: 54 additions & 48 deletions core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -272,8 +272,6 @@ private[spark] object PythonRDD {
}
}

// PySpark / Hadoop InputFormat//

/** Create and RDD from a path using [[org.apache.hadoop.mapred.SequenceFileInputFormat]] */
def sequenceFile[K, V](sc: JavaSparkContext,
path: String,
@@ -295,14 +293,15 @@
* Create an RDD from a file path, using an arbitrary [[org.apache.hadoop.mapreduce.InputFormat]],
* key and value class
*/
def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](sc: JavaSparkContext,
path: String,
inputFormatClazz: String,
keyClazz: String,
valueClazz: String,
keyWrapper: String,
valueWrapper: String,
confAsMap: java.util.HashMap[String, String]) = {
def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
sc: JavaSparkContext,
path: String,
inputFormatClazz: String,
keyClazz: String,
valueClazz: String,
keyWrapper: String,
valueWrapper: String,
confAsMap: java.util.HashMap[String, String]) = {
val conf = PythonHadoopUtil.mapToConf(confAsMap)
val baseConf = sc.hadoopConfiguration()
val mergedConf = PythonHadoopUtil.mergeConfs(baseConf, conf)
@@ -314,16 +313,18 @@ private[spark] object PythonRDD {
}

/**
* Create an RDD from a [[org.apache.hadoop.conf.Configuration]] converted from a map that is passed in from Python,
* using an arbitrary [[org.apache.hadoop.mapreduce.InputFormat]], key and value class
* Create an RDD from a [[org.apache.hadoop.conf.Configuration]] converted from a map that is
* passed in from Python, using an arbitrary [[org.apache.hadoop.mapreduce.InputFormat]],
* key and value class
*/
def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](sc: JavaSparkContext,
inputFormatClazz: String,
keyClazz: String,
valueClazz: String,
keyWrapper: String,
valueWrapper: String,
confAsMap: java.util.HashMap[String, String]) = {
def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
sc: JavaSparkContext,
inputFormatClazz: String,
keyClazz: String,
valueClazz: String,
keyWrapper: String,
valueWrapper: String,
confAsMap: java.util.HashMap[String, String]) = {
val conf = PythonHadoopUtil.mapToConf(confAsMap)
val rdd =
newAPIHadoopRDDFromClassNames[K, V, F](sc,
@@ -332,12 +333,13 @@ private[spark] object PythonRDD {
JavaRDD.fromRDD(SerDeUtil.serMsgPack[K, V](converted))
}

private def newAPIHadoopRDDFromClassNames[K, V, F <: NewInputFormat[K, V]](sc: JavaSparkContext,
path: Option[String] = None,
inputFormatClazz: String,
keyClazz: String,
valueClazz: String,
conf: Configuration) = {
private def newAPIHadoopRDDFromClassNames[K, V, F <: NewInputFormat[K, V]](
sc: JavaSparkContext,
path: Option[String] = None,
inputFormatClazz: String,
keyClazz: String,
valueClazz: String,
conf: Configuration) = {
implicit val kcm = ClassTag(Class.forName(keyClazz)).asInstanceOf[ClassTag[K]]
implicit val vcm = ClassTag(Class.forName(valueClazz)).asInstanceOf[ClassTag[V]]
implicit val fcm = ClassTag(Class.forName(inputFormatClazz)).asInstanceOf[ClassTag[F]]
@@ -356,14 +358,15 @@ private[spark] object PythonRDD {
* Create an RDD from a file path, using an arbitrary [[org.apache.hadoop.mapred.InputFormat]],
* key and value class
*/
def hadoopFile[K, V, F <: InputFormat[K, V]](sc: JavaSparkContext,
path: String,
inputFormatClazz: String,
keyClazz: String,
valueClazz: String,
keyWrapper: String,
valueWrapper: String,
confAsMap: java.util.HashMap[String, String]) = {
def hadoopFile[K, V, F <: InputFormat[K, V]](
sc: JavaSparkContext,
path: String,
inputFormatClazz: String,
keyClazz: String,
valueClazz: String,
keyWrapper: String,
valueWrapper: String,
confAsMap: java.util.HashMap[String, String]) = {
val conf = PythonHadoopUtil.mapToConf(confAsMap)
val baseConf = sc.hadoopConfiguration()
val mergedConf = PythonHadoopUtil.mergeConfs(baseConf, conf)
@@ -375,16 +378,18 @@ private[spark] object PythonRDD {
}

/**
* Create an RDD from a [[org.apache.hadoop.conf.Configuration]] converted from a map that is passed in from Python,
* using an arbitrary [[org.apache.hadoop.mapred.InputFormat]], key and value class
* Create an RDD from a [[org.apache.hadoop.conf.Configuration]] converted from a map
* that is passed in from Python, using an arbitrary [[org.apache.hadoop.mapred.InputFormat]],
* key and value class
*/
def hadoopRDD[K, V, F <: InputFormat[K, V]](sc: JavaSparkContext,
inputFormatClazz: String,
keyClazz: String,
valueClazz: String,
keyWrapper: String,
valueWrapper: String,
confAsMap: java.util.HashMap[String, String]) = {
def hadoopRDD[K, V, F <: InputFormat[K, V]](
sc: JavaSparkContext,
inputFormatClazz: String,
keyClazz: String,
valueClazz: String,
keyWrapper: String,
valueWrapper: String,
confAsMap: java.util.HashMap[String, String]) = {
val conf = PythonHadoopUtil.mapToConf(confAsMap)
val rdd =
hadoopRDDFromClassNames[K, V, F](sc,
@@ -393,12 +398,13 @@ private[spark] object PythonRDD {
JavaRDD.fromRDD(SerDeUtil.serMsgPack[K, V](converted))
}

private def hadoopRDDFromClassNames[K, V, F <: InputFormat[K, V]](sc: JavaSparkContext,
path: Option[String] = None,
inputFormatClazz: String,
keyClazz: String,
valueClazz: String,
conf: Configuration) = {
private def hadoopRDDFromClassNames[K, V, F <: InputFormat[K, V]](
sc: JavaSparkContext,
path: Option[String] = None,
inputFormatClazz: String,
keyClazz: String,
valueClazz: String,
conf: Configuration) = {
implicit val kcm = ClassTag(Class.forName(keyClazz)).asInstanceOf[ClassTag[K]]
implicit val vcm = ClassTag(Class.forName(valueClazz)).asInstanceOf[ClassTag[V]]
implicit val fcm = ClassTag(Class.forName(inputFormatClazz)).asInstanceOf[ClassTag[F]]
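The reformatted wrappers above all delegate to the FromClassNames helpers, which receive the key, value and InputFormat classes from Python as plain strings and rebuild ClassTags reflectively before constructing the RDD. A small self-contained sketch of that pattern follows; the object name, the main method and the concrete class names are illustrative only.

import scala.reflect.ClassTag
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat

// Turn a fully qualified class name into a ClassTag, mirroring the
// ClassTag(Class.forName(...)) pattern used by the helpers above.
object ClassTagFromNameSketch {
  def tagFor[T](className: String): ClassTag[T] =
    ClassTag(Class.forName(className)).asInstanceOf[ClassTag[T]]

  def main(args: Array[String]): Unit = {
    val kcm = tagFor[IntWritable]("org.apache.hadoop.io.IntWritable")
    val vcm = tagFor[Text]("org.apache.hadoop.io.Text")
    val fcm = tagFor[SequenceFileInputFormat[IntWritable, Text]](
      "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat")
    // Print the runtime classes the tags carry.
    println(Seq(kcm, vcm, fcm).map(_.runtimeClass.getName).mkString(", "))
  }
}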
18 changes: 11 additions & 7 deletions core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
@@ -27,14 +27,15 @@ import scala.util.Failure

/**
* Utilities for serialization / deserialization between Python and Java, using MsgPack.
* Also contains utilities for converting [[org.apache.hadoop.io.Writable]] -> Scala objects and primitives
* Also contains utilities for converting [[org.apache.hadoop.io.Writable]] ->
* Scala objects and primitives
*/
private[python] object SerDeUtil extends Logging {

/**
* Checks whether a Scala object needs to be registered with MsgPack. String, primitives and the standard collections
* don't need to be registered as MsgPack takes care of serializing them and registering them throws scary looking
* errors (but still works).
* Checks whether a Scala object needs to be registered with MsgPack. String, primitives
* and the standard collections don't need to be registered as MsgPack takes care of serializing
* them and registering them throws scary looking errors (but still works).
*/
def needsToBeRegistered[T](t: T) = {
t match {
@@ -101,8 +102,8 @@ private[python] object SerDeUtil extends Logging {
}

/**
* Converts an RDD of key-value pairs, where key and/or value could be instances of [[org.apache.hadoop.io.Writable]],
* into an RDD[(K, V)]
* Converts an RDD of key-value pairs, where key and/or value could be instances of
* [[org.apache.hadoop.io.Writable]], into an RDD[(K, V)]
*/
def convertRDD[K, V](rdd: RDD[(K, V)]) = {
rdd.map{
@@ -113,7 +114,9 @@ private[python] object SerDeUtil extends Logging {
}
}

/** Converts a [[org.apache.hadoop.io.Writable]] to the underlying primitive, String or object representation */
/** Converts a [[org.apache.hadoop.io.Writable]] to the underlying primitive, String or
* object representation
*/
def convert(writable: Writable): Any = {
import collection.JavaConversions._
writable match {
@@ -132,3 +135,4 @@
}

}
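The rewrapped Scaladoc above documents convert, but the diff truncates the method right after the writable match. The sketch below is a standalone illustration of the unwrapping idea with an assumed subset of cases; the exact case list in SerDeUtil may differ.

import org.apache.hadoop.io._

// Illustration only: map common Writables back to plain Scala/Java values.
object WritableConvertSketch {
  def convert(writable: Writable): Any = writable match {
    case iw: IntWritable      => iw.get()
    case dw: DoubleWritable   => dw.get()
    case lw: LongWritable     => lw.get()
    case bw: BooleanWritable  => bw.get()
    case t: Text              => t.toString
    case bytes: BytesWritable => bytes.getBytes
    case _: NullWritable      => null
    case other                => other   // unknown Writables are passed through as-is
  }

  def main(args: Array[String]): Unit = {
    println(convert(new IntWritable(42)))   // 42
    println(convert(new Text("spark")))     // spark
    println(convert(NullWritable.get()))    // null
  }
}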

core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
@@ -21,6 +21,7 @@ import org.apache.spark.SparkContext
import org.apache.hadoop.io._
import scala.Array
import java.io.{DataOutput, DataInput}
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat

/**
* A class to test MsgPack serialization on the Scala side, that will be deserialized
@@ -66,15 +67,20 @@ object WriteInputFormatTestDataGenerator extends App {
val boolPath = s"$basePath/sfbool"
val nullPath = s"$basePath/sfnull"

// Create test data for IntWritable, DoubleWritable, Text, BytesWritable, BooleanWritable and NullWritable
/*
* Create test data for IntWritable, DoubleWritable, Text, BytesWritable,
* BooleanWritable and NullWritable
*/
val intKeys = Seq((1, "aa"), (2, "bb"), (2, "aa"), (3, "cc"), (2, "bb"), (1, "aa"))
sc.parallelize(intKeys).saveAsSequenceFile(intPath)
sc.parallelize(intKeys.map{ case (k, v) => (k.toDouble, v) }).saveAsSequenceFile(doublePath)
sc.parallelize(intKeys.map{ case (k, v) => (k.toString, v) }).saveAsSequenceFile(textPath)
sc.parallelize(intKeys.map{ case (k, v) => (k, v.getBytes) }).saveAsSequenceFile(bytesPath)
val bools = Seq((1, true), (2, true), (2, false), (3, true), (2, false), (1, false))
sc.parallelize(bools).saveAsSequenceFile(boolPath)
sc.parallelize(intKeys).map{ case (k, v) => (new IntWritable(k), NullWritable.get()) }.saveAsSequenceFile(nullPath)
sc.parallelize(intKeys).map{ case (k, v) =>
(new IntWritable(k), NullWritable.get())
}.saveAsSequenceFile(nullPath)

// Create test data for ArrayWritable
val data = Seq(
Expand All @@ -86,7 +92,7 @@ object WriteInputFormatTestDataGenerator extends App {
.map{ case (k, v) =>
(new IntWritable(k), new ArrayWritable(classOf[DoubleWritable], v.map(new DoubleWritable(_))))
}
.saveAsNewAPIHadoopFile[org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat[IntWritable, ArrayWritable]](arrPath)
.saveAsNewAPIHadoopFile[SequenceFileOutputFormat[IntWritable, ArrayWritable]](arrPath)

// Create test data for MapWritable, with keys DoubleWritable and values Text
val mapData = Seq(
@@ -116,6 +122,6 @@ object WriteInputFormatTestDataGenerator extends App {
val rdd = sc.parallelize(testClass, numSlices = 2).map{ case (k, v) => (new Text(k), v) }
rdd.saveAsNewAPIHadoopFile(classPath,
classOf[Text], classOf[TestWritable],
classOf[org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat[Text, TestWritable]])
classOf[SequenceFileOutputFormat[Text, TestWritable]])

}
}
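The generator above writes SequenceFiles of several key and value types for the PySpark input-format tests to read back. For a quick sanity check of that data from the Scala side, a hedged sketch is shown below; the path and application name are placeholders, and the real tests consume these files from Python rather than through this code.

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._   // brings the implicit WritableConverters into scope

// Read back the IntWritable/Text pairs written under basePath/sfint.
object ReadTestDataSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "ReadTestDataSketch")
    val intPath = "/tmp/pyspark-test/sfint"   // hypothetical: wherever the generator wrote its data
    sc.sequenceFile[Int, String](intPath).collect().foreach {
      case (k, v) => println(s"$k -> $v")
    }
    sc.stop()
  }
}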
