From d7ef6ce57c4878d4e04ed132fff74df3956447a4 Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Thu, 10 Sep 2015 12:50:01 -0700
Subject: [PATCH 1/4] fix serialize namedtuple

---
 python/pyspark/cloudpickle.py | 7 ++++++-
 python/pyspark/tests.py       | 5 +++++
 2 files changed, 11 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/cloudpickle.py b/python/pyspark/cloudpickle.py
index 3b647985801b7..5c28d75b28e79 100644
--- a/python/pyspark/cloudpickle.py
+++ b/python/pyspark/cloudpickle.py
@@ -350,6 +350,11 @@ def save_global(self, obj, name=None, pack=struct.pack):
             if new_override:
                 d['__new__'] = obj.__new__
 
+            if '__dict__' in d:
+                # '__dict__' is not writable
+                self.save_reduce(typ, (obj.__name__, obj.__bases__, d), obj=obj)
+                return
+
             self.save(_load_class)
             self.save_reduce(typ, (obj.__name__, obj.__bases__, {"__doc__": obj.__doc__}), obj=obj)
             d.pop('__doc__', None)
@@ -382,7 +387,7 @@ def save_instancemethod(self, obj):
             self.save_reduce(types.MethodType, (obj.__func__, obj.__self__), obj=obj)
         else:
             self.save_reduce(types.MethodType, (obj.__func__, obj.__self__, obj.__self__.__class__),
-                         obj=obj)
+                             obj=obj)
     dispatch[types.MethodType] = save_instancemethod
 
     def save_inst(self, obj):
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 8bfed074c9052..93d42bf80f9ae 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -218,6 +218,11 @@ def test_namedtuple(self):
         p2 = loads(dumps(p1, 2))
         self.assertEqual(p1, p2)
 
+        import cloudpickle
+        P2 = loads(cloudpickle.dumps(P))
+        p3 = P2(1, 3)
+        self.assertEqual(p1, p3)
+
     def test_itemgetter(self):
         from operator import itemgetter
         ser = CloudPickleSerializer()

From 1d766aa050f5824747f61567292067f123ed019c Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Thu, 10 Sep 2015 13:44:42 -0700
Subject: [PATCH 2/4] use special case for namedtuple

---
 python/pyspark/cloudpickle.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/cloudpickle.py b/python/pyspark/cloudpickle.py
index 5c28d75b28e79..0623051aec8da 100644
--- a/python/pyspark/cloudpickle.py
+++ b/python/pyspark/cloudpickle.py
@@ -350,8 +350,8 @@ def save_global(self, obj, name=None, pack=struct.pack):
             if new_override:
                 d['__new__'] = obj.__new__
 
-            if '__dict__' in d:
-                # '__dict__' is not writable
+            # workaround for namedtuple
+            if '__dict__' in d or '__new__' in d:
                 self.save_reduce(typ, (obj.__name__, obj.__bases__, d), obj=obj)
                 return
 

From a2f9f36d87fb3bf31020bf07aa00bd8c86077c4a Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Thu, 10 Sep 2015 14:32:10 -0700
Subject: [PATCH 3/4] fix test in python 3.4

---
 python/pyspark/tests.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 93d42bf80f9ae..647504c32f156 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -218,8 +218,8 @@ def test_namedtuple(self):
         p2 = loads(dumps(p1, 2))
         self.assertEqual(p1, p2)
 
-        import cloudpickle
-        P2 = loads(cloudpickle.dumps(P))
+        from pyspark.cloudpickle import dumps
+        P2 = loads(dumps(P))
         p3 = P2(1, 3)
         self.assertEqual(p1, p3)
 

From 93266970c7e890e54b0842a108d99612ff53443f Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Thu, 10 Sep 2015 16:57:17 -0700
Subject: [PATCH 4/4] fix it in PyPy

---
 python/pyspark/cloudpickle.py | 14 +++++++++++---
 python/pyspark/serializers.py |  1 +
 2 files changed, 12 insertions(+), 3 deletions(-)

diff --git a/python/pyspark/cloudpickle.py b/python/pyspark/cloudpickle.py
index 0623051aec8da..95b3abc74244b 100644
--- a/python/pyspark/cloudpickle.py
+++ b/python/pyspark/cloudpickle.py
@@ -350,9 +350,9 @@ def save_global(self, obj, name=None, pack=struct.pack):
             if new_override:
                 d['__new__'] = obj.__new__
 
-            # workaround for namedtuple
-            if '__dict__' in d or '__new__' in d:
-                self.save_reduce(typ, (obj.__name__, obj.__bases__, d), obj=obj)
+            # workaround for namedtuple (hijacked by PySpark)
+            if getattr(obj, '_is_namedtuple_', False):
+                self.save_reduce(_load_namedtuple, (obj.__name__, obj._fields))
                 return
 
             self.save(_load_class)
@@ -749,6 +749,14 @@ def _load_class(cls, d):
     return cls
 
 
+def _load_namedtuple(name, fields):
+    """
+    Loads a class generated by namedtuple
+    """
+    from collections import namedtuple
+    return namedtuple(name, fields)
+
+
 """Constructors for 3rd party libraries
 Note: These can never be renamed due to client compatibility issues"""
 
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index 411b4dbf481f1..2a1326947f4f5 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -359,6 +359,7 @@ def _hack_namedtuple(cls):
     def __reduce__(self):
         return (_restore, (name, fields, tuple(self)))
     cls.__reduce__ = __reduce__
+    cls._is_namedtuple_ = True
     return cls
 
 
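Note on the approach (illustration only, not part of the patches): the final revision reduces a namedtuple class to a loader function plus its (name, _fields) pair, so the unpickling side rebuilds the class with collections.namedtuple instead of shipping the generated class's __dict__ (the approach the "fix it in PyPy" patch replaces). A minimal standalone sketch of that reduction follows. NamedTuplePickler and the structural _fields check are hypothetical stand-ins for cloudpickle's save_global hook and the _is_namedtuple_ flag set in pyspark.serializers._hack_namedtuple, and the sketch relies on pickle.Pickler.reducer_override, which requires Python 3.8+ and therefore postdates this 2015 patch.

# Standalone sketch, not Spark code: rebuild a namedtuple class from
# (name, _fields) instead of pickling the generated class itself.
import io
import pickle
from collections import namedtuple


def _load_namedtuple(name, fields):
    """Recreate a namedtuple class from its name and field names."""
    return namedtuple(name, fields)


class NamedTuplePickler(pickle.Pickler):
    """Pickles namedtuple classes by value; everything else as usual."""

    def reducer_override(self, obj):  # Python 3.8+
        # Structural check standing in for the _is_namedtuple_ flag.
        if isinstance(obj, type) and issubclass(obj, tuple) and hasattr(obj, "_fields"):
            return _load_namedtuple, (obj.__name__, obj._fields)
        return NotImplemented  # fall back to the default behaviour


# The class name "P" is not bound in this module, so the default pickler
# cannot serialize the class by reference.
Point = namedtuple("P", "x y")

buf = io.BytesIO()
NamedTuplePickler(buf).dump(Point)
P2 = pickle.loads(buf.getvalue())
assert P2(1, 3) == Point(1, 3)

Rebuilding from (name, _fields) keeps the pickled payload independent of how a given interpreter lays out the generated class, which is presumably why the same reduction also behaves on PyPy.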