Fix test failures due to ordering
MLnick committed Apr 23, 2014
1 parent 78978d9 commit e001b94
Showing 1 changed file with 15 additions and 14 deletions.
29 changes: 15 additions & 14 deletions python/pyspark/context.py
@@ -306,20 +306,21 @@ def sequenceFile(self, name, key_class="org.apache.hadoop.io.Text", value_class=
 2. Serialization is attempted via Pyrolite pickling
 3. If this fails, the fallback is to call 'toString' on each key and value
 4. C{PickleSerializer} is used to deserialize pickled objects on the Python side
->>> sc.sequenceFile(tempdir + "/sftestdata/sfint/").collect()
-[(1, u'aa'), (2, u'bb'), (2, u'aa'), (3, u'cc'), (2, u'bb'), (1, u'aa')]
->>> sc.sequenceFile(tempdir + "/sftestdata/sfdouble/").collect()
-[(1.0, u'aa'), (2.0, u'bb'), (2.0, u'aa'), (3.0, u'cc'), (2.0, u'bb'), (1.0, u'aa')]
->>> sc.sequenceFile(tempdir + "/sftestdata/sftext/").collect()
-[(u'1', u'aa'), (u'2', u'bb'), (u'2', u'aa'), (u'3', u'cc'), (u'2', u'bb'), (u'1', u'aa')]
->>> sc.sequenceFile(tempdir + "/sftestdata/sfbool/").collect()
-[(1, True), (2, True), (2, False), (3, True), (2, False), (1, False)]
->>> sc.sequenceFile(tempdir + "/sftestdata/sfnull/").collect()
-[(1, None), (2, None), (2, None), (3, None), (2, None), (1, None)]
->>> sc.sequenceFile(tempdir + "/sftestdata/sfmap/").collect()
-[(1, {2.0: u'aa'}), (2, {3.0: u'bb'}), (2, {1.0: u'cc'}), (3, {2.0: u'dd'}), (2, {1.0: u'aa'}), (1, {3.0: u'bb'})]
->>> sc.sequenceFile(tempdir + "/sftestdata/sfclass").first()
-(u'1', {u'int': 123, u'double': 54.0, u'__class__': u'org.apache.spark.api.python.TestWritable', u'str': u'test1'})
+>>> sorted(sc.sequenceFile(tempdir + "/sftestdata/sfint/").collect())
+[(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')]
+>>> sorted(sc.sequenceFile(tempdir + "/sftestdata/sfdouble/").collect())
+[(1.0, u'aa'), (1.0, u'aa'), (2.0, u'aa'), (2.0, u'bb'), (2.0, u'bb'), (3.0, u'cc')]
+>>> sorted(sc.sequenceFile(tempdir + "/sftestdata/sftext/").collect())
+[(u'1', u'aa'), (u'1', u'aa'), (u'2', u'aa'), (u'2', u'bb'), (u'2', u'bb'), (u'3', u'cc')]
+>>> sorted(sc.sequenceFile(tempdir + "/sftestdata/sfbool/").collect())
+[(1, False), (1, True), (2, False), (2, False), (2, True), (3, True)]
+>>> sorted(sc.sequenceFile(tempdir + "/sftestdata/sfnull/").collect())
+[(1, None), (1, None), (2, None), (2, None), (2, None), (3, None)]
+>>> sorted(sc.sequenceFile(tempdir + "/sftestdata/sfmap/").collect())
+[(1, {2.0: u'aa'}), (1, {3.0: u'bb'}), (2, {1.0: u'aa'}), (2, {1.0: u'cc'}), (2, {3.0: u'bb'}), (3, {2.0: u'dd'})]
+>>> r = sc.sequenceFile(tempdir + "/sftestdata/sfclass").first()
+>>> [r[0], sorted([(k, v) for k, v in r[1].iteritems()])]
+[u'1', [(u'__class__', u'org.apache.spark.api.python.TestWritable'), (u'double', 54.0), (u'int', 123), (u'str', u'test1')]]
 """
 minSplits = minSplits or min(self.defaultParallelism, 2)
 jrdd = self._jvm.PythonRDD.sequenceFile(self._jsc, name, key_class, value_class, key_wrapper, value_wrapper,
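For context (not part of the commit): collect() returns records in partition order, and the order in which Hadoop input splits are read is not guaranteed, so doctests that compare the raw collect() output can fail intermittently. The fix sorts before comparing. A minimal sketch of that pattern, reusing the tempdir fixture and expected data from the doctests above:

    # Compare order-insensitively: sequence-file record order depends on
    # how the input is split into partitions, so raw collect() is unstable.
    rdd = sc.sequenceFile(tempdir + "/sftestdata/sfint/")
    assert sorted(rdd.collect()) == [(1, u'aa'), (1, u'aa'), (2, u'aa'),
                                     (2, u'bb'), (2, u'bb'), (3, u'cc')]
    # Dict-valued records need the same care: dict iteration order is
    # unspecified in Python 2, so the sfclass doctest sorts iteritems().
    r = sc.sequenceFile(tempdir + "/sftestdata/sfclass").first()
    assert r[0] == u'1'
    assert sorted(r[1].iteritems()) == [
        (u'__class__', u'org.apache.spark.api.python.TestWritable'),
        (u'double', 54.0), (u'int', 123), (u'str', u'test1')]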
