Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-23261] [PySpark] Rename Pandas UDFs #20428

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,16 @@ private[spark] object PythonEvalType {

val SQL_BATCHED_UDF = 100

val SQL_PANDAS_SCALAR_UDF = 200
val SQL_PANDAS_GROUP_MAP_UDF = 201
val SQL_PANDAS_GROUP_AGG_UDF = 202
val SQL_SCALAR_PANDAS_UDF = 200
val SQL_GROUPED_MAP_PANDAS_UDF = 201
val SQL_GROUPED_AGG_PANDAS_UDF = 202

def toString(pythonEvalType: Int): String = pythonEvalType match {
case NON_UDF => "NON_UDF"
case SQL_BATCHED_UDF => "SQL_BATCHED_UDF"
case SQL_PANDAS_SCALAR_UDF => "SQL_PANDAS_SCALAR_UDF"
case SQL_PANDAS_GROUP_MAP_UDF => "SQL_PANDAS_GROUP_MAP_UDF"
case SQL_PANDAS_GROUP_AGG_UDF => "SQL_PANDAS_GROUP_AGG_UDF"
case SQL_SCALAR_PANDAS_UDF => "SQL_SCALAR_PANDAS_UDF"
case SQL_GROUPED_MAP_PANDAS_UDF => "SQL_GROUPED_MAP_PANDAS_UDF"
case SQL_GROUPED_AGG_PANDAS_UDF => "SQL_GROUPED_AGG_PANDAS_UDF"
}
}

Expand Down
8 changes: 4 additions & 4 deletions docs/sql-programming-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -1684,7 +1684,7 @@ Spark will fall back to create the DataFrame without Arrow.
Pandas UDFs are user defined functions that are executed by Spark using Arrow to transfer data and
Pandas to work with the data. A Pandas UDF is defined using the keyword `pandas_udf` as a decorator
or to wrap the function, no additional configuration is required. Currently, there are two types of
Pandas UDF: Scalar and Group Map.
Pandas UDF: Scalar and Grouped Map.

### Scalar

Expand All @@ -1702,8 +1702,8 @@ The following example shows how to create a scalar Pandas UDF that computes the
</div>
</div>

### Group Map
Group map Pandas UDFs are used with `groupBy().apply()` which implements the "split-apply-combine" pattern.
### Grouped Map
Grouped map Pandas UDFs are used with `groupBy().apply()` which implements the "split-apply-combine" pattern.
Split-apply-combine consists of three steps:
* Split the data into groups by using `DataFrame.groupBy`.
* Apply a function on each group. The input and output of the function are both `pandas.DataFrame`. The
Expand All @@ -1723,7 +1723,7 @@ The following example shows how to use `groupby().apply()` to subtract the mean

<div class="codetabs">
<div data-lang="python" markdown="1">
{% include_example group_map_pandas_udf python/sql/arrow.py %}
{% include_example grouped_map_pandas_udf python/sql/arrow.py %}
</div>
</div>

Expand Down
12 changes: 6 additions & 6 deletions examples/src/main/python/sql/arrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,15 @@ def multiply_func(a, b):
# $example off:scalar_pandas_udf$


def group_map_pandas_udf_example(spark):
# $example on:group_map_pandas_udf$
def grouped_map_pandas_udf_example(spark):
# $example on:grouped_map_pandas_udf$
from pyspark.sql.functions import pandas_udf, PandasUDFType

df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))

@pandas_udf("id long, v double", PandasUDFType.GROUP_MAP)
@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def substract_mean(pdf):
# pdf is a pandas.DataFrame
v = pdf.v
Expand All @@ -110,7 +110,7 @@ def substract_mean(pdf):
# | 2|-1.0|
# | 2| 4.0|
# +---+----+
# $example off:group_map_pandas_udf$
# $example off:grouped_map_pandas_udf$


