[SPARK-38563][PYTHON] Upgrade to Py4J 0.10.9.5
### What changes were proposed in this pull request?

This PR is a retry of apache#35871, bumping the version up to 0.10.9.5.
The earlier upgrade was reverted because it broke Python 3.10, which Py4J did not officially support at the time.

In Py4J 0.10.9.5, the issue was fixed (py4j/py4j#475), and Python 3.10 is now officially supported with CI coverage (py4j/py4j#477).

### Why are the changes needed?

See apache#35871

### Does this PR introduce _any_ user-facing change?

See apache#35871

### How was this patch tested?

Py4J now runs Python 3.10 in its CI, and I manually tested PySpark on Python 3.10 with this patch:

```bash
./bin/pyspark
```

```python
import py4j
py4j.__version__
spark.range(10).show()
```

```
Using Python version 3.10.0 (default, Mar  3 2022 03:57:21)
Spark context Web UI available at http://172.30.5.50:4040
Spark context available as 'sc' (master = local[*], app id = local-1647571387534).
SparkSession available as 'spark'.
>>> import py4j
>>> py4j.__version__
'0.10.9.5'
>>> spark.range(10).show()
+---+
| id|
+---+
...
```

Closes apache#35907 from HyukjinKwon/SPARK-38563-followup.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
HyukjinKwon committed Mar 18, 2022
1 parent f36a5fb commit 97335ea
Showing 16 changed files with 20 additions and 45 deletions.
2 changes: 1 addition & 1 deletion bin/pyspark
@@ -50,7 +50,7 @@ export PYSPARK_DRIVER_PYTHON_OPTS

# Add the PySpark classes to the Python path:
export PYTHONPATH="${SPARK_HOME}/python/:$PYTHONPATH"
-export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.10.9.3-src.zip:$PYTHONPATH"
+export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.10.9.5-src.zip:$PYTHONPATH"

# Load the PySpark shell.py script when ./pyspark is used interactively:
export OLD_PYTHONSTARTUP="$PYTHONSTARTUP"
2 changes: 1 addition & 1 deletion bin/pyspark2.cmd
@@ -30,7 +30,7 @@ if "x%PYSPARK_DRIVER_PYTHON%"=="x" (
)

set PYTHONPATH=%SPARK_HOME%\python;%PYTHONPATH%
-set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.10.9.3-src.zip;%PYTHONPATH%
+set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.10.9.5-src.zip;%PYTHONPATH%

set OLD_PYTHONSTARTUP=%PYTHONSTARTUP%
set PYTHONSTARTUP=%SPARK_HOME%\python\pyspark\shell.py
2 changes: 1 addition & 1 deletion core/pom.xml
@@ -423,7 +423,7 @@
    <dependency>
      <groupId>net.sf.py4j</groupId>
      <artifactId>py4j</artifactId>
-      <version>0.10.9.3</version>
+      <version>0.10.9.5</version>
    </dependency>
<dependency>
<groupId>org.apache.spark</groupId>
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
@@ -27,7 +27,7 @@ import org.apache.spark.SparkContext
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}

private[spark] object PythonUtils {
-  val PY4J_ZIP_NAME = "py4j-0.10.9.3-src.zip"
+  val PY4J_ZIP_NAME = "py4j-0.10.9.5-src.zip"

/** Get the PYTHONPATH for PySpark, either from SPARK_HOME, if it is set, or from our JAR */
def sparkPythonPath: String = {
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-2-hive-2.3
@@ -233,7 +233,7 @@ parquet-hadoop/1.12.2//parquet-hadoop-1.12.2.jar
parquet-jackson/1.12.2//parquet-jackson-1.12.2.jar
pickle/1.2//pickle-1.2.jar
protobuf-java/2.5.0//protobuf-java-2.5.0.jar
-py4j/0.10.9.3//py4j-0.10.9.3.jar
+py4j/0.10.9.5//py4j-0.10.9.5.jar
remotetea-oncrpc/1.1.2//remotetea-oncrpc-1.1.2.jar
rocksdbjni/6.20.3//rocksdbjni-6.20.3.jar
scala-collection-compat_2.12/2.1.1//scala-collection-compat_2.12-2.1.1.jar
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-3-hive-2.3
@@ -221,7 +221,7 @@ parquet-hadoop/1.12.2//parquet-hadoop-1.12.2.jar
parquet-jackson/1.12.2//parquet-jackson-1.12.2.jar
pickle/1.2//pickle-1.2.jar
protobuf-java/2.5.0//protobuf-java-2.5.0.jar
-py4j/0.10.9.3//py4j-0.10.9.3.jar
+py4j/0.10.9.5//py4j-0.10.9.5.jar
remotetea-oncrpc/1.1.2//remotetea-oncrpc-1.1.2.jar
rocksdbjni/6.20.3//rocksdbjni-6.20.3.jar
scala-collection-compat_2.12/2.1.1//scala-collection-compat_2.12-2.1.1.jar
2 changes: 1 addition & 1 deletion docs/job-scheduling.md
@@ -304,5 +304,5 @@ via `sc.setJobGroup` in a separate PVM thread, which also disallows to cancel th
later.

`pyspark.InheritableThread` is recommended to use together for a PVM thread to inherit the inheritable attributes
-such as local properties in a JVM thread, and to avoid resource leak.
+such as local properties in a JVM thread.

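For context, the pattern this doc paragraph recommends looks roughly like the sketch below (a hypothetical example, not part of this diff; it assumes a running `SparkContext` and an arbitrary job-group name):

```python
from pyspark import SparkContext, InheritableThread

sc = SparkContext.getOrCreate()

def run_job() -> None:
    # Job groups and other local properties are per PVM thread;
    # InheritableThread copies the parent thread's local properties
    # into this child thread when it starts.
    sc.setJobGroup("example-group", "hypothetical job group")
    sc.parallelize(range(10)).count()

t = InheritableThread(target=run_job)
t.start()
t.join()
```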
2 changes: 1 addition & 1 deletion python/docs/Makefile
@@ -21,7 +21,7 @@ SPHINXBUILD ?= sphinx-build
SOURCEDIR ?= source
BUILDDIR ?= build

-export PYTHONPATH=$(realpath ..):$(realpath ../lib/py4j-0.10.9.3-src.zip)
+export PYTHONPATH=$(realpath ..):$(realpath ../lib/py4j-0.10.9.5-src.zip)

# Put it first so that "make" without argument is like "make help".
help:
2 changes: 1 addition & 1 deletion python/docs/make2.bat
@@ -25,7 +25,7 @@ if "%SPHINXBUILD%" == "" (
set SOURCEDIR=source
set BUILDDIR=build

-set PYTHONPATH=..;..\lib\py4j-0.10.9.3-src.zip
+set PYTHONPATH=..;..\lib\py4j-0.10.9.5-src.zip

if "%1" == "" goto help

2 changes: 1 addition & 1 deletion python/docs/source/getting_started/install.rst
@@ -157,7 +157,7 @@ Package Minimum supported version Note
`pandas` 1.0.5 Optional for Spark SQL
`NumPy` 1.7 Required for MLlib DataFrame-based API
`pyarrow` 1.0.0 Optional for Spark SQL
-`Py4J` 0.10.9.3 Required
+`Py4J` 0.10.9.5 Required
`pandas` 1.0.5 Required for pandas API on Spark
`pyarrow` 1.0.0 Required for pandas API on Spark
`Numpy` 1.14 Required for pandas API on Spark
Binary file removed python/lib/py4j-0.10.9.3-src.zip
Binary file not shown.
Binary file added python/lib/py4j-0.10.9.5-src.zip
Binary file not shown.
6 changes: 3 additions & 3 deletions python/pyspark/context.py
@@ -1365,7 +1365,7 @@ def setJobGroup(self, groupId: str, description: str, interruptOnCancel: bool =
to HDFS-1208, where HDFS may respond to Thread.interrupt() by marking nodes as dead.
If you run jobs in parallel, use :class:`pyspark.InheritableThread` for thread
-local inheritance, and preventing resource leak.
+local inheritance.
Examples
--------
@@ -1405,7 +1405,7 @@ def setLocalProperty(self, key: str, value: str) -> None:
Notes
-----
If you run jobs in parallel, use :class:`pyspark.InheritableThread` for thread
-local inheritance, and preventing resource leak.
+local inheritance.
"""
self._jsc.setLocalProperty(key, value)

@@ -1423,7 +1423,7 @@ def setJobDescription(self, value: str) -> None:
Notes
-----
If you run jobs in parallel, use :class:`pyspark.InheritableThread` for thread
-local inheritance, and preventing resource leak.
+local inheritance.
"""
self._jsc.setJobDescription(value)

35 changes: 5 additions & 30 deletions python/pyspark/util.py
@@ -331,13 +331,10 @@ def inheritable_thread_target(f: Callable) -> Callable:

        @functools.wraps(f)
        def wrapped(*args: Any, **kwargs: Any) -> Any:
-            try:
-                # Set local properties in child thread.
-                assert SparkContext._active_spark_context is not None
-                SparkContext._active_spark_context._jsc.sc().setLocalProperties(properties)
-                return f(*args, **kwargs)
-            finally:
-                InheritableThread._clean_py4j_conn_for_current_thread()
+            # Set local properties in child thread.
+            assert SparkContext._active_spark_context is not None
+            SparkContext._active_spark_context._jsc.sc().setLocalProperties(properties)
+            return f(*args, **kwargs)

        return wrapped
    else:
@@ -377,10 +374,7 @@ def copy_local_properties(*a: Any, **k: Any) -> Any:
                assert hasattr(self, "_props")
                assert SparkContext._active_spark_context is not None
                SparkContext._active_spark_context._jsc.sc().setLocalProperties(self._props)
-                try:
-                    return target(*a, **k)
-                finally:
-                    InheritableThread._clean_py4j_conn_for_current_thread()
+                return target(*a, **k)

            super(InheritableThread, self).__init__(
                target=copy_local_properties, *args, **kwargs  # type: ignore[misc]
@@ -401,25 +395,6 @@ def start(self) -> None:
            self._props = SparkContext._active_spark_context._jsc.sc().getLocalProperties().clone()
        return super(InheritableThread, self).start()

-    @staticmethod
-    def _clean_py4j_conn_for_current_thread() -> None:
-        from pyspark import SparkContext
-
-        jvm = SparkContext._jvm
-        assert jvm is not None
-        thread_connection = jvm._gateway_client.get_thread_connection()
-        if thread_connection is not None:
-            try:
-                # Dequeue is shared across other threads but it's thread-safe.
-                # If this function has to be invoked one more time in the same thead
-                # Py4J will create a new connection automatically.
-                jvm._gateway_client.deque.remove(thread_connection)
-            except ValueError:
-                # Should never reach this point
-                return
-            finally:
-                thread_connection.close()


if __name__ == "__main__":
    if "pypy" not in platform.python_implementation().lower() and sys.version_info[:2] >= (3, 7):
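For reference, the simplified wrapper above is used roughly as in the sketch below (a hypothetical example, not part of this diff; it assumes pinned-thread mode, the default since Spark 3.2, and relies on Py4J 0.10.9.5 no longer requiring the explicit per-thread connection cleanup removed here):

```python
import threading

from pyspark import SparkContext, inheritable_thread_target

sc = SparkContext.getOrCreate()
sc.setLocalProperty("demo.key", "demo-value")  # an arbitrary local property

@inheritable_thread_target
def work() -> None:
    # The parent's local properties were captured when `work` was wrapped,
    # and are re-applied in whatever thread runs it.
    print(sc.getLocalProperty("demo.key"))  # prints: demo-value

threading.Thread(target=work).start()
```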
2 changes: 1 addition & 1 deletion python/setup.py
@@ -258,7 +258,7 @@ def run(self):
license='http://www.apache.org/licenses/LICENSE-2.0',
# Don't forget to update python/docs/source/getting_started/install.rst
# if you're updating the versions or dependencies.
-install_requires=['py4j==0.10.9.3'],
+install_requires=['py4j==0.10.9.5'],
extras_require={
'ml': ['numpy>=1.15'],
'mllib': ['numpy>=1.15'],
2 changes: 1 addition & 1 deletion sbin/spark-config.sh
@@ -28,6 +28,6 @@ export SPARK_CONF_DIR="${SPARK_CONF_DIR:-"${SPARK_HOME}/conf"}"
# Add the PySpark classes to the PYTHONPATH:
if [ -z "${PYSPARK_PYTHONPATH_SET}" ]; then
export PYTHONPATH="${SPARK_HOME}/python:${PYTHONPATH}"
-export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.10.9.3-src.zip:${PYTHONPATH}"
+export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.10.9.5-src.zip:${PYTHONPATH}"
export PYSPARK_PYTHONPATH_SET=1
fi
