[SPARK-23352][PYTHON] Explicitly specify supported types in Pandas UDFs #20531
Conversation
@ueshin, @BryanCutler and @icexelloss, mind taking a look when you are available?
@@ -116,7 +116,7 @@ def wrap_grouped_agg_pandas_udf(f, return_type):
     def wrapped(*series):
         import pandas as pd
         result = f(*series)
-        return pd.Series(result)
+        return pd.Series([result])
This change seems to be required: a grouped aggregate UDF produces a single value per group, so an array-valued result must become one object element rather than be expanded into multiple rows:
>>> import numpy as np
>>> import pandas as pd
>>> pd.Series(np.array([1, 2, 3]))
0 1
1 2
2 3
dtype: int64
>>> pd.Series([np.array([1, 2, 3])])
0 [1, 2, 3]
dtype: object
>>> pd.Series(1)
0 1
dtype: int64
>>> pd.Series([1])
0 1
dtype: int64
Test build #87160 has finished for PR 20531 at commit
Test build #87163 has finished for PR 20531 at commit
@@ -1734,7 +1734,7 @@ For detailed usage, please see [`pyspark.sql.functions.pandas_udf`](api/python/p

 ### Supported SQL Types

-Currently, all Spark SQL data types are supported by Arrow-based conversion except `MapType`,
+Currently, all Spark SQL data types are supported by Arrow-based conversion except `BinaryType`, `MapType`,
I thought binary type is supported... I am curious, what's the reason that it doesn't work now?
I was under the impression that we don't support this. It seems Arrow doesn't behave consistently with what Spark does. I think it's actually related to #20507.
I am careful to say this out loud, but I believe the root cause is how `str` is handled in Python 2. Technically, it's bytes but named string. As you might already know, due to this confusion, `unicode` became `str` and `str` became `bytes` in Python 3. Spark handles this as `StringType` in general, whereas Arrow seems to deal with binaries.
I think we shouldn't support this for now, until we get consistent behaviour.
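For readers less familiar with the Python 2/3 shift referenced above, a minimal illustration in plain Python (not Spark code); the Python 2 behaviour is shown in comments since the snippet itself targets Python 3:

```python
# Python 3: str is text (what Python 2 called unicode); bytes is raw binary.
s = "abc"
b = b"abc"
assert isinstance(s, str) and not isinstance(s, bytes)
assert isinstance(b, bytes)

# Python 2, for comparison:
#   >>> str is bytes   # bytes is just an alias for str
#   True
#   >>> type(u"abc")
#   <type 'unicode'>
```

This is the ambiguity the comment points at: a Python 2 `str` value is simultaneously Spark's `StringType` and Arrow's binary data.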
I see. Thanks for the explanation.
I agree, we need to look into these details more before we can support this type
Should `BinaryType` be added to the unsupported types with arrow.enabled in SQLConf.scala?
python/pyspark/sql/tests.py (outdated)

         def foo(x):
             return x
-        self.assertEqual(foo.returnType, schema)
+        self.assertEqual(foo.returnType, schema[0].dataType)
Should we just do `self.assertEqual(foo.returnType, DoubleType())`?
python/pyspark/sql/tests.py (outdated)

-        from pyspark.sql.functions import pandas_udf, PandasUDFType
-        df = self.data
+        from pyspark.sql.functions import pandas_udf, PandasUDFType, array, col
+        df = self.data.withColumn("arr", array(col("id")))
minor: It seems a bit arbitrary to mix the array type into this test. Arrays probably belong in a new test (if one doesn't exist yet) such as `test_array`, `test_complex_types`, or something like `test_all_types`.
python/pyspark/sql/tests.py (outdated)

         from pyspark.sql.functions import pandas_udf, PandasUDFType

         with QuietTest(self.sc):
             with self.assertRaisesRegexp(NotImplementedError, 'not supported'):
-                @pandas_udf(ArrayType(DoubleType()), PandasUDFType.GROUPED_AGG)
+                @pandas_udf(ArrayType(ArrayType(TimestampType())), PandasUDFType.GROUPED_AGG)
Why is `ArrayType(TimestampType())` a special case that is not supported? (I haven't fully tested this when implementing this feature; is only an array of primitives supported?)
Seems it's because we don't handle the timezone issue when timestamps are nested. There are a few TODOs, for example:

spark/python/pyspark/sql/session.py, line 465 (71cfba0):
# TODO: handle nested timestamps, such as ArrayType(TimestampType())?

spark/python/pyspark/sql/types.py, lines 1726, 1745 and 1771 (a24c031):
# TODO: handle nested timestamps, such as ArrayType(TimestampType())?
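A small pandas-only sketch (not Spark's actual conversion code) of why nesting makes the timezone handling harder:

```python
import pandas as pd

# A top-level timestamp column has dtype datetime64, so pandas offers
# vectorised timezone operations, which the conversion path can rely on.
s = pd.Series(pd.to_datetime(["2018-01-01 00:00:00"]))
s = s.dt.tz_localize("UTC").dt.tz_convert("America/Los_Angeles")

# Nested timestamps degrade the column to dtype object; the .dt accessor
# no longer applies, so each array would need per-element handling -- the
# part the TODOs above leave unimplemented.
nested = pd.Series([[pd.Timestamp("2018-01-01 00:00:00")]])
print(nested.dtype)  # object
```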
python/pyspark/sql/udf.py (outdated)

             to_arrow_schema(self._returnType_placeholder)
         except TypeError:
             raise NotImplementedError(
                 "Invalid returnType with a grouped map Pandas UDF: "
nit: `a grouped map Pandas UDF` -> `grouped map Pandas UDFs`?
         result1 = df.groupby('id').agg(sum_udf(df.v), mean_udf(df.v)).sort('id')
         mean_arr_udf = pandas_udf(
             self.pandas_agg_mean_udf.func,
For arrays, can we add tests for:

- Type coercion, e.g., the specified type is `array<double>` and the returned array is `[0, 1, 2]`?
- Exceptions, e.g., the function returns an array of mixed types like `[0, "hello"]`?
Btw, we can make this a follow-up and I can do it too.
If you meant type coercion (did I understand correctly?), I already tested it locally. It doesn't seem to work properly. A similar thing was discussed in #20163 (comment) (thanks @ueshin).
Will reread the comments when I am more awake tomorrow ...
I think with Pandas UDFs, certain type coercion is supported, e.g., when the user specifies double type and the function returns a `pd.Series` of `int`, it will automatically be cast to a `pd.Series` of double. This behavior is different from regular Python UDFs, which return null in this case. Most of the type coercion is done by pyarrow. (Btw, I think type coercion in Pandas UDFs is a huge improvement over Python UDFs, because that's one of the biggest frustrations our PySpark users have...)
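A minimal sketch of the coercion being described, assuming an active `SparkSession` bound to `spark` with Arrow and pandas available:

```python
from pyspark.sql.functions import pandas_udf

@pandas_udf('double')   # declared double ...
def plus_one(v):
    return v + 1        # ... while the function yields an int64 pd.Series

# pyarrow casts the integers to doubles on the way back; a regular Python
# UDF declared as double but returning ints would produce nulls instead.
spark.range(3).select(plus_one('id')).show()
```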
Btw, if type coercion is not working with the array type, I think it's still fine to allow using the array type and fix type coercion separately.
Hm .. let's handle the type coercion stuff separately in another issue. I think we need another iteration for it. I am actually not yet sure whether we should officially document and support type coercion, given our past discussion.
@HyukjinKwon Looks good to me at a high level. Left some comments.
Yup, let me try to address them tomorrow. Thanks for your review.
python/pyspark/sql/udf.py (outdated)

                 "Invalid returnType with a grouped map Pandas UDF: "
                 "%s is not supported" % str(self._returnType_placeholder))
         else:
             raise TypeError("Invalid returnType for a grouped map Pandas "
nit: `a grouped map Pandas UDF` -> `grouped map Pandas UDFs`?
python/pyspark/sql/udf.py (outdated)

             to_arrow_type(self._returnType_placeholder)
         except TypeError:
             raise NotImplementedError(
                 "Invalid returnType with a scalar Pandas UDF: %s is "
ditto
python/pyspark/sql/udf.py (outdated)

             to_arrow_type(self._returnType_placeholder)
         except TypeError:
             raise NotImplementedError(
                 "Invalid returnType with a grouped aggregate Pandas UDF: "
ditto
Test build #87164 has finished for PR 20531 at commit
python/pyspark/sql/udf.py (outdated)

             raise NotImplementedError(
                 "ArrayType, StructType and MapType are not supported with "
                 "PandasUDFType.GROUPED_AGG")
         if self.evalType == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF:
nit: I'd prefer to keep the checks ordered by the definition in `PythonEvalType` if you don't have a special reason. E.g.,

if self.evalType == PythonEvalType.SQL_SCALAR_PANDAS_UDF:
    ...
elif self.evalType == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF:
    ...
elif self.evalType == PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF:
    ...
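Stepping back, the udf.py hunks in this review all follow the same fail-fast pattern: attempt the Arrow conversion for the declared return type and translate a failure into an early, descriptive error. A condensed, hypothetical rendering (names taken from the hunks above; not the exact merged code):

```python
from pyspark.sql.types import to_arrow_type

def check_return_type(return_type, udf_kind="scalar Pandas UDFs"):
    # to_arrow_type raises TypeError for types Arrow cannot convert,
    # so unsupported return types fail before any execution happens.
    try:
        to_arrow_type(return_type)
    except TypeError:
        raise NotImplementedError(
            "Invalid returnType with %s: %s is not supported"
            % (udf_kind, str(return_type)))
```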
python/pyspark/sql/tests.py (outdated)

         from pyspark.sql.functions import pandas_udf, PandasUDFType

         with QuietTest(self.sc):
             with self.assertRaisesRegexp(NotImplementedError, 'not supported'):
-                @pandas_udf(ArrayType(DoubleType()), PandasUDFType.GROUPED_AGG)
+                @pandas_udf(ArrayType(ArrayType(TimestampType())), PandasUDFType.GROUPED_AGG)
                 def mean_and_std_udf(v):
nit: should we rename this?
@ueshin and @icexelloss, thanks for your review. I tried my best to address the comments.
Test build #87214 has finished for PR 20531 at commit
LGTM; I just mentioned that we might want to include `BinaryType` as unsupported in the SQLConf doc. Thanks for doing some cleanup too!
@@ -1676,7 +1676,7 @@ Using the above optimizations with Arrow will produce the same results as when A
 enabled. Note that even with Arrow, `toPandas()` results in the collection of all records in the
 DataFrame to the driver program and should be done on a small subset of the data. Not all Spark
 data types are currently supported and an error can be raised if a column has an unsupported type,
-see [Supported Types](#supported-sql-arrow-types). If an error occurs during `createDataFrame()`,
+see [Supported SQL Types](#supported-sql-arrow-types). If an error occurs during `createDataFrame()`,
Nice catch!
@HyukjinKwon LGTM! My only comment left is #20531 (comment). But we can have a separate PR for testing type coercion with the array type.
Test build #87280 has finished for PR 20531 at commit
retest this please
Test build #87281 has finished for PR 20531 at commit
@ueshin, does this look fine to you too?
@HyukjinKwon Yes, LGTM.
retest this please
Test build #87325 has finished for PR 20531 at commit
retest this please
Test build #87328 has finished for PR 20531 at commit
Test build #87329 has finished for PR 20531 at commit
Merged to master.
Thank you for reviewing this, @icexelloss, @ueshin and @BryanCutler.
This PR targets to explicitly specify supported types in Pandas UDFs. The main change here is to add a deduplicated and explicit type check on `returnType` up front, together with documenting this; however, it happened to fix multiple things.

1. Currently, we don't support `BinaryType` in Pandas UDFs, for example:

```python
from pyspark.sql.functions import pandas_udf
pudf = pandas_udf(lambda x: x, "binary")
df = spark.createDataFrame([[bytearray(1)]])
df.select(pudf("_1")).show()
```

```
...
TypeError: Unsupported type in conversion to Arrow: BinaryType
```

We can document this behaviour in the guide.

2. Also, the grouped aggregate Pandas UDF fails fast on `ArrayType`, but it seems we can support this case.

```python
from pyspark.sql.functions import pandas_udf, PandasUDFType
foo = pandas_udf(lambda v: v.mean(), 'array<double>', PandasUDFType.GROUPED_AGG)
df = spark.range(100).selectExpr("id", "array(id) as value")
df.groupBy("id").agg(foo("value")).show()
```

```
...
NotImplementedError: ArrayType, StructType and MapType are not supported with PandasUDFType.GROUPED_AGG
```

3. Since we can check the return type ahead of time, we can fail fast before actual execution.

```python
# we can fail fast at this stage because we know the schema ahead
pandas_udf(lambda x: x, BinaryType())
```

Manually tested, and unit tests for `BinaryType` and `ArrayType(...)` were added.

Author: hyukjinkwon <[email protected]>

Closes apache#20531 from HyukjinKwon/pudf-cleanup.

(cherry picked from commit c338c8c)
Signed-off-by: hyukjinkwon <[email protected]>
@@ -1638,6 +1638,8 @@ def to_arrow_type(dt):
         # Timestamps should be in UTC, JVM Arrow timestamps require a timezone to be read
         arrow_type = pa.timestamp('us', tz='UTC')
     elif type(dt) == ArrayType:
+        if type(dt.elementType) == TimestampType:
+            raise TypeError("Unsupported type in conversion to Arrow: " + str(dt))
What is the behavior before this PR?
I think it was timestamps with a localisation issue. See #20531 (comment).
…in Pandas UDFs

## What changes were proposed in this pull request?

This PR backports #20531: it explicitly specifies supported types in Pandas UDFs. The main change here is to add a deduplicated and explicit type check on `returnType` up front, together with documenting this; however, it happened to fix multiple things.

1. Currently, we don't support `BinaryType` in Pandas UDFs, for example:

```python
from pyspark.sql.functions import pandas_udf
pudf = pandas_udf(lambda x: x, "binary")
df = spark.createDataFrame([[bytearray(1)]])
df.select(pudf("_1")).show()
```

```
...
TypeError: Unsupported type in conversion to Arrow: BinaryType
```

We can document this behaviour in the guide.

2. Since we can check the return type ahead of time, we can fail fast before actual execution.

```python
# we can fail fast at this stage because we know the schema ahead
pandas_udf(lambda x: x, BinaryType())
```

## How was this patch tested?

Manually tested, and unit tests for `BinaryType` and `ArrayType(...)` were added.

Author: hyukjinkwon <[email protected]>

Closes #20588 from HyukjinKwon/PR_TOOL_PICK_PR_20531_BRANCH-2.3.
What changes were proposed in this pull request?

This PR targets to explicitly specify supported types in Pandas UDFs. The main change here is to add a deduplicated and explicit type check on `returnType` up front, together with documenting this; however, it happened to fix multiple things.

1. Currently, we don't support `BinaryType` in Pandas UDFs; see the example in the merged commit message above. We can document this behaviour in the guide.
2. Also, the grouped aggregate Pandas UDF fails fast on `ArrayType`, but it seems we can support this case.
3. Since we can check the return type ahead of time, we can fail fast before actual execution.

How was this patch tested?

Manually tested, and unit tests for `BinaryType` and `ArrayType(...)` were added.