[SPARK-2024] code formatting
kanzhang committed Jul 30, 2014
1 parent 6591e37 commit c01e3ef
Showing 3 changed files with 19 additions and 19 deletions.
27 changes: 13 additions & 14 deletions core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
@@ -93,26 +93,25 @@ private[python] object SerDeUtil extends Logging {
     }
     pyRDD.mapPartitions { iter =>
       val unpickle = new Unpickler
-      val unpickled = if (batchSerialized) {
-        iter.flatMap { batch =>
-          unpickle.loads(batch) match {
-            case objs: java.util.List[_] => collectionAsScalaIterable(objs)
-            case other => throw new SparkException(
-              s"Unexpected type ${other.getClass.getName} for batch serialized Python RDD")
+      val unpickled =
+        if (batchSerialized) {
+          iter.flatMap { batch =>
+            unpickle.loads(batch) match {
+              case objs: java.util.List[_] => collectionAsScalaIterable(objs)
+              case other => throw new SparkException(
+                s"Unexpected type ${other.getClass.getName} for batch serialized Python RDD")
+            }
           }
+        } else {
+          iter.map(unpickle.loads(_))
         }
-      } else {
-        iter.map(unpickle.loads(_))
-      }
       unpickled.map {
-        // we only accept pickled (K, V)
         case obj if isPair(obj) =>
+          // we only accept (K, V)
           val arr = obj.asInstanceOf[Array[_]]
-          // arr has only 2 elements K and V
           (arr.head.asInstanceOf[K], arr.last.asInstanceOf[V])
-        case other =>
-          throw new SparkException(
-            s"RDD element of type ${other.getClass.getName} cannot be used")
+        case other => throw new SparkException(
+          s"RDD element of type ${other.getClass.getName} cannot be used")
       }
     }
   }
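The hunk above accepts two payload shapes from Python: a pickled list of (K, V) pairs when batch serialization is on, and one pickled pair per record otherwise. A minimal sketch of those shapes, using the plain pickle module as a stand-in for PySpark's serializers (variable names are illustrative):

    import pickle

    pair = (1, "a")                                # one (K, V) element
    unbatched = pickle.dumps(pair)                 # one pickled pair per record
    batched = pickle.dumps([(1, "a"), (2, "aa")])  # a pickled list of pairs

    # On the JVM side, unpickle.loads(batched) yields a java.util.List that
    # the flatMap branch unrolls; any other top-level type raises SparkException.
    assert isinstance(pickle.loads(batched), list)
    assert isinstance(pickle.loads(unbatched), tuple)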
9 changes: 5 additions & 4 deletions python/pyspark/rdd.py
@@ -1079,7 +1079,7 @@ def saveAsNewAPIHadoopFile(self, path, outputFormatClass, keyClass=None, valueCl
         pickledRDD = self._toPickleSerialization()
         batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
         self.ctx._jvm.PythonRDD.saveAsNewAPIHadoopFile(pickledRDD._jrdd, batched, path,
-                outputFormatClass, keyClass, valueClass, keyConverter, valueConverter, jconf)
+            outputFormatClass, keyClass, valueClass, keyConverter, valueConverter, jconf)

     def saveAsHadoopDataset(self, conf, keyConverter=None, valueConverter=None):
         """
@@ -1099,7 +1099,8 @@ def saveAsHadoopDataset(self, conf, keyConverter=None, valueConverter=None):
             keyConverter, valueConverter, False)

     def saveAsHadoopFile(self, path, outputFormatClass, keyClass=None, valueClass=None,
-                         keyConverter=None, valueConverter=None, conf=None, compressionCodecClass=None):
+                         keyConverter=None, valueConverter=None, conf=None,
+                         compressionCodecClass=None):
         """
         Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
         system, using the old Hadoop OutputFormat API (mapred package). Key and value types
@@ -1123,8 +1124,8 @@ def saveAsHadoopFile(self, path, outputFormatClass, keyClass=None, valueClass=No
         jconf = self.ctx._dictToJavaMap(conf)
         pickledRDD = self._toPickleSerialization()
         batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
-        self.ctx._jvm.PythonRDD.saveAsHadoopFile(pickledRDD._jrdd, batched,
-            path, outputFormatClass, keyClass, valueClass, keyConverter, valueConverter,
+        self.ctx._jvm.PythonRDD.saveAsHadoopFile(pickledRDD._jrdd, batched, path,
+            outputFormatClass, keyClass, valueClass, keyConverter, valueConverter,
             jconf, compressionCodecClass)

     def saveAsSequenceFile(self, path, compressionCodecClass=None):
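The call rewritten above backs PySpark's public saveAsHadoopFile. A hypothetical usage sketch, assuming an existing SparkContext sc; the output path and Writable class names are illustrative, not taken from this commit:

    # Save an RDD of (int, str) pairs using the old-API (mapred) TextOutputFormat.
    rdd = sc.parallelize([(1, "a"), (2, "aa"), (3, "aaa")])
    rdd.saveAsHadoopFile(
        "/tmp/example/out",
        outputFormatClass="org.apache.hadoop.mapred.TextOutputFormat",
        keyClass="org.apache.hadoop.io.IntWritable",
        valueClass="org.apache.hadoop.io.Text")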
2 changes: 1 addition & 1 deletion python/pyspark/tests.py
@@ -704,7 +704,7 @@ def test_unbatched_save_and_read(self):

     def test_malformed_RDD(self):
         basepath = self.tempdir.name
-        # non-batch-serialized RDD of type RDD[[(K, V)]] should be rejected
+        # non-batch-serialized RDD[[(K, V)]] should be rejected
         data = [[(1, "a")], [(2, "aa")], [(3, "aaa")]]
         rdd = self.sc.parallelize(data, numSlices=len(data))
         self.assertRaises(Exception, lambda: rdd.saveAsSequenceFile(
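The malformed case packs a list of pairs into each record; flattening first yields the RDD[(K, V)] shape the save path accepts. A minimal sketch reusing the test's data and basepath (the flattened name and output subdirectory are illustrative):

    # RDD[[(K, V)]] -> RDD[(K, V)]: each record becomes a bare pair,
    # which the Scala conversion above accepts.
    flattened = rdd.flatMap(lambda pairs: pairs)
    flattened.saveAsSequenceFile(basepath + "/fixed/sequence")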
