Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-38563][PYTHON] Upgrade to Py4J 0.10.9.4 #35871

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bin/pyspark
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ export PYSPARK_DRIVER_PYTHON_OPTS

# Add the PySpark classes to the Python path:
export PYTHONPATH="${SPARK_HOME}/python/:$PYTHONPATH"
export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.10.9.3-src.zip:$PYTHONPATH"
export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.10.9.4-src.zip:$PYTHONPATH"

# Load the PySpark shell.py script when ./pyspark is used interactively:
export OLD_PYTHONSTARTUP="$PYTHONSTARTUP"
Expand Down
2 changes: 1 addition & 1 deletion bin/pyspark2.cmd
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ if "x%PYSPARK_DRIVER_PYTHON%"=="x" (
)

set PYTHONPATH=%SPARK_HOME%\python;%PYTHONPATH%
set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.10.9.3-src.zip;%PYTHONPATH%
set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.10.9.4-src.zip;%PYTHONPATH%

set OLD_PYTHONSTARTUP=%PYTHONSTARTUP%
set PYTHONSTARTUP=%SPARK_HOME%\python\pyspark\shell.py
Expand Down
2 changes: 1 addition & 1 deletion core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@
<dependency>
<groupId>net.sf.py4j</groupId>
<artifactId>py4j</artifactId>
<version>0.10.9.3</version>
<version>0.10.9.4</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import org.apache.spark.SparkContext
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}

private[spark] object PythonUtils {
val PY4J_ZIP_NAME = "py4j-0.10.9.3-src.zip"
val PY4J_ZIP_NAME = "py4j-0.10.9.4-src.zip"

/** Get the PYTHONPATH for PySpark, either from SPARK_HOME, if it is set, or from our JAR */
def sparkPythonPath: String = {
Expand Down
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-2-hive-2.3
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ parquet-hadoop/1.12.2//parquet-hadoop-1.12.2.jar
parquet-jackson/1.12.2//parquet-jackson-1.12.2.jar
pickle/1.2//pickle-1.2.jar
protobuf-java/2.5.0//protobuf-java-2.5.0.jar
py4j/0.10.9.3//py4j-0.10.9.3.jar
py4j/0.10.9.4//py4j-0.10.9.4.jar
remotetea-oncrpc/1.1.2//remotetea-oncrpc-1.1.2.jar
rocksdbjni/6.20.3//rocksdbjni-6.20.3.jar
scala-collection-compat_2.12/2.1.1//scala-collection-compat_2.12-2.1.1.jar
Expand Down
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-3-hive-2.3
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ parquet-hadoop/1.12.2//parquet-hadoop-1.12.2.jar
parquet-jackson/1.12.2//parquet-jackson-1.12.2.jar
pickle/1.2//pickle-1.2.jar
protobuf-java/2.5.0//protobuf-java-2.5.0.jar
py4j/0.10.9.3//py4j-0.10.9.3.jar
py4j/0.10.9.4//py4j-0.10.9.4.jar
remotetea-oncrpc/1.1.2//remotetea-oncrpc-1.1.2.jar
rocksdbjni/6.20.3//rocksdbjni-6.20.3.jar
scala-collection-compat_2.12/2.1.1//scala-collection-compat_2.12-2.1.1.jar
Expand Down
2 changes: 1 addition & 1 deletion docs/job-scheduling.md
Original file line number Diff line number Diff line change
Expand Up @@ -304,5 +304,5 @@ via `sc.setJobGroup` in a separate PVM thread, which also disallows to cancel th
later.

`pyspark.InheritableThread` is recommended to use together for a PVM thread to inherit the inheritable attributes
such as local properties in a JVM thread, and to avoid resource leak.
such as local properties in a JVM thread.

2 changes: 1 addition & 1 deletion python/docs/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ SPHINXBUILD ?= sphinx-build
SOURCEDIR ?= source
BUILDDIR ?= build

export PYTHONPATH=$(realpath ..):$(realpath ../lib/py4j-0.10.9.3-src.zip)
export PYTHONPATH=$(realpath ..):$(realpath ../lib/py4j-0.10.9.4-src.zip)

# Put it first so that "make" without argument is like "make help".
help:
Expand Down
2 changes: 1 addition & 1 deletion python/docs/make2.bat
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ if "%SPHINXBUILD%" == "" (
set SOURCEDIR=source
set BUILDDIR=build

set PYTHONPATH=..;..\lib\py4j-0.10.9.3-src.zip
set PYTHONPATH=..;..\lib\py4j-0.10.9.4-src.zip

if "%1" == "" goto help

Expand Down
2 changes: 1 addition & 1 deletion python/docs/source/getting_started/install.rst
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ Package Minimum supported version Note
`pandas` 1.0.5 Optional for Spark SQL
`NumPy` 1.7 Required for MLlib DataFrame-based API
`pyarrow` 1.0.0 Optional for Spark SQL
`Py4J` 0.10.9.3 Required
`Py4J` 0.10.9.4 Required
`pandas` 1.0.5 Required for pandas API on Spark
`pyarrow` 1.0.0 Required for pandas API on Spark
`Numpy` 1.14 Required for pandas API on Spark
Expand Down
Binary file removed python/lib/py4j-0.10.9.3-src.zip
Binary file not shown.
Binary file added python/lib/py4j-0.10.9.4-src.zip
Binary file not shown.
6 changes: 3 additions & 3 deletions python/pyspark/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -1365,7 +1365,7 @@ def setJobGroup(self, groupId: str, description: str, interruptOnCancel: bool =
to HDFS-1208, where HDFS may respond to Thread.interrupt() by marking nodes as dead.

If you run jobs in parallel, use :class:`pyspark.InheritableThread` for thread
local inheritance, and preventing resource leak.
local inheritance.

Examples
--------
Expand Down Expand Up @@ -1405,7 +1405,7 @@ def setLocalProperty(self, key: str, value: str) -> None:
Notes
-----
If you run jobs in parallel, use :class:`pyspark.InheritableThread` for thread
local inheritance, and preventing resource leak.
local inheritance.
"""
self._jsc.setLocalProperty(key, value)

Expand All @@ -1423,7 +1423,7 @@ def setJobDescription(self, value: str) -> None:
Notes
-----
If you run jobs in parallel, use :class:`pyspark.InheritableThread` for thread
local inheritance, and preventing resource leak.
local inheritance.
"""
self._jsc.setJobDescription(value)

Expand Down
35 changes: 5 additions & 30 deletions python/pyspark/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,13 +331,10 @@ def inheritable_thread_target(f: Callable) -> Callable:

@functools.wraps(f)
def wrapped(*args: Any, **kwargs: Any) -> Any:
try:
# Set local properties in child thread.
assert SparkContext._active_spark_context is not None
SparkContext._active_spark_context._jsc.sc().setLocalProperties(properties)
return f(*args, **kwargs)
finally:
InheritableThread._clean_py4j_conn_for_current_thread()
# Set local properties in child thread.
assert SparkContext._active_spark_context is not None
SparkContext._active_spark_context._jsc.sc().setLocalProperties(properties)
return f(*args, **kwargs)

return wrapped
else:
Expand Down Expand Up @@ -377,10 +374,7 @@ def copy_local_properties(*a: Any, **k: Any) -> Any:
assert hasattr(self, "_props")
assert SparkContext._active_spark_context is not None
SparkContext._active_spark_context._jsc.sc().setLocalProperties(self._props)
try:
return target(*a, **k)
finally:
InheritableThread._clean_py4j_conn_for_current_thread()
return target(*a, **k)

super(InheritableThread, self).__init__(
target=copy_local_properties, *args, **kwargs # type: ignore[misc]
Expand All @@ -401,25 +395,6 @@ def start(self) -> None:
self._props = SparkContext._active_spark_context._jsc.sc().getLocalProperties().clone()
return super(InheritableThread, self).start()

@staticmethod
def _clean_py4j_conn_for_current_thread() -> None:
from pyspark import SparkContext

jvm = SparkContext._jvm
assert jvm is not None
thread_connection = jvm._gateway_client.get_thread_connection()
if thread_connection is not None:
try:
# Dequeue is shared across other threads but it's thread-safe.
# If this function has to be invoked one more time in the same thead
# Py4J will create a new connection automatically.
jvm._gateway_client.deque.remove(thread_connection)
except ValueError:
# Should never reach this point
return
finally:
thread_connection.close()


if __name__ == "__main__":
if "pypy" not in platform.python_implementation().lower() and sys.version_info[:2] >= (3, 7):
Expand Down
2 changes: 1 addition & 1 deletion python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ def run(self):
license='http://www.apache.org/licenses/LICENSE-2.0',
# Don't forget to update python/docs/source/getting_started/install.rst
# if you're updating the versions or dependencies.
install_requires=['py4j==0.10.9.3'],
install_requires=['py4j==0.10.9.4'],
extras_require={
'ml': ['numpy>=1.15'],
'mllib': ['numpy>=1.15'],
Expand Down
2 changes: 1 addition & 1 deletion sbin/spark-config.sh
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,6 @@ export SPARK_CONF_DIR="${SPARK_CONF_DIR:-"${SPARK_HOME}/conf"}"
# Add the PySpark classes to the PYTHONPATH:
if [ -z "${PYSPARK_PYTHONPATH_SET}" ]; then
export PYTHONPATH="${SPARK_HOME}/python:${PYTHONPATH}"
export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.10.9.3-src.zip:${PYTHONPATH}"
export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.10.9.4-src.zip:${PYTHONPATH}"
export PYSPARK_PYTHONPATH_SET=1
fi