Skip to content

Commit

Permalink
[SPARK-791] [PySpark] fix pickle itemgetter with cloudpickle
Browse files Browse the repository at this point in the history
fix the problem with pickle operator.itemgetter with multiple index.

Author: Davies Liu <[email protected]>

Closes #1627 from davies/itemgetter and squashes the following commits:

aabd7fa [Davies Liu] fix pickle itemgetter with cloudpickle

(cherry picked from commit 92ef026)

Conflicts:
	python/pyspark/tests.py
  • Loading branch information
davies authored and JoshRosen committed Jul 29, 2014
1 parent 7e4a0e1 commit b060014
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 2 deletions.
5 changes: 3 additions & 2 deletions python/pyspark/cloudpickle.py
Original file line number Diff line number Diff line change
Expand Up @@ -560,8 +560,9 @@ class ItemGetterType(ctypes.Structure):
]


itemgetter_obj = ctypes.cast(ctypes.c_void_p(id(obj)), ctypes.POINTER(ItemGetterType)).contents
return self.save_reduce(operator.itemgetter, (itemgetter_obj.item,))
obj = ctypes.cast(ctypes.c_void_p(id(obj)), ctypes.POINTER(ItemGetterType)).contents
return self.save_reduce(operator.itemgetter,
obj.item if obj.nitems > 1 else (obj.item,))

if PyObject_HEAD:
dispatch[operator.itemgetter] = save_itemgetter
Expand Down
6 changes: 6 additions & 0 deletions python/pyspark/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,12 @@ def test_deleting_input_files(self):
os.unlink(tempFile.name)
self.assertRaises(Exception, lambda: filtered_data.count())

def test_itemgetter(self):
rdd = self.sc.parallelize([range(10)])
from operator import itemgetter
self.assertEqual([1], rdd.map(itemgetter(1)).collect())
self.assertEqual([(2, 3)], rdd.map(itemgetter(2, 3)).collect())


class TestIO(PySparkTestCase):

Expand Down

0 comments on commit b060014

Please sign in to comment.