From 11e5cd7edb57017e324afb4c8637688a2acdfcd8 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Tue, 17 Jan 2017 09:53:20 -0800
Subject: [PATCH] [SPARK-19019] [PYTHON] Fix hijacked `collections.namedtuple` and port cloudpickle changes for PySpark to work with Python 3.6.0

## What changes were proposed in this pull request?

Currently, PySpark does not work with Python 3.6.0. Running `./bin/pyspark` simply throws the error below, and PySpark does not work at all:

```
Traceback (most recent call last):
  File ".../spark/python/pyspark/shell.py", line 30, in <module>
    import pyspark
  File ".../spark/python/pyspark/__init__.py", line 46, in <module>
    from pyspark.context import SparkContext
  File ".../spark/python/pyspark/context.py", line 36, in <module>
    from pyspark.java_gateway import launch_gateway
  File ".../spark/python/pyspark/java_gateway.py", line 31, in <module>
    from py4j.java_gateway import java_import, JavaGateway, GatewayClient
  File "<frozen importlib._bootstrap>", line 961, in _find_and_load
  File "<frozen importlib._bootstrap>", line 950, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 646, in _load_unlocked
  File "<frozen importlib._bootstrap>", line 616, in _load_backward_compatible
  File ".../spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 18, in <module>
  File "/usr/local/Cellar/python3/3.6.0/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pydoc.py", line 62, in <module>
    import pkgutil
  File "/usr/local/Cellar/python3/3.6.0/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pkgutil.py", line 22, in <module>
    ModuleInfo = namedtuple('ModuleInfo', 'module_finder name ispkg')
  File ".../spark/python/pyspark/serializers.py", line 394, in namedtuple
    cls = _old_namedtuple(*args, **kwargs)
TypeError: namedtuple() missing 3 required keyword-only arguments: 'verbose', 'rename', and 'module'
```

The root cause seems to be that some arguments of `namedtuple` became keyword-only as of Python 3.6.0 (see https://bugs.python.org/issue25628). We currently copy this function via `types.FunctionType`, which does not carry over the default values of keyword-only arguments (i.e. `namedtuple.__kwdefaults__`), leaving those arguments unbound in the copied function. Since `types.FunctionType` does not seem to support setting these defaults directly, this PR works around the problem by manually forwarding them via `kwargs`; a minimal sketch at the end of this description illustrates both the failure and the workaround. This PR also ports the cloudpickle changes needed for compatibility with Python 3.6.0.

## How was this patch tested?

Manually tested with Python 2.7.6 and Python 3.6.0:

```
./bin/pyspark
```

together with manual creation of `namedtuple` both locally and in an RDD with Python 3.6.0, and Jenkins tests for the other Python versions.
Also:

```
./run-tests --python-executables=python3.6
```

```
Will test against the following Python executables: ['python3.6']
Will test the following Python modules: ['pyspark-core', 'pyspark-ml', 'pyspark-mllib', 'pyspark-sql', 'pyspark-streaming']
Finished test(python3.6): pyspark.sql.tests (192s)
Finished test(python3.6): pyspark.accumulators (3s)
Finished test(python3.6): pyspark.mllib.tests (198s)
Finished test(python3.6): pyspark.broadcast (3s)
Finished test(python3.6): pyspark.conf (2s)
Finished test(python3.6): pyspark.context (14s)
Finished test(python3.6): pyspark.ml.classification (21s)
Finished test(python3.6): pyspark.ml.evaluation (11s)
Finished test(python3.6): pyspark.ml.clustering (20s)
Finished test(python3.6): pyspark.ml.linalg.__init__ (0s)
Finished test(python3.6): pyspark.streaming.tests (240s)
Finished test(python3.6): pyspark.tests (240s)
Finished test(python3.6): pyspark.ml.recommendation (19s)
Finished test(python3.6): pyspark.ml.feature (36s)
Finished test(python3.6): pyspark.ml.regression (37s)
Finished test(python3.6): pyspark.ml.tuning (28s)
Finished test(python3.6): pyspark.mllib.classification (26s)
Finished test(python3.6): pyspark.mllib.evaluation (18s)
Finished test(python3.6): pyspark.mllib.clustering (44s)
Finished test(python3.6): pyspark.mllib.linalg.__init__ (0s)
Finished test(python3.6): pyspark.mllib.feature (26s)
Finished test(python3.6): pyspark.mllib.fpm (23s)
Finished test(python3.6): pyspark.mllib.random (8s)
Finished test(python3.6): pyspark.ml.tests (92s)
Finished test(python3.6): pyspark.mllib.stat.KernelDensity (0s)
Finished test(python3.6): pyspark.mllib.linalg.distributed (25s)
Finished test(python3.6): pyspark.mllib.stat._statistics (15s)
Finished test(python3.6): pyspark.mllib.recommendation (24s)
Finished test(python3.6): pyspark.mllib.regression (26s)
Finished test(python3.6): pyspark.profiler (9s)
Finished test(python3.6): pyspark.mllib.tree (16s)
Finished test(python3.6): pyspark.shuffle (1s)
Finished test(python3.6): pyspark.mllib.util (18s)
Finished test(python3.6): pyspark.serializers (11s)
Finished test(python3.6): pyspark.rdd (20s)
Finished test(python3.6): pyspark.sql.conf (8s)
Finished test(python3.6): pyspark.sql.catalog (17s)
Finished test(python3.6): pyspark.sql.column (18s)
Finished test(python3.6): pyspark.sql.context (18s)
Finished test(python3.6): pyspark.sql.group (27s)
Finished test(python3.6): pyspark.sql.dataframe (33s)
Finished test(python3.6): pyspark.sql.functions (35s)
Finished test(python3.6): pyspark.sql.types (6s)
Finished test(python3.6): pyspark.sql.streaming (13s)
Finished test(python3.6): pyspark.streaming.util (0s)
Finished test(python3.6): pyspark.sql.session (16s)
Finished test(python3.6): pyspark.sql.window (4s)
Finished test(python3.6): pyspark.sql.readwriter (35s)
Tests passed in 433 seconds
```

Author: hyukjinkwon

Closes #16429 from HyukjinKwon/SPARK-19019.
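As a note for reviewers, the failure and the workaround can be reproduced in isolation. The sketch below is not part of the patch; it assumes Python 3.6+, where `namedtuple` has keyword-only arguments (the exact keys of `__kwdefaults__` vary across 3.x versions). It copies `collections.namedtuple` the same way `serializers.py` does, shows that `__kwdefaults__` is dropped, then re-supplies the lost defaults via `kwargs` as this PR does:

```
import collections
import types

# Copy namedtuple the way pyspark/serializers.py does. types.FunctionType
# sets __defaults__ but not __kwdefaults__, so the keyword-only arguments
# of Python 3.6+'s namedtuple lose their default values in the copy.
_old_namedtuple = types.FunctionType(
    collections.namedtuple.__code__,
    collections.namedtuple.__globals__,
    collections.namedtuple.__name__,
    collections.namedtuple.__defaults__,
    collections.namedtuple.__closure__)

print(collections.namedtuple.__kwdefaults__)  # e.g. {'verbose': False, 'rename': False, 'module': None}
print(_old_namedtuple.__kwdefaults__)         # None -- the defaults were dropped

try:
    _old_namedtuple('Point', 'x y')
except TypeError as e:
    print(e)  # missing required keyword-only arguments, as in the traceback above

# The workaround applied by this PR: re-supply the lost defaults via kwargs.
_kwdefaults = collections.namedtuple.__kwdefaults__ or {}

def namedtuple(*args, **kwargs):
    for k, v in _kwdefaults.items():
        kwargs.setdefault(k, v)  # equivalent to kwargs[k] = kwargs.get(k, v)
    return _old_namedtuple(*args, **kwargs)

Point = namedtuple('Point', 'x y')
print(Point(1, 2))  # Point(x=1, y=2)
```

`kwargs.setdefault(k, v)` here behaves the same as the patch's `kwargs[k] = kwargs.get(k, v)`: explicitly passed keyword arguments win, and the saved defaults fill in the rest.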
---
 python/pyspark/cloudpickle.py | 98 ++++++++++++++++++++++++-----------
 python/pyspark/serializers.py | 20 +++++++
 2 files changed, 87 insertions(+), 31 deletions(-)

diff --git a/python/pyspark/cloudpickle.py b/python/pyspark/cloudpickle.py
index e56e22a9b920e..98895956a2dcb 100644
--- a/python/pyspark/cloudpickle.py
+++ b/python/pyspark/cloudpickle.py
@@ -43,6 +43,7 @@
 from __future__ import print_function
 
 import operator
+import opcode
 import os
 import io
 import pickle
@@ -53,6 +54,8 @@
 import itertools
 import dis
 import traceback
+import weakref
+
 
 if sys.version < '3':
     from pickle import Pickler
@@ -68,10 +71,10 @@
     PY3 = True
 
 #relevant opcodes
-STORE_GLOBAL = dis.opname.index('STORE_GLOBAL')
-DELETE_GLOBAL = dis.opname.index('DELETE_GLOBAL')
-LOAD_GLOBAL = dis.opname.index('LOAD_GLOBAL')
-GLOBAL_OPS = [STORE_GLOBAL, DELETE_GLOBAL, LOAD_GLOBAL]
+STORE_GLOBAL = opcode.opmap['STORE_GLOBAL']
+DELETE_GLOBAL = opcode.opmap['DELETE_GLOBAL']
+LOAD_GLOBAL = opcode.opmap['LOAD_GLOBAL']
+GLOBAL_OPS = (STORE_GLOBAL, DELETE_GLOBAL, LOAD_GLOBAL)
 
 HAVE_ARGUMENT = dis.HAVE_ARGUMENT
 EXTENDED_ARG = dis.EXTENDED_ARG
@@ -90,6 +93,43 @@ def _builtin_type(name):
     return getattr(types, name)
 
 
+if sys.version_info < (3, 4):
+    def _walk_global_ops(code):
+        """
+        Yield (opcode, argument number) tuples for all
+        global-referencing instructions in *code*.
+        """
+        code = getattr(code, 'co_code', b'')
+        if not PY3:
+            code = map(ord, code)
+
+        n = len(code)
+        i = 0
+        extended_arg = 0
+        while i < n:
+            op = code[i]
+            i += 1
+            if op >= HAVE_ARGUMENT:
+                oparg = code[i] + code[i + 1] * 256 + extended_arg
+                extended_arg = 0
+                i += 2
+                if op == EXTENDED_ARG:
+                    extended_arg = oparg * 65536
+                if op in GLOBAL_OPS:
+                    yield op, oparg
+
+else:
+    def _walk_global_ops(code):
+        """
+        Yield (opcode, argument number) tuples for all
+        global-referencing instructions in *code*.
+        """
+        for instr in dis.get_instructions(code):
+            op = instr.opcode
+            if op in GLOBAL_OPS:
+                yield op, instr.arg
+
+
 class CloudPickler(Pickler):
 
     dispatch = Pickler.dispatch.copy()
@@ -245,38 +285,34 @@ def save_function_tuple(self, func):
         write(pickle.TUPLE)
         write(pickle.REDUCE)  # applies _fill_function on the tuple
 
-    @staticmethod
-    def extract_code_globals(co):
+    _extract_code_globals_cache = (
+        weakref.WeakKeyDictionary()
+        if sys.version_info >= (2, 7) and not hasattr(sys, "pypy_version_info")
+        else {})
+
+    @classmethod
+    def extract_code_globals(cls, co):
         """
         Find all globals names read or written to by codeblock co
         """
-        code = co.co_code
-        if not PY3:
-            code = [ord(c) for c in code]
-        names = co.co_names
-        out_names = set()
-
-        n = len(code)
-        i = 0
-        extended_arg = 0
-        while i < n:
-            op = code[i]
+        out_names = cls._extract_code_globals_cache.get(co)
+        if out_names is None:
+            try:
+                names = co.co_names
+            except AttributeError:
+                # PyPy "builtin-code" object
+                out_names = set()
+            else:
+                out_names = set(names[oparg]
+                                for op, oparg in _walk_global_ops(co))
 
-            i += 1
-            if op >= HAVE_ARGUMENT:
-                oparg = code[i] + code[i+1] * 256 + extended_arg
-                extended_arg = 0
-                i += 2
-                if op == EXTENDED_ARG:
-                    extended_arg = oparg*65536
-                if op in GLOBAL_OPS:
-                    out_names.add(names[oparg])
+            # see if nested function have any global refs
+            if co.co_consts:
+                for const in co.co_consts:
+                    if type(const) is types.CodeType:
+                        out_names |= cls.extract_code_globals(const)
 
-        # see if nested function have any global refs
-        if co.co_consts:
-            for const in co.co_consts:
-                if type(const) is types.CodeType:
-                    out_names |= CloudPickler.extract_code_globals(const)
+            cls._extract_code_globals_cache[co] = out_names
 
         return out_names

diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index 2a1326947f4f5..a9e14b8f7033b 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -370,18 +370,38 @@ def _hijack_namedtuple():
         return
 
     global _old_namedtuple  # or it will put in closure
+    global _old_namedtuple_kwdefaults  # or it will put in closure too
 
     def _copy_func(f):
         return types.FunctionType(f.__code__, f.__globals__, f.__name__,
                                   f.__defaults__, f.__closure__)
 
+    def _kwdefaults(f):
+        # __kwdefaults__ contains the default values of keyword-only arguments which are
+        # introduced from Python 3. The possible cases for __kwdefaults__ in namedtuple
+        # are as below:
+        #
+        # - Does not exist in Python 2.
+        # - Returns None in <= Python 3.5.x.
+        # - Returns a dictionary containing the default values to the keys from Python 3.6.x
+        #   (See https://bugs.python.org/issue25628).
+        kargs = getattr(f, "__kwdefaults__", None)
+        if kargs is None:
+            return {}
+        else:
+            return kargs
+
     _old_namedtuple = _copy_func(collections.namedtuple)
+    _old_namedtuple_kwdefaults = _kwdefaults(collections.namedtuple)
 
     def namedtuple(*args, **kwargs):
+        for k, v in _old_namedtuple_kwdefaults.items():
+            kwargs[k] = kwargs.get(k, v)
         cls = _old_namedtuple(*args, **kwargs)
         return _hack_namedtuple(cls)
 
     # replace namedtuple with new one
+    collections.namedtuple.__globals__["_old_namedtuple_kwdefaults"] = _old_namedtuple_kwdefaults
     collections.namedtuple.__globals__["_old_namedtuple"] = _old_namedtuple
     collections.namedtuple.__globals__["_hack_namedtuple"] = _hack_namedtuple
     collections.namedtuple.__code__ = namedtuple.__code__