From 0bdec555e632020d42a28dbe7ec1d2083da8c9fa Mon Sep 17 00:00:00 2001
From: Kan Zhang
Date: Mon, 28 Jul 2014 11:54:59 -0700
Subject: [PATCH] [SPARK-2024] Refactoring newly added tests

---
 python/pyspark/tests.py | 18 +++++++++++++-----
 1 file changed, 13 insertions(+), 5 deletions(-)

diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index eac501f9f6b37..25683013ab5f7 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -541,7 +541,12 @@ def test_oldhadoop(self):
                 "mapred.output.value.class" : "org.apache.hadoop.io.MapWritable",
                 "mapred.output.dir" : basepath + "/olddataset/"}
         self.sc.parallelize(dict_data).saveAsHadoopDataset(conf)
-        old_dataset = sorted(self.sc.sequenceFile(basepath + "/olddataset/").collect())
+        input_conf = {"mapred.input.dir" : basepath + "/olddataset/"}
+        old_dataset = sorted(self.sc.hadoopRDD(
+            "org.apache.hadoop.mapred.SequenceFileInputFormat",
+            "org.apache.hadoop.io.IntWritable",
+            "org.apache.hadoop.io.MapWritable",
+            conf=input_conf).collect())
         self.assertEqual(old_dataset, dict_data)
 
     def test_newhadoop(self):
@@ -571,10 +576,13 @@ def test_newhadoop(self):
                 "mapred.output.dir" : basepath + "/newdataset/"}
         self.sc.parallelize(array_data).saveAsNewAPIHadoopDataset(conf,
             valueConverter="org.apache.spark.api.python.DoubleArrayToWritableConverter")
-        new_dataset = sorted(self.sc.sequenceFile(
-            basepath + "/newdataset/",
-            valueClass="org.apache.spark.api.python.DoubleArrayWritable",
-            valueConverter="org.apache.spark.api.python.WritableToDoubleArrayConverter").collect())
+        input_conf = {"mapred.input.dir" : basepath + "/newdataset/"}
+        new_dataset = sorted(self.sc.newAPIHadoopRDD(
+            "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
+            "org.apache.hadoop.io.IntWritable",
+            "org.apache.spark.api.python.DoubleArrayWritable",
+            valueConverter="org.apache.spark.api.python.WritableToDoubleArrayConverter",
+            conf=input_conf).collect())
         self.assertEqual(new_dataset, array_data)
 
     def test_newolderror(self):
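
For context, a minimal sketch of the read-back pattern this patch switches to, assuming a
running SparkContext named sc and a placeholder path pointing at a directory previously
written by saveAsHadoopDataset with IntWritable keys and MapWritable values (sc and path
are illustrative names, not part of the patch):

    # Hypothetical usage sketch (not part of the patch): read a SequenceFile back
    # through the old mapred API, passing the input directory via the job conf
    # instead of calling sc.sequenceFile on the path directly.
    input_conf = {"mapred.input.dir": path}  # path is a placeholder for the output dir
    rdd = sc.hadoopRDD(
        "org.apache.hadoop.mapred.SequenceFileInputFormat",  # old-API input format
        "org.apache.hadoop.io.IntWritable",                   # key class
        "org.apache.hadoop.io.MapWritable",                   # value class
        conf=input_conf)
    print(sorted(rdd.collect()))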