[SPARK-38563][PYTHON] Upgrade to Py4J 0.10.9.4
### What changes were proposed in this pull request?

This PR upgrades Py4J to 0.10.9.4, with the relevant documentation changes.

### Why are the changes needed?

Py4J 0.10.9.3 has a resource leak when pinned thread mode is enabled, and pinned thread mode has been enabled by default in PySpark since 41af409.
We previously worked around the leak by requiring users to use `InheritableThread` or `inheritable_thread_target`.
After this upgrade, that requirement is no longer necessary because Py4J cleans up the connections automatically; see also py4j/py4j#471
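
For illustration only (not part of this patch), a minimal sketch of what this means for user code; it assumes a local `SparkSession` and pinned thread mode on (the PySpark default):

```python
import threading

from pyspark import InheritableThread
from pyspark.sql import SparkSession

# Hypothetical setup for the sketch: a small local session.
spark = SparkSession.builder.master("local[2]").getOrCreate()
sc = spark.sparkContext


def run_job() -> None:
    # In pinned thread mode, each Python thread talks to the JVM over
    # its own Py4J connection.
    print(sc.parallelize(range(10)).sum())


# With Py4J 0.10.9.3, a plain thread could leak its JVM-side connection,
# so InheritableThread (or inheritable_thread_target) was required:
t = InheritableThread(target=run_job)
t.start()
t.join()

# With Py4J 0.10.9.4, the connection is cleaned up automatically when the
# thread exits, so a plain thread no longer leaks:
t = threading.Thread(target=run_job)
t.start()
t.join()
```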

### Does this PR introduce _any_ user-facing change?

Yes, users no longer have to use `InheritableThread` or `inheritable_thread_target` to avoid the resource leak.

### How was this patch tested?

CI in this PR should test it out.

Closes #35871 from HyukjinKwon/SPARK-38563.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
(cherry picked from commit 8193b40)
Signed-off-by: Hyukjin Kwon <[email protected]>
HyukjinKwon committed Mar 16, 2022
1 parent f84018a commit 3bbf346
Showing 16 changed files with 20 additions and 45 deletions.
2 changes: 1 addition & 1 deletion bin/pyspark
@@ -50,7 +50,7 @@ export PYSPARK_DRIVER_PYTHON_OPTS

# Add the PySpark classes to the Python path:
export PYTHONPATH="${SPARK_HOME}/python/:$PYTHONPATH"
-export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.10.9.3-src.zip:$PYTHONPATH"
+export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.10.9.4-src.zip:$PYTHONPATH"

# Load the PySpark shell.py script when ./pyspark is used interactively:
export OLD_PYTHONSTARTUP="$PYTHONSTARTUP"
2 changes: 1 addition & 1 deletion bin/pyspark2.cmd
@@ -30,7 +30,7 @@ if "x%PYSPARK_DRIVER_PYTHON%"=="x" (
)

set PYTHONPATH=%SPARK_HOME%\python;%PYTHONPATH%
-set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.10.9.3-src.zip;%PYTHONPATH%
+set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.10.9.4-src.zip;%PYTHONPATH%

set OLD_PYTHONSTARTUP=%PYTHONSTARTUP%
set PYTHONSTARTUP=%SPARK_HOME%\python\pyspark\shell.py
2 changes: 1 addition & 1 deletion core/pom.xml
@@ -423,7 +423,7 @@
<dependency>
<groupId>net.sf.py4j</groupId>
<artifactId>py4j</artifactId>
-<version>0.10.9.3</version>
+<version>0.10.9.4</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
@@ -27,7 +27,7 @@ import org.apache.spark.SparkContext
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}

private[spark] object PythonUtils {
-val PY4J_ZIP_NAME = "py4j-0.10.9.3-src.zip"
+val PY4J_ZIP_NAME = "py4j-0.10.9.4-src.zip"

/** Get the PYTHONPATH for PySpark, either from SPARK_HOME, if it is set, or from our JAR */
def sparkPythonPath: String = {
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-2-hive-2.3
@@ -233,7 +233,7 @@ parquet-hadoop/1.12.2//parquet-hadoop-1.12.2.jar
parquet-jackson/1.12.2//parquet-jackson-1.12.2.jar
pickle/1.2//pickle-1.2.jar
protobuf-java/2.5.0//protobuf-java-2.5.0.jar
-py4j/0.10.9.3//py4j-0.10.9.3.jar
+py4j/0.10.9.4//py4j-0.10.9.4.jar
remotetea-oncrpc/1.1.2//remotetea-oncrpc-1.1.2.jar
rocksdbjni/6.20.3//rocksdbjni-6.20.3.jar
scala-collection-compat_2.12/2.1.1//scala-collection-compat_2.12-2.1.1.jar
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-3-hive-2.3
@@ -221,7 +221,7 @@ parquet-hadoop/1.12.2//parquet-hadoop-1.12.2.jar
parquet-jackson/1.12.2//parquet-jackson-1.12.2.jar
pickle/1.2//pickle-1.2.jar
protobuf-java/2.5.0//protobuf-java-2.5.0.jar
-py4j/0.10.9.3//py4j-0.10.9.3.jar
+py4j/0.10.9.4//py4j-0.10.9.4.jar
remotetea-oncrpc/1.1.2//remotetea-oncrpc-1.1.2.jar
rocksdbjni/6.20.3//rocksdbjni-6.20.3.jar
scala-collection-compat_2.12/2.1.1//scala-collection-compat_2.12-2.1.1.jar
2 changes: 1 addition & 1 deletion docs/job-scheduling.md
@@ -304,5 +304,5 @@ via `sc.setJobGroup` in a separate PVM thread, which also disallows to cancel the
later.

`pyspark.InheritableThread` is recommended to use together for a PVM thread to inherit the inheritable attributes
-such as local properties in a JVM thread, and to avoid resource leak.
+such as local properties in a JVM thread.
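
As a sketch of the recommendation in the doc above (illustrative only, assuming a local session; not taken from this commit), `pyspark.InheritableThread` copies the parent thread's local properties into the child PVM thread:

```python
from pyspark import InheritableThread
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[2]").getOrCreate()
sc = spark.sparkContext

# A local property set in the parent thread...
sc.setLocalProperty("spark.scheduler.pool", "pool1")


def child() -> None:
    # ...is inherited here, so jobs submitted from this thread are
    # scheduled in the same pool as the parent's jobs.
    print(sc.getLocalProperty("spark.scheduler.pool"))  # prints: pool1
    print(sc.parallelize(range(5)).count())


t = InheritableThread(target=child)
t.start()
t.join()
```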

2 changes: 1 addition & 1 deletion python/docs/Makefile
@@ -21,7 +21,7 @@ SPHINXBUILD ?= sphinx-build
SOURCEDIR ?= source
BUILDDIR ?= build

-export PYTHONPATH=$(realpath ..):$(realpath ../lib/py4j-0.10.9.3-src.zip)
+export PYTHONPATH=$(realpath ..):$(realpath ../lib/py4j-0.10.9.4-src.zip)

# Put it first so that "make" without argument is like "make help".
help:
2 changes: 1 addition & 1 deletion python/docs/make2.bat
@@ -25,7 +25,7 @@ if "%SPHINXBUILD%" == "" (
set SOURCEDIR=source
set BUILDDIR=build

-set PYTHONPATH=..;..\lib\py4j-0.10.9.3-src.zip
+set PYTHONPATH=..;..\lib\py4j-0.10.9.4-src.zip

if "%1" == "" goto help

2 changes: 1 addition & 1 deletion python/docs/source/getting_started/install.rst
@@ -157,7 +157,7 @@ Package Minimum supported version Note
`pandas` 1.0.5 Optional for Spark SQL
`NumPy` 1.7 Required for MLlib DataFrame-based API
`pyarrow` 1.0.0 Optional for Spark SQL
-`Py4J` 0.10.9.3 Required
+`Py4J` 0.10.9.4 Required
`pandas` 1.0.5 Required for pandas API on Spark
`pyarrow` 1.0.0 Required for pandas API on Spark
`Numpy` 1.14 Required for pandas API on Spark
Binary file removed python/lib/py4j-0.10.9.3-src.zip
Binary file added python/lib/py4j-0.10.9.4-src.zip
6 changes: 3 additions & 3 deletions python/pyspark/context.py
@@ -1365,7 +1365,7 @@ def setJobGroup(self, groupId: str, description: str, interruptOnCancel: bool =
to HDFS-1208, where HDFS may respond to Thread.interrupt() by marking nodes as dead.
If you run jobs in parallel, use :class:`pyspark.InheritableThread` for thread
-local inheritance, and preventing resource leak.
+local inheritance.
Examples
--------
@@ -1405,7 +1405,7 @@ def setLocalProperty(self, key: str, value: str) -> None:
Notes
-----
If you run jobs in parallel, use :class:`pyspark.InheritableThread` for thread
-local inheritance, and preventing resource leak.
+local inheritance.
"""
self._jsc.setLocalProperty(key, value)

@@ -1423,7 +1423,7 @@ def setJobDescription(self, value: str) -> None:
Notes
-----
If you run jobs in parallel, use :class:`pyspark.InheritableThread` for thread
-local inheritance, and preventing resource leak.
+local inheritance.
"""
self._jsc.setJobDescription(value)

35 changes: 5 additions & 30 deletions python/pyspark/util.py
@@ -331,13 +331,10 @@ def inheritable_thread_target(f: Callable) -> Callable:

@functools.wraps(f)
def wrapped(*args: Any, **kwargs: Any) -> Any:
-try:
-    # Set local properties in child thread.
-    assert SparkContext._active_spark_context is not None
-    SparkContext._active_spark_context._jsc.sc().setLocalProperties(properties)
-    return f(*args, **kwargs)
-finally:
-    InheritableThread._clean_py4j_conn_for_current_thread()
+# Set local properties in child thread.
+assert SparkContext._active_spark_context is not None
+SparkContext._active_spark_context._jsc.sc().setLocalProperties(properties)
+return f(*args, **kwargs)

return wrapped
else:
@@ -377,10 +374,7 @@ def copy_local_properties(*a: Any, **k: Any) -> Any:
assert hasattr(self, "_props")
assert SparkContext._active_spark_context is not None
SparkContext._active_spark_context._jsc.sc().setLocalProperties(self._props)
-try:
-    return target(*a, **k)
-finally:
-    InheritableThread._clean_py4j_conn_for_current_thread()
+return target(*a, **k)

super(InheritableThread, self).__init__(
target=copy_local_properties, *args, **kwargs # type: ignore[misc]
@@ -401,25 +395,6 @@ def start(self) -> None:
self._props = SparkContext._active_spark_context._jsc.sc().getLocalProperties().clone()
return super(InheritableThread, self).start()

-@staticmethod
-def _clean_py4j_conn_for_current_thread() -> None:
-    from pyspark import SparkContext
-
-    jvm = SparkContext._jvm
-    assert jvm is not None
-    thread_connection = jvm._gateway_client.get_thread_connection()
-    if thread_connection is not None:
-        try:
-            # Dequeue is shared across other threads but it's thread-safe.
-            # If this function has to be invoked one more time in the same thead
-            # Py4J will create a new connection automatically.
-            jvm._gateway_client.deque.remove(thread_connection)
-        except ValueError:
-            # Should never reach this point
-            return
-        finally:
-            thread_connection.close()


if __name__ == "__main__":
if "pypy" not in platform.python_implementation().lower() and sys.version_info[:2] >= (3, 7):
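To show what the simplified `inheritable_thread_target` path above amounts to in use, here is a hedged sketch (same local-session assumption as before). The decorator snapshots the current thread's local properties when it is applied, and the wrapped function re-applies them in whichever thread runs it; after this change no connection cleanup step is needed:

```python
import threading

from pyspark import inheritable_thread_target
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[2]").getOrCreate()
sc = spark.sparkContext

# Properties captured at decoration time include the job group.
sc.setJobGroup("demo-group", "jobs started from worker threads")


@inheritable_thread_target
def run() -> None:
    # Runs with the parent's local properties; after this patch the
    # wrapper no longer needs a finally block to close the per-thread
    # Py4J connection.
    print(sc.parallelize(range(5)).sum())


t = threading.Thread(target=run)
t.start()
t.join()
```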
2 changes: 1 addition & 1 deletion python/setup.py
@@ -258,7 +258,7 @@ def run(self):
license='http://www.apache.org/licenses/LICENSE-2.0',
# Don't forget to update python/docs/source/getting_started/install.rst
# if you're updating the versions or dependencies.
-install_requires=['py4j==0.10.9.3'],
+install_requires=['py4j==0.10.9.4'],
extras_require={
'ml': ['numpy>=1.15'],
'mllib': ['numpy>=1.15'],
2 changes: 1 addition & 1 deletion sbin/spark-config.sh
@@ -28,6 +28,6 @@ export SPARK_CONF_DIR="${SPARK_CONF_DIR:-"${SPARK_HOME}/conf"}"
# Add the PySpark classes to the PYTHONPATH:
if [ -z "${PYSPARK_PYTHONPATH_SET}" ]; then
export PYTHONPATH="${SPARK_HOME}/python:${PYTHONPATH}"
-export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.10.9.3-src.zip:${PYTHONPATH}"
+export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.10.9.4-src.zip:${PYTHONPATH}"
export PYSPARK_PYTHONPATH_SET=1
fi