if __name__ == "__main__":
Expand All @@ -123,7 +123,7 @@ def substract_mean(pdf):
dataframe_with_arrow_example(spark)
print("Running pandas_udf scalar example")
scalar_pandas_udf_example(spark)
print("Running pandas_udf group map example")
group_map_pandas_udf_example(spark)
print("Running pandas_udf grouped map example")
grouped_map_pandas_udf_example(spark)

spark.stop()
6 changes: 3 additions & 3 deletions python/pyspark/rdd.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ class PythonEvalType(object):

SQL_BATCHED_UDF = 100

SQL_PANDAS_SCALAR_UDF = 200
SQL_PANDAS_GROUP_MAP_UDF = 201
SQL_PANDAS_GROUP_AGG_UDF = 202
SQL_SCALAR_PANDAS_UDF = 200
SQL_GROUPED_MAP_PANDAS_UDF = 201
SQL_GROUPED_AGG_PANDAS_UDF = 202


def portable_hash(x):
Expand Down
34 changes: 17 additions & 17 deletions python/pyspark/sql/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -1737,8 +1737,8 @@ def translate(srcCol, matching, replace):
def create_map(*cols):
"""Creates a new map column.

:param cols: list of column names (string) or list of :class:`Column` expressions that grouped
as key-value pairs, e.g. (key1, value1, key2, value2, ...).
:param cols: list of column names (string) or list of :class:`Column` expressions that are
grouped as key-value pairs, e.g. (key1, value1, key2, value2, ...).

>>> df.select(create_map('name', 'age').alias("map")).collect()
[Row(map={u'Alice': 2}), Row(map={u'Bob': 5})]
Expand Down Expand Up @@ -2085,11 +2085,11 @@ def map_values(col):
class PandasUDFType(object):
"""Pandas UDF Types. See :meth:`pyspark.sql.functions.pandas_udf`.
"""
SCALAR = PythonEvalType.SQL_PANDAS_SCALAR_UDF
SCALAR = PythonEvalType.SQL_SCALAR_PANDAS_UDF

GROUP_MAP = PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF
GROUPED_MAP = PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF

GROUP_AGG = PythonEvalType.SQL_PANDAS_GROUP_AGG_UDF
GROUPED_AGG = PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF


@since(1.3)
Expand Down Expand Up @@ -2193,20 +2193,20 @@ def pandas_udf(f=None, returnType=None, functionType=None):
Therefore, this can be used, for example, to ensure the length of each returned
`pandas.Series`, and can not be used as the column length.

2. GROUP_MAP
2. GROUPED_MAP

A group map UDF defines transformation: A `pandas.DataFrame` -> A `pandas.DataFrame`
A grouped map UDF defines transformation: A `pandas.DataFrame` -> A `pandas.DataFrame`
The returnType should be a :class:`StructType` describing the schema of the returned
`pandas.DataFrame`.
The length of the returned `pandas.DataFrame` can be arbitrary.

Group map UDFs are used with :meth:`pyspark.sql.GroupedData.apply`.
Grouped map UDFs are used with :meth:`pyspark.sql.GroupedData.apply`.

>>> from pyspark.sql.functions import pandas_udf, PandasUDFType
>>> df = spark.createDataFrame(
... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
... ("id", "v")) # doctest: +SKIP
>>> @pandas_udf("id long, v double", PandasUDFType.GROUP_MAP) # doctest: +SKIP
>>> @pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP) # doctest: +SKIP
... def normalize(pdf):
... v = pdf.v
... return pdf.assign(v=(v - v.mean()) / v.std())
Expand All @@ -2223,9 +2223,9 @@ def pandas_udf(f=None, returnType=None, functionType=None):

.. seealso:: :meth:`pyspark.sql.GroupedData.apply`

3. GROUP_AGG
3. GROUPED_AGG

A group aggregate UDF defines a transformation: One or more `pandas.Series` -> A scalar
A grouped aggregate UDF defines a transformation: One or more `pandas.Series` -> A scalar
The `returnType` should be a primitive data type, e.g., :class:`DoubleType`.
The returned scalar can be either a python primitive type, e.g., `int` or `float`
or a numpy data type, e.g., `numpy.int64` or `numpy.float64`.
Expand All @@ -2239,7 +2239,7 @@ def pandas_udf(f=None, returnType=None, functionType=None):
>>> df = spark.createDataFrame(
... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
... ("id", "v"))
>>> @pandas_udf("double", PandasUDFType.GROUP_AGG) # doctest: +SKIP
>>> @pandas_udf("double", PandasUDFType.GROUPED_AGG) # doctest: +SKIP
... def mean_udf(v):
... return v.mean()
>>> df.groupby("id").agg(mean_udf(df['v'])).show() # doctest: +SKIP
Expand Down Expand Up @@ -2285,21 +2285,21 @@ def pandas_udf(f=None, returnType=None, functionType=None):
eval_type = returnType
else:
# @pandas_udf(dataType) or @pandas_udf(returnType=dataType)
eval_type = PythonEvalType.SQL_PANDAS_SCALAR_UDF
eval_type = PythonEvalType.SQL_SCALAR_PANDAS_UDF
else:
return_type = returnType

if functionType is not None:
eval_type = functionType
else:
eval_type = PythonEvalType.SQL_PANDAS_SCALAR_UDF
eval_type = PythonEvalType.SQL_SCALAR_PANDAS_UDF

if return_type is None:
raise ValueError("Invalid returnType: returnType can not be None")

if eval_type not in [PythonEvalType.SQL_PANDAS_SCALAR_UDF,
PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF,
PythonEvalType.SQL_PANDAS_GROUP_AGG_UDF]:
if eval_type not in [PythonEvalType.SQL_SCALAR_PANDAS_UDF,
PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF,
PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF]:
raise ValueError("Invalid functionType: "
"functionType must be one the values from PandasUDFType")

Expand Down
10 changes: 5 additions & 5 deletions python/pyspark/sql/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ def agg(self, *exprs):
[Row(name=u'Alice', min(age)=2), Row(name=u'Bob', min(age)=5)]

>>> from pyspark.sql.functions import pandas_udf, PandasUDFType
>>> @pandas_udf('int', PandasUDFType.GROUP_AGG) # doctest: +SKIP
>>> @pandas_udf('int', PandasUDFType.GROUPED_AGG) # doctest: +SKIP
... def min_udf(v):
... return v.min()
>>> sorted(gdf.agg(min_udf(df.age)).collect()) # doctest: +SKIP
Expand Down Expand Up @@ -235,14 +235,14 @@ def apply(self, udf):
into memory, so the user should be aware of the potential OOM risk if data is skewed
and certain groups are too large to fit in memory.

:param udf: a group map user-defined function returned by
:param udf: a grouped map user-defined function returned by
:func:`pyspark.sql.functions.pandas_udf`.

>>> from pyspark.sql.functions import pandas_udf, PandasUDFType
>>> df = spark.createDataFrame(
... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
... ("id", "v"))
>>> @pandas_udf("id long, v double", PandasUDFType.GROUP_MAP) # doctest: +SKIP
>>> @pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP) # doctest: +SKIP
... def normalize(pdf):
... v = pdf.v
... return pdf.assign(v=(v - v.mean()) / v.std())
Expand All @@ -262,9 +262,9 @@ def apply(self, udf):
"""
# Columns are special because hasattr always return True
if isinstance(udf, Column) or not hasattr(udf, 'func') \
or udf.evalType != PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF:
or udf.evalType != PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF:
raise ValueError("Invalid udf: the udf argument must be a pandas_udf of type "
"GROUP_MAP.")
"GROUPED_MAP.")
df = self._df
udf_column = udf(*[df[col] for col in df.columns])
jdf = self._jgd.flatMapGroupsInPandas(udf_column._jc.expr())
Expand Down
Loading