[SPARK-23122][PYTHON][SQL] Deprecate register* for UDFs in SQLContext and Catalog in PySpark #20288

Status: Closed. Changes shown are from 9 of 11 commits.

Commits (all by HyukjinKwon):

f63105c  Deprecate register* for UDFs in SQLContext and Catalog in PySpark
08438ee  Focus on the issue itself
6b9b9c4  Address comments and clean up
f1fe40a  Clean up imports
c6ed44a  Minor doc fix
08ffa1c  Fix minor nits found
4367beb  one more space
3e0147b  Use link instead of doc copy
c9512a6  Clean up imports
00f5d19  Clean up imports
e121273  Clean up imports
Changed file: python/pyspark/sql/context.py

@@ -29,9 +29,10 @@
 from pyspark.sql.readwriter import DataFrameReader
 from pyspark.sql.streaming import DataStreamReader
 from pyspark.sql.types import IntegerType, Row, StringType
+from pyspark.sql.udf import UDFRegistration
 from pyspark.sql.utils import install_exception_handler

-__all__ = ["SQLContext", "HiveContext", "UDFRegistration"]
+__all__ = ["SQLContext", "HiveContext"]


 class SQLContext(object):

@@ -147,7 +148,7 @@ def udf(self):

         :return: :class:`UDFRegistration`
         """
-        return UDFRegistration(self)
+        return self.sparkSession.udf

     @since(1.4)
     def range(self, start, end=None, step=1, numPartitions=None):

@@ -172,113 +173,29 @@ def range(self, start, end=None, step=1, numPartitions=None):
         """
         return self.sparkSession.range(start, end, step, numPartitions)

     @ignore_unicode_prefix
     @since(1.2)
     def registerFunction(self, name, f, returnType=None):
-        """Registers a Python function (including lambda function) or a :class:`UserDefinedFunction`
-        as a UDF. The registered UDF can be used in SQL statements.
-
-        :func:`spark.udf.register` is an alias for :func:`sqlContext.registerFunction`.
-
-        In addition to a name and the function itself, `returnType` can be optionally specified.
-        1) When f is a Python function, `returnType` defaults to a string. The produced object must
-        match the specified type. 2) When f is a :class:`UserDefinedFunction`, Spark uses the return
-        type of the given UDF as the return type of the registered UDF. The input parameter
-        `returnType` is None by default. If given by users, the value must be None.
-
-        :param name: name of the UDF in SQL statements.
-        :param f: a Python function, or a wrapped/native UserDefinedFunction. The UDF can be either
-            row-at-a-time or vectorized.
-        :param returnType: the return type of the registered UDF.
-        :return: a wrapped/native :class:`UserDefinedFunction`
-
-        >>> strlen = sqlContext.registerFunction("stringLengthString", lambda x: len(x))
-        >>> sqlContext.sql("SELECT stringLengthString('test')").collect()
-        [Row(stringLengthString(test)=u'4')]
-
-        >>> sqlContext.sql("SELECT 'foo' AS text").select(strlen("text")).collect()
-        [Row(stringLengthString(text)=u'3')]
-
-        >>> from pyspark.sql.types import IntegerType
-        >>> _ = sqlContext.registerFunction("stringLengthInt", lambda x: len(x), IntegerType())
-        >>> sqlContext.sql("SELECT stringLengthInt('test')").collect()
-        [Row(stringLengthInt(test)=4)]
-
-        >>> from pyspark.sql.types import IntegerType
-        >>> _ = sqlContext.udf.register("stringLengthInt", lambda x: len(x), IntegerType())
-        >>> sqlContext.sql("SELECT stringLengthInt('test')").collect()
-        [Row(stringLengthInt(test)=4)]
-
-        >>> from pyspark.sql.types import IntegerType
-        >>> from pyspark.sql.functions import udf
-        >>> slen = udf(lambda s: len(s), IntegerType())
-        >>> _ = sqlContext.udf.register("slen", slen)
-        >>> sqlContext.sql("SELECT slen('test')").collect()
-        [Row(slen(test)=4)]
-
-        >>> import random
-        >>> from pyspark.sql.functions import udf
-        >>> from pyspark.sql.types import IntegerType
-        >>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
-        >>> new_random_udf = sqlContext.registerFunction("random_udf", random_udf)
-        >>> sqlContext.sql("SELECT random_udf()").collect()  # doctest: +SKIP
-        [Row(random_udf()=82)]
-        >>> sqlContext.range(1).select(new_random_udf()).collect()  # doctest: +SKIP
-        [Row(<lambda>()=26)]
-
-        >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
-        >>> @pandas_udf("integer", PandasUDFType.SCALAR)  # doctest: +SKIP
-        ... def add_one(x):
-        ...     return x + 1
-        ...
-        >>> _ = sqlContext.udf.register("add_one", add_one)  # doctest: +SKIP
-        >>> sqlContext.sql("SELECT add_one(id) FROM range(3)").collect()  # doctest: +SKIP
-        [Row(add_one(id)=1), Row(add_one(id)=2), Row(add_one(id)=3)]
+        """An alias for :func:`spark.udf.register`.
+        See :meth:`pyspark.sql.UDFRegistration.register`.
+
+        .. note:: Deprecated in 2.3.0. Use :func:`spark.udf.register` instead.
         """
-        return self.sparkSession.catalog.registerFunction(name, f, returnType)
+        warnings.warn(
+            "Deprecated in 2.3.0. Use spark.udf.register instead.",
+            DeprecationWarning)
+        return self.sparkSession.udf.register(name, f, returnType)

     @ignore_unicode_prefix
     @since(2.1)
     def registerJavaFunction(self, name, javaClassName, returnType=None):
-        """Register a java UDF so it can be used in SQL statements.
-
-        In addition to a name and the function itself, the return type can be optionally specified.
-        When the return type is not specified we would infer it via reflection.
-        :param name: name of the UDF
-        :param javaClassName: fully qualified name of java class
-        :param returnType: a :class:`pyspark.sql.types.DataType` object
-
-        >>> sqlContext.registerJavaFunction("javaStringLength",
-        ...   "test.org.apache.spark.sql.JavaStringLength", IntegerType())
-        >>> sqlContext.sql("SELECT javaStringLength('test')").collect()
-        [Row(UDF:javaStringLength(test)=4)]
-        >>> sqlContext.registerJavaFunction("javaStringLength2",
-        ...   "test.org.apache.spark.sql.JavaStringLength")
-        >>> sqlContext.sql("SELECT javaStringLength2('test')").collect()
-        [Row(UDF:javaStringLength2(test)=4)]
+        """An alias for :func:`spark.udf.registerJavaFunction`.
+        See :meth:`pyspark.sql.UDFRegistration.registerJavaFunction`.
+
+        .. note:: Deprecated in 2.3.0. Use :func:`spark.udf.registerJavaFunction` instead.
         """
-        jdt = None
-        if returnType is not None:
-            jdt = self.sparkSession._jsparkSession.parseDataType(returnType.json())
-        self.sparkSession._jsparkSession.udf().registerJava(name, javaClassName, jdt)
-
-    @ignore_unicode_prefix
-    @since(2.3)
-    def registerJavaUDAF(self, name, javaClassName):
-        """Register a java UDAF so it can be used in SQL statements.
-
-        :param name: name of the UDAF
-        :param javaClassName: fully qualified name of java class
-
-        >>> sqlContext.registerJavaUDAF("javaUDAF",
-        ...   "test.org.apache.spark.sql.MyDoubleAvg")
-        >>> df = sqlContext.createDataFrame([(1, "a"),(2, "b"), (3, "a")],["id", "name"])
-        >>> df.registerTempTable("df")
-        >>> sqlContext.sql("SELECT name, javaUDAF(id) as avg from df group by name").collect()
-        [Row(name=u'b', avg=102.0), Row(name=u'a', avg=102.0)]
-        """
-        self.sparkSession._jsparkSession.udf().registerJavaUDAF(name, javaClassName)
+        warnings.warn(
+            "Deprecated in 2.3.0. Use spark.udf.registerJavaFunction instead.",
+            DeprecationWarning)
+        return self.sparkSession.udf.registerJavaFunction(name, javaClassName, returnType)

     # TODO(andrew): delete this once we refactor things to take in SparkSession
     def _inferSchema(self, rdd, samplingRatio=None):

@@ -590,24 +507,6 @@ def refreshTable(self, tableName):
         self._ssql_ctx.refreshTable(tableName)


-class UDFRegistration(object):
-    """Wrapper for user-defined function registration."""
-
-    def __init__(self, sqlContext):
-        self.sqlContext = sqlContext
-
-    def register(self, name, f, returnType=None):
-        return self.sqlContext.registerFunction(name, f, returnType)
-
-    def registerJavaFunction(self, name, javaClassName, returnType=None):
-        self.sqlContext.registerJavaFunction(name, javaClassName, returnType)
-
-    def registerJavaUDAF(self, name, javaClassName):
-        self.sqlContext.registerJavaUDAF(name, javaClassName)
-
-    register.__doc__ = SQLContext.registerFunction.__doc__
-
-
 def _test():
     import os
     import doctest

Review comment on the removed registerJavaUDAF: We are fine to remove this one because this is added within the 2.3.0 timeline - https://issues.apache.org/jira/browse/SPARK-19439
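The pattern this PR applies to registerFunction and registerJavaFunction (emit a DeprecationWarning, then delegate unchanged to the new entry point) can be sketched in plain Python. OldAPI and NewAPI below are hypothetical stand-ins for SQLContext and spark.udf, not Spark classes:

```python
import warnings

class NewAPI:
    """Stand-in for the new registration entry point (spark.udf in the PR)."""
    def register(self, name, f):
        return (name, f)

class OldAPI:
    """Stand-in for the deprecated entry point (SQLContext in the PR)."""
    def __init__(self):
        self._new = NewAPI()

    def registerFunction(self, name, f):
        # Warn, then forward to the new API so behavior is unchanged.
        warnings.warn(
            "Deprecated in 2.3.0. Use NewAPI.register instead.",
            DeprecationWarning)
        return self._new.register(name, f)

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    result = OldAPI().registerFunction("strlen", len)

print(result[0])           # strlen
print(caught[0].category)  # <class 'DeprecationWarning'>
```

Returning the delegated call's result (rather than None) is what lets callers of the deprecated method keep working identically while the warning nudges them to migrate.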
Review comment (HyukjinKwon): I intentionally kept this to retain the import path pyspark.sql.context.UDFRegistration, just in case.
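The comment above is about keeping a backward-compatible import path: the class moves to a new module, but the old module re-exports it so existing `from old_module import ...` statements still resolve. A minimal sketch of the idea, using synthetic in-memory modules (the names newpkg_udf and oldpkg_context are hypothetical stand-ins for pyspark.sql.udf and pyspark.sql.context):

```python
import sys
import types

# New home of the class (plays the role of pyspark.sql.udf).
new_mod = types.ModuleType("newpkg_udf")
class UDFRegistration:
    pass
new_mod.UDFRegistration = UDFRegistration
sys.modules["newpkg_udf"] = new_mod

# Old module (plays the role of pyspark.sql.context) re-exports the class,
# so `from oldpkg_context import UDFRegistration` keeps working.
old_mod = types.ModuleType("oldpkg_context")
old_mod.UDFRegistration = sys.modules["newpkg_udf"].UDFRegistration
sys.modules["oldpkg_context"] = old_mod

from oldpkg_context import UDFRegistration as LegacyUDFRegistration
print(LegacyUDFRegistration is UDFRegistration)  # True
```

Because the old name is bound to the very same class object, isinstance checks and imports through either path stay interchangeable.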