[SPARK-3114] [PySpark] Fix Python UDFs in Spark SQL.
This fixes SPARK-3114, an issue where we inadvertently broke Python UDFs in Spark SQL.

This PR modifies the test runner script to always run the PySpark SQL tests, irrespective of whether Spark SQL itself has been modified. It also includes Davies' fix for the bug.

Closes #2026.

Author: Josh Rosen <[email protected]>
Author: Davies Liu <[email protected]>

Closes #2027 from JoshRosen/pyspark-sql-fix and squashes the following commits:

9af2708 [Davies Liu] bugfix: disable compression of command
0d8d3a4 [Josh Rosen] Always run Python Spark SQL tests.

(cherry picked from commit 1f1819b)
Signed-off-by: Josh Rosen <[email protected]>
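
For background on the "disable compression of command" fix: PySpark had recently started wrapping the pickled command in a CompressedSerializer, and the likely trigger for this bug is that the Spark SQL UDF registration path pickles its command without that wrapper, so the worker's decompress-then-unpickle step failed on UDF commands. A minimal sketch of that asymmetry, using stdlib stand-ins rather than PySpark's real serializer classes (which also length-prefix their payloads):

import pickle
import zlib

# Illustrative command tuple; the real one holds the function and serializers.
command = ("func", "prev_deserializer", "jrdd_deserializer")

# A writer that skips the compression wrapper, like the SQL UDF path:
payload = pickle.dumps(command)

# A reader that assumes every command is compressed, as worker.py did
# before this commit, fails on such a payload:
try:
    pickle.loads(zlib.decompress(payload))
except zlib.error as exc:
    print("serializer mismatch:", exc)

# With both ends using plain pickling, the round trip works:
assert pickle.loads(payload) == command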
JoshRosen committed Aug 19, 2014
1 parent 7d069bf commit 3a03259
Showing 4 changed files with 16 additions and 9 deletions.
dev/run-tests (17 changes: 13 additions & 4 deletions)

@@ -58,7 +58,7 @@ if [ -n "$AMPLAB_JENKINS" ]; then
   diffs=`git diff --name-only master | grep "^sql/"`
   if [ -n "$diffs" ]; then
     echo "Detected changes in SQL. Will run Hive test suite."
-    export _RUN_SQL_TESTS=true # exported for PySpark tests
+    _RUN_SQL_TESTS=true
   fi
 fi

@@ -89,13 +89,22 @@ echo "========================================================================="
 echo "Running Spark unit tests"
 echo "========================================================================="

+# Build Spark; we always build with Hive because the PySpark SparkSQL tests need it.
+# echo "q" is needed because sbt on encountering a build file with failure
+# (either resolution or compilation) prompts the user for input either q, r,
+# etc to quit or retry. This echo is there to make it not block.
+BUILD_MVN_PROFILE_ARGS="$SBT_MAVEN_PROFILES_ARGS -Phive -Phive-thriftserver "
+echo -e "q\n" | sbt/sbt $BUILD_MVN_PROFILE_ARGS clean package assembly/assembly | \
+  grep -v -e "info.*Resolving" -e "warn.*Merging" -e "info.*Including"
+
 # If the Spark SQL tests are enabled, run the tests with the Hive profiles enabled:
 if [ -n "$_RUN_SQL_TESTS" ]; then
   SBT_MAVEN_PROFILES_ARGS="$SBT_MAVEN_PROFILES_ARGS -Phive -Phive-thriftserver"
 fi
 # echo "q" is needed because sbt on encountering a build file with failure
 # (either resolution or compilation) prompts the user for input either q, r,
 # etc to quit or retry. This echo is there to make it not block.
-echo -e "q\n" | sbt/sbt $SBT_MAVEN_PROFILES_ARGS clean package assembly/assembly test | \
+echo -e "q\n" | sbt/sbt $SBT_MAVEN_PROFILES_ARGS test | \
   grep -v -e "info.*Resolving" -e "warn.*Merging" -e "info.*Including"

echo ""
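The net effect of the run-tests change above: the assembly is now always built with the Hive profiles, since the PySpark SQL tests depend on a Hive-enabled build, while the JVM test profiles are still widened only when SQL sources changed. A bash sketch of that two-phase pattern (profile names other than the Hive ones are illustrative):

#!/usr/bin/env bash
# Sketch of the two-phase profile handling in dev/run-tests above.
SBT_MAVEN_PROFILES_ARGS="-Pexample"   # illustrative starting profiles

# Phase 1: the build always adds the Hive profiles, because the Python
# SQL tests need a Hive-enabled assembly regardless of what changed.
BUILD_MVN_PROFILE_ARGS="$SBT_MAVEN_PROFILES_ARGS -Phive -Phive-thriftserver"
echo "build profiles: $BUILD_MVN_PROFILE_ARGS"

# Phase 2: the JVM test run only adds them when SQL sources changed.
if [ -n "$_RUN_SQL_TESTS" ]; then
  SBT_MAVEN_PROFILES_ARGS="$SBT_MAVEN_PROFILES_ARGS -Phive -Phive-thriftserver"
fi
echo "test profiles: $SBT_MAVEN_PROFILES_ARGS"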
python/pyspark/rdd.py (2 changes: 1 addition & 1 deletion)

@@ -1812,7 +1812,7 @@ def _jrdd(self):
         self._jrdd_deserializer = NoOpSerializer()
         command = (self.func, self._prev_jrdd_deserializer,
                    self._jrdd_deserializer)
-        ser = CompressedSerializer(CloudPickleSerializer())
+        ser = CloudPickleSerializer()
         pickled_command = ser.dumps(command)
         broadcast_vars = ListConverter().convert(
             [x._jbroadcast for x in self.ctx._pickled_broadcast_vars],
python/pyspark/worker.py (2 changes: 1 addition & 1 deletion)

@@ -72,7 +72,7 @@ def main(infile, outfile):
         value = ser._read_with_length(infile)
         _broadcastRegistry[bid] = Broadcast(bid, value)

-    command = ser._read_with_length(infile)
+    command = pickleSer._read_with_length(infile)
     (func, deserializer, serializer) = command
     init_time = time.time()
     iterator = deserializer.load_stream(infile)
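
Note the split this leaves in worker.py: broadcast values are still read through the compressed serializer (`ser` above), while the command reverts to the plain pickle serializer, mirroring the rdd.py change. A self-contained sketch of that mixed framing, using toy helpers in the spirit of _read_with_length (not PySpark's actual implementation):

import pickle
import struct
import zlib
from io import BytesIO

def write_with_length(data, stream):
    # Length-prefixed framing, like PySpark's _write_with_length.
    stream.write(struct.pack("!i", len(data)))
    stream.write(data)

def read_with_length(stream):
    (length,) = struct.unpack("!i", stream.read(4))
    return stream.read(length)

buf = BytesIO()
# Driver side: broadcast values stay compressed, the command does not.
write_with_length(zlib.compress(pickle.dumps({"bid": 0})), buf)
write_with_length(pickle.dumps(("func", "deser", "ser")), buf)
buf.seek(0)

# Worker side mirrors the driver exactly, as in the fixed worker.py.
broadcast_value = pickle.loads(zlib.decompress(read_with_length(buf)))
command = pickle.loads(read_with_length(buf))
print(broadcast_value, command)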
python/run-tests (4 changes: 1 addition & 3 deletions)

@@ -59,9 +59,7 @@ $PYSPARK_PYTHON --version
run_test "pyspark/rdd.py"
run_test "pyspark/context.py"
run_test "pyspark/conf.py"
if [ -n "$_RUN_SQL_TESTS" ]; then
run_test "pyspark/sql.py"
fi
run_test "pyspark/sql.py"
# These tests are included in the module-level docs, and so must
# be handled on a higher level rather than within the python file.
export PYSPARK_DOC_TEST=1
