Commit

Recover earlier changes lost in previous merge for context.py
MLnick committed Apr 23, 2014
1 parent 5af4770 commit 077ecb2
Showing 1 changed file with 8 additions and 97 deletions.
python/pyspark/context.py (105 changes: 8 additions & 97 deletions)
@@ -28,7 +28,8 @@
from pyspark.conf import SparkConf
from pyspark.files import SparkFiles
from pyspark.java_gateway import launch_gateway
from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer, PairDeserializer
from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer, \
PairDeserializer
from pyspark.storagelevel import StorageLevel
from pyspark import rdd
from pyspark.rdd import RDD
@@ -296,98 +297,6 @@ def wholeTextFiles(self, path):
return RDD(self._jsc.wholeTextFiles(path), self,
PairDeserializer(UTF8Deserializer(), UTF8Deserializer()))

def sequenceFile(self, name, key_class="org.apache.hadoop.io.Text", value_class="org.apache.hadoop.io.Text",
key_wrapper="", value_wrapper="", minSplits=None):
"""
Read a Hadoop SequenceFile with arbitrary key and value Writable class from HDFS,
a local file system (available on all nodes), or any Hadoop-supported file system URI.
The mechanism is as follows:
1. A Java RDD is created from the SequenceFile or other InputFormat, and the key and value Writable classes
2. Serialization is attempted via Pyrolite pickling
3. If this fails, the fallback is to call 'toString' on each key and value
4. C{PickleSerializer} is used to deserialize pickled objects on the Python side
>>> sorted(sc.sequenceFile(tempdir + "/sftestdata/sfint/").collect())
[(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')]
>>> sorted(sc.sequenceFile(tempdir + "/sftestdata/sfdouble/").collect())
[(1.0, u'aa'), (1.0, u'aa'), (2.0, u'aa'), (2.0, u'bb'), (2.0, u'bb'), (3.0, u'cc')]
>>> sorted(sc.sequenceFile(tempdir + "/sftestdata/sftext/").collect())
[(u'1', u'aa'), (u'1', u'aa'), (u'2', u'aa'), (u'2', u'bb'), (u'2', u'bb'), (u'3', u'cc')]
>>> sorted(sc.sequenceFile(tempdir + "/sftestdata/sfbool/").collect())
[(1, False), (1, True), (2, False), (2, False), (2, True), (3, True)]
>>> sorted(sc.sequenceFile(tempdir + "/sftestdata/sfnull/").collect())
[(1, None), (1, None), (2, None), (2, None), (2, None), (3, None)]
>>> sorted(sc.sequenceFile(tempdir + "/sftestdata/sfmap/").collect())
[(1, {2.0: u'aa'}), (1, {3.0: u'bb'}), (2, {1.0: u'aa'}), (2, {1.0: u'cc'}), (2, {3.0: u'bb'}), (3, {2.0: u'dd'})]
>>> r = sorted(sc.sequenceFile(tempdir + "/sftestdata/sfclass").collect())[0]
>>> r == (u'1', {u'__class__': u'org.apache.spark.api.python.TestWritable', u'double': 54.0, u'int': 123, u'str': u'test1'})
True
"""
minSplits = minSplits or min(self.defaultParallelism, 2)
jrdd = self._jvm.PythonRDD.sequenceFile(self._jsc, name, key_class, value_class, key_wrapper, value_wrapper,
minSplits)
return RDD(jrdd, self, PickleSerializer())
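A minimal usage sketch of the signature above (the path and Writable classes here are illustrative, not taken from this commit); keys and values default to org.apache.hadoop.io.Text, so explicit classes are only needed for other Writable types:

    # Hypothetical path; IntWritable keys and Text values come back as Python
    # ints and unicode strings after the Pyrolite/pickle conversion described above.
    rdd = sc.sequenceFile("hdfs:///tmp/sfint",
                          key_class="org.apache.hadoop.io.IntWritable",
                          value_class="org.apache.hadoop.io.Text")
    rdd.first()   # e.g. (1, u'aa')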

def newAPIHadoopFile(self, name, inputformat_class, key_class="org.apache.hadoop.io.Text",
value_class="org.apache.hadoop.io.Text", key_wrapper="toString",
value_wrapper="toString", conf={}):
"""
Read a 'new API' Hadoop InputFormat with arbitrary key and value class from HDFS,
a local file system (available on all nodes), or any Hadoop-supported file system URI.
The mechanism is the same as for sc.sequenceFile.
A Hadoop configuration can be passed in as a Python dict. This will be converted into a Configuration in Java
"""
jconf = self._jvm.java.util.HashMap()
for k, v in conf.iteritems():
jconf[k] = v
jrdd = self._jvm.PythonRDD.newAPIHadoopFile(self._jsc, name, inputformat_class, key_class, value_class,
key_wrapper, value_wrapper, jconf)
return RDD(jrdd, self, PickleSerializer())
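As a hedged illustration of passing the conf dict described above (the path and the configuration key are examples, not part of this change), a new-API TextInputFormat read might look like:

    # Hypothetical HDFS path and an example Hadoop setting passed through the dict.
    conf = {"mapreduce.input.fileinputformat.split.minsize": "65536"}
    rdd = sc.newAPIHadoopFile(
        "hdfs:///tmp/input",
        "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
        key_class="org.apache.hadoop.io.LongWritable",
        value_class="org.apache.hadoop.io.Text",
        conf=conf)
    # Each record arrives as a (byte offset, line) pair.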

def newAPIHadoopRDD(self, inputformat_class, key_class="org.apache.hadoop.io.Text",
value_class="org.apache.hadoop.io.Text", key_wrapper="", value_wrapper="", conf={}):
"""
Read a 'new API' Hadoop InputFormat with arbitrary key and value class, from an arbitrary Hadoop configuration,
which is passed in as a Python dict. This will be converted into a Configuration in Java.
The mechanism is the same as for sc.sequenceFile.
"""
jconf = self._jvm.java.util.HashMap()
for k, v in conf.iteritems():
jconf[k] = v
jrdd = self._jvm.PythonRDD.newAPIHadoopRDD(self._jsc, inputformat_class, key_class, value_class, key_wrapper,
value_wrapper, jconf)
return RDD(jrdd, self, PickleSerializer())

def hadoopFile(self, name, inputformat_class, key_class="org.apache.hadoop.io.Text",
value_class="org.apache.hadoop.io.Text", key_wrapper="", value_wrapper="", conf={}):
"""
Read an 'old' Hadoop InputFormat with arbitrary key and value class from HDFS,
a local file system (available on all nodes), or any Hadoop-supported file system URI.
The mechanism is the same as for sc.sequenceFile.
A Hadoop configuration can be passed in as a Python dict. This will be converted into a Configuration in Java
"""
jconf = self._jvm.java.util.HashMap()
for k, v in conf.iteritems():
jconf[k] = v
jrdd = self._jvm.PythonRDD.hadoopFile(self._jsc, name, inputformat_class, key_class, value_class, key_wrapper,
value_wrapper, jconf)
return RDD(jrdd, self, PickleSerializer())

def hadoopRDD(self, inputformat_class, key_class="org.apache.hadoop.io.Text",
value_class="org.apache.hadoop.io.Text", key_wrapper="", value_wrapper="", conf={}):
"""
Read an 'old' Hadoop InputFormat with arbitrary key and value class, from an arbitrary Hadoop configuration,
which is passed in as a Python dict. This will be converted into a Configuration in Java.
The mechanism is the same as for sc.sequenceFile.
"""
jconf = self._jvm.java.util.HashMap()
for k, v in conf.iteritems():
jconf[k] = v
jrdd = self._jvm.PythonRDD.hadoopRDD(self._jsc, inputformat_class, key_class, value_class, key_wrapper,
value_wrapper, jconf)
return RDD(jrdd, self, PickleSerializer())
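Because hadoopRDD takes no path argument, the input location itself travels through the conf dict; a minimal sketch for the 'old' API (path and configuration key are illustrative):

    conf = {"mapred.input.dir": "hdfs:///tmp/input"}   # hypothetical input path
    rdd = sc.hadoopRDD("org.apache.hadoop.mapred.TextInputFormat",
                       key_class="org.apache.hadoop.io.LongWritable",
                       value_class="org.apache.hadoop.io.Text",
                       conf=conf)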

def _checkpointFile(self, name, input_deserializer):
jrdd = self._jsc.checkpointFile(name)
return RDD(jrdd, self, input_deserializer)
@@ -514,8 +423,11 @@ def _getJavaStorageLevel(self, storageLevel):
raise Exception("storageLevel must be of type pyspark.StorageLevel")

newStorageLevel = self._jvm.org.apache.spark.storage.StorageLevel
return newStorageLevel(storageLevel.useDisk, storageLevel.useMemory,
storageLevel.deserialized, storageLevel.replication)
return newStorageLevel(storageLevel.useDisk,
storageLevel.useMemory,
storageLevel.useOffHeap,
storageLevel.deserialized,
storageLevel.replication)
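The extra useOffHeap field recovered here corresponds to a five-argument StorageLevel on the Python side; a sketch of constructing one directly (assuming the pyspark.storagelevel.StorageLevel fields are ordered the same way they are passed through above):

    from pyspark.storagelevel import StorageLevel
    # useDisk=True, useMemory=True, useOffHeap=False, deserialized=True, replication=1
    rdd.persist(StorageLevel(True, True, False, True, 1))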

def setJobGroup(self, groupId, description):
"""
@@ -555,9 +467,8 @@ def _test():
globs = globals().copy()
globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
globs['tempdir'] = tempfile.mkdtemp()
globs['sc']._jvm.WriteInputFormatTestDataGenerator.generateData(globs['tempdir'], globs['sc']._jsc)
atexit.register(lambda: shutil.rmtree(globs['tempdir']))
(failure_count, test_count) = doctest.testmod(globs=globs)
(failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
globs['sc'].stop()
if failure_count:
exit(-1)
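The optionflags=doctest.ELLIPSIS added above lets expected doctest output use "..." as a wildcard for variable text such as object addresses; a standalone sketch of the flag itself (not Spark-specific):

    import doctest

    def make_obj():
        """
        >>> make_obj()
        <object object at 0x...>
        """
        return object()

    doctest.testmod(optionflags=doctest.ELLIPSIS)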
