diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index a45adfc773449..f63cc4a55fb98 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -28,7 +28,8 @@
 from pyspark.conf import SparkConf
 from pyspark.files import SparkFiles
 from pyspark.java_gateway import launch_gateway
-from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer, PairDeserializer
+from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer, \
+    PairDeserializer
 from pyspark.storagelevel import StorageLevel
 from pyspark import rdd
 from pyspark.rdd import RDD
@@ -296,98 +297,6 @@ def wholeTextFiles(self, path):
         return RDD(self._jsc.wholeTextFiles(path), self,
                    PairDeserializer(UTF8Deserializer(), UTF8Deserializer()))

-    def sequenceFile(self, name, key_class="org.apache.hadoop.io.Text", value_class="org.apache.hadoop.io.Text",
-                     key_wrapper="", value_wrapper="", minSplits=None):
-        """
-        Read a Hadoop SequenceFile with arbitrary key and value Writable class from HDFS,
-        a local file system (available on all nodes), or any Hadoop-supported file system URI.
-        The mechanism is as follows:
-            1. A Java RDD is created from the SequenceFile or other InputFormat, and the key and value Writable classes
-            2. Serialization is attempted via Pyrolite pickling
-            3. If this fails, the fallback is to call 'toString' on each key and value
-            4. C{PickleSerializer} is used to deserialize pickled objects on the Python side
-        >>> sorted(sc.sequenceFile(tempdir + "/sftestdata/sfint/").collect())
-        [(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')]
-        >>> sorted(sc.sequenceFile(tempdir + "/sftestdata/sfdouble/").collect())
-        [(1.0, u'aa'), (1.0, u'aa'), (2.0, u'aa'), (2.0, u'bb'), (2.0, u'bb'), (3.0, u'cc')]
-        >>> sorted(sc.sequenceFile(tempdir + "/sftestdata/sftext/").collect())
-        [(u'1', u'aa'), (u'1', u'aa'), (u'2', u'aa'), (u'2', u'bb'), (u'2', u'bb'), (u'3', u'cc')]
-        >>> sorted(sc.sequenceFile(tempdir + "/sftestdata/sfbool/").collect())
-        [(1, False), (1, True), (2, False), (2, False), (2, True), (3, True)]
-        >>> sorted(sc.sequenceFile(tempdir + "/sftestdata/sfnull/").collect())
-        [(1, None), (1, None), (2, None), (2, None), (2, None), (3, None)]
-        >>> sorted(sc.sequenceFile(tempdir + "/sftestdata/sfmap/").collect())
-        [(1, {2.0: u'aa'}), (1, {3.0: u'bb'}), (2, {1.0: u'aa'}), (2, {1.0: u'cc'}), (2, {3.0: u'bb'}), (3, {2.0: u'dd'})]
-        >>> r = sorted(sc.sequenceFile(tempdir + "/sftestdata/sfclass").collect())[0]
-        >>> r == (u'1', {u'__class__': u'org.apache.spark.api.python.TestWritable', u'double': 54.0, u'int': 123, u'str': u'test1'})
-        True
-        """
-        minSplits = minSplits or min(self.defaultParallelism, 2)
-        jrdd = self._jvm.PythonRDD.sequenceFile(self._jsc, name, key_class, value_class, key_wrapper, value_wrapper,
-                                                minSplits)
-        return RDD(jrdd, self, PickleSerializer())
-
-    def newAPIHadoopFile(self, name, inputformat_class, key_class="org.apache.hadoop.io.Text",
-                         value_class="org.apache.hadoop.io.Text", key_wrapper="toString",
-                         value_wrapper="toString", conf={}):
-        """
-        Read a 'new API' Hadoop InputFormat with arbitrary key and value class from HDFS,
-        a local file system (available on all nodes), or any Hadoop-supported file system URI.
-        The mechanism is the same as for sc.sequenceFile.
-
-        A Hadoop configuration can be passed in as a Python dict. This will be converted into a Configuration in Java
-        """
-        jconf = self._jvm.java.util.HashMap()
-        for k, v in conf.iteritems():
-            jconf[k] = v
-        jrdd = self._jvm.PythonRDD.newAPIHadoopFile(self._jsc, name, inputformat_class, key_class, value_class,
-                                                    key_wrapper, value_wrapper, jconf)
-        return RDD(jrdd, self, PickleSerializer())
-
-    def newAPIHadoopRDD(self, inputformat_class, key_class="org.apache.hadoop.io.Text",
-                        value_class="org.apache.hadoop.io.Text", key_wrapper="", value_wrapper="", conf={}):
-        """
-        Read a 'new API' Hadoop InputFormat with arbitrary key and value class, from an arbitrary Hadoop configuration,
-        which is passed in as a Python dict. This will be converted into a Configuration in Java.
-        The mechanism is the same as for sc.sequenceFile.
-        """
-        jconf = self._jvm.java.util.HashMap()
-        for k, v in conf.iteritems():
-            jconf[k] = v
-        jrdd = self._jvm.PythonRDD.newAPIHadoopRDD(self._jsc, inputformat_class, key_class, value_class, key_wrapper,
-                                                   value_wrapper, jconf)
-        return RDD(jrdd, self, PickleSerializer())
-
-    def hadoopFile(self, name, inputformat_class, key_class="org.apache.hadoop.io.Text",
-                   value_class="org.apache.hadoop.io.Text", key_wrapper="", value_wrapper="", conf={}):
-        """
-        Read an 'old' Hadoop InputFormat with arbitrary key and value class from HDFS,
-        a local file system (available on all nodes), or any Hadoop-supported file system URI.
-        The mechanism is the same as for sc.sequenceFile.
-
-        A Hadoop configuration can be passed in as a Python dict. This will be converted into a Configuration in Java
-        """
-        jconf = self._jvm.java.util.HashMap()
-        for k, v in conf.iteritems():
-            jconf[k] = v
-        jrdd = self._jvm.PythonRDD.hadoopFile(self._jsc, name, inputformat_class, key_class, value_class, key_wrapper,
-                                              value_wrapper, jconf)
-        return RDD(jrdd, self, PickleSerializer())
-
-    def hadoopRDD(self, inputformat_class, key_class="org.apache.hadoop.io.Text",
-                  value_class="org.apache.hadoop.io.Text", key_wrapper="", value_wrapper="", conf={}):
-        """
-        Read an 'old' Hadoop InputFormat with arbitrary key and value class, from an arbitrary Hadoop configuration,
-        which is passed in as a Python dict. This will be converted into a Configuration in Java.
-        The mechanism is the same as for sc.sequenceFile.
-        """
-        jconf = self._jvm.java.util.HashMap()
-        for k, v in conf.iteritems():
-            jconf[k] = v
-        jrdd = self._jvm.PythonRDD.hadoopRDD(self._jsc, inputformat_class, key_class, value_class, key_wrapper,
-                                             value_wrapper, jconf)
-        return RDD(jrdd, self, PickleSerializer())
-
     def _checkpointFile(self, name, input_deserializer):
         jrdd = self._jsc.checkpointFile(name)
         return RDD(jrdd, self, input_deserializer)
@@ -514,8 +423,11 @@ def _getJavaStorageLevel(self, storageLevel):
             raise Exception("storageLevel must be of type pyspark.StorageLevel")

         newStorageLevel = self._jvm.org.apache.spark.storage.StorageLevel
-        return newStorageLevel(storageLevel.useDisk, storageLevel.useMemory,
-                               storageLevel.deserialized, storageLevel.replication)
+        return newStorageLevel(storageLevel.useDisk,
+                               storageLevel.useMemory,
+                               storageLevel.useOffHeap,
+                               storageLevel.deserialized,
+                               storageLevel.replication)

     def setJobGroup(self, groupId, description):
         """
@@ -555,9 +467,8 @@ def _test():
     globs = globals().copy()
     globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
     globs['tempdir'] = tempfile.mkdtemp()
-    globs['sc']._jvm.WriteInputFormatTestDataGenerator.generateData(globs['tempdir'], globs['sc']._jsc)
     atexit.register(lambda: shutil.rmtree(globs['tempdir']))
-    (failure_count, test_count) = doctest.testmod(globs=globs)
+    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
     globs['sc'].stop()
     if failure_count:
         exit(-1)
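
Note (not part of the patch): the _getJavaStorageLevel hunk forwards the Python StorageLevel's useOffHeap attribute to the Java StorageLevel constructor, which now expects it between useMemory and deserialized. Below is a minimal caller-side sketch, assuming a PySpark build where pyspark.storagelevel.StorageLevel takes (useDisk, useMemory, useOffHeap, deserialized, replication=1) positionally:

    # Sketch only; assumes StorageLevel(useDisk, useMemory, useOffHeap,
    # deserialized, replication=1), matching the attributes that
    # _getJavaStorageLevel reads in the hunk above.
    from pyspark import SparkContext
    from pyspark.storagelevel import StorageLevel

    sc = SparkContext("local[2]", "storage-level-sketch")
    rdd = sc.parallelize(range(100))

    # Custom level: spill to disk, keep serialized objects in memory,
    # leave off-heap storage disabled, single replica. persist() routes
    # this through SparkContext._getJavaStorageLevel to build the Java
    # StorageLevel with the new five-argument constructor.
    level = StorageLevel(True, True, False, False, 1)
    rdd.persist(level)
    print(rdd.count())
    sc.stop()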
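
Note on the _test() change: passing optionflags=doctest.ELLIPSIS lets expected doctest output use "..." to match arbitrary text, which keeps doctests stable when output contains variable parts such as object ids. A standalone illustration follows; the function is hypothetical and not taken from the patch:

    # Hypothetical example showing the effect of doctest.ELLIPSIS; it is
    # not taken from pyspark/context.py.
    import doctest

    def describe_rdd():
        """
        >>> describe_rdd()
        'PythonRDD[...] at collect'
        """
        return 'PythonRDD[42] at collect'

    if __name__ == "__main__":
        # Without ELLIPSIS, the "..." would be compared literally and fail.
        doctest.testmod(optionflags=doctest.ELLIPSIS, verbose=True)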