From 83606e23caf4124a65191dabdc9906990e075940 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 18 Aug 2014 20:42:19 -0700 Subject: [PATCH] [SPARK-3114] [PySpark] Fix Python UDFs in Spark SQL. This fixes SPARK-3114, an issue where we inadvertently broke Python UDFs in Spark SQL. This PR modifiers the test runner script to always run the PySpark SQL tests, irrespective of whether SparkSQL itself has been modified. It also includes Davies' fix for the bug. Closes #2026. Author: Josh Rosen Author: Davies Liu Closes #2027 from JoshRosen/pyspark-sql-fix and squashes the following commits: 9af2708 [Davies Liu] bugfix: disable compression of command 0d8d3a4 [Josh Rosen] Always run Python Spark SQL tests. --- dev/run-tests | 17 +++++++++++++---- python/pyspark/rdd.py | 2 +- python/pyspark/worker.py | 2 +- python/run-tests | 4 +--- 4 files changed, 16 insertions(+), 9 deletions(-) diff --git a/dev/run-tests b/dev/run-tests index 0e24515d1376c..132f696d6447a 100755 --- a/dev/run-tests +++ b/dev/run-tests @@ -58,7 +58,7 @@ if [ -n "$AMPLAB_JENKINS" ]; then diffs=`git diff --name-only master | grep "^sql/"` if [ -n "$diffs" ]; then echo "Detected changes in SQL. Will run Hive test suite." - export _RUN_SQL_TESTS=true # exported for PySpark tests + _RUN_SQL_TESTS=true fi fi @@ -89,13 +89,22 @@ echo "=========================================================================" echo "Running Spark unit tests" echo "=========================================================================" +# Build Spark; we always build with Hive because the PySpark SparkSQL tests need it. +# echo "q" is needed because sbt on encountering a build file with failure +# (either resolution or compilation) prompts the user for input either q, r, +# etc to quit or retry. This echo is there to make it not block. +BUILD_MVN_PROFILE_ARGS="$SBT_MAVEN_PROFILES_ARGS -Phive -Phive-thriftserver " +echo -e "q\n" | sbt/sbt $BUILD_MVN_PROFILE_ARGS clean package assembly/assembly | \ + grep -v -e "info.*Resolving" -e "warn.*Merging" -e "info.*Including" + +# If the Spark SQL tests are enabled, run the tests with the Hive profiles enabled: if [ -n "$_RUN_SQL_TESTS" ]; then SBT_MAVEN_PROFILES_ARGS="$SBT_MAVEN_PROFILES_ARGS -Phive -Phive-thriftserver" fi -# echo "q" is needed because sbt on encountering a build file with failure -# (either resolution or compilation) prompts the user for input either q, r, +# echo "q" is needed because sbt on encountering a build file with failure +# (either resolution or compilation) prompts the user for input either q, r, # etc to quit or retry. This echo is there to make it not block. -echo -e "q\n" | sbt/sbt $SBT_MAVEN_PROFILES_ARGS clean package assembly/assembly test | \ +echo -e "q\n" | sbt/sbt $SBT_MAVEN_PROFILES_ARGS test | \ grep -v -e "info.*Resolving" -e "warn.*Merging" -e "info.*Including" echo "" diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index c708b69cc1e31..86cd89b245aea 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -1812,7 +1812,7 @@ def _jrdd(self): self._jrdd_deserializer = NoOpSerializer() command = (self.func, self._prev_jrdd_deserializer, self._jrdd_deserializer) - ser = CompressedSerializer(CloudPickleSerializer()) + ser = CloudPickleSerializer() pickled_command = ser.dumps(command) broadcast_vars = ListConverter().convert( [x._jbroadcast for x in self.ctx._pickled_broadcast_vars], diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index 77a9c4a0e0677..6805063e06798 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -72,7 +72,7 @@ def main(infile, outfile): value = ser._read_with_length(infile) _broadcastRegistry[bid] = Broadcast(bid, value) - command = ser._read_with_length(infile) + command = pickleSer._read_with_length(infile) (func, deserializer, serializer) = command init_time = time.time() iterator = deserializer.load_stream(infile) diff --git a/python/run-tests b/python/run-tests index b506559a5e810..7b1ee3e1cddba 100755 --- a/python/run-tests +++ b/python/run-tests @@ -59,9 +59,7 @@ $PYSPARK_PYTHON --version run_test "pyspark/rdd.py" run_test "pyspark/context.py" run_test "pyspark/conf.py" -if [ -n "$_RUN_SQL_TESTS" ]; then - run_test "pyspark/sql.py" -fi +run_test "pyspark/sql.py" # These tests are included in the module-level docs, and so must # be handled on a higher level rather than within the python file. export PYSPARK_DOC_TEST=1