-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-23836][PYTHON] Add support for StructType return in Scalar Pandas UDF #23900
[SPARK-23836][PYTHON] Add support for StructType return in Scalar Pandas UDF #23900
Conversation
ser = ArrowStreamPandasSerializer(timezone, safecheck) | ||
# NOTE: this is duplicated from wrap_grouped_map_pandas_udf | ||
assign_cols_by_name = runner_conf.get( | ||
"spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName", "true")\ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the groupedMap
part of the conf doesn't really make sense here, but not sure if it's worth changing
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Based on above comment https://github.com/apache/spark/pull/23900/files#r260874304, if we are going to remove spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName
, do we need to use this config here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I left it in to be consistent. I'd rather remove both of them in a separate PR in case there is some discussion about it.
Test build #102790 has finished for PR 23900 at commit
|
Test build #102803 has finished for PR 23900 at commit
|
StructType is allowed in standard udfs, and I think it's worth doing even if nested structs aren't supported since they aren't a logical Pandas type anyway. cc @HyukjinKwon @felixcheung @viirya |
@@ -64,6 +64,7 @@ | |||
from itertools import izip as zip, imap as map | |||
else: | |||
import pickle | |||
basestring = unicode = str |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@HyukjinKwon I thought we have other places for this kind of thing (or is it your new PR for cloudpickle)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes .. there are some places that use this here and there. IIRC, we discussed about Python 2 drop in dev mailing list. I could get rid of it soon anyway ..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, this and below are just for Python 2 support. Are we dropping that for Spark 3.0?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we'll drop it in Spark 3.0. I will cc you in the related PRs later in the future.
@@ -23,13 +23,16 @@ | |||
import time | |||
import unittest | |||
|
|||
if sys.version >= '3': | |||
unicode = str |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
python/pyspark/serializers.py
Outdated
for s, t in series: | ||
if t is not None and pa.types.is_struct(t): | ||
if not isinstance(s, pd.DataFrame): | ||
raise ValueError("A field of type StructType expects a pandas.DataFrame, " + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(not a big deal but +
at the end wouldn't be needed)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh right, thanks!
@@ -254,7 +254,12 @@ def read_udfs(pickleSer, infile, eval_type): | |||
timezone = runner_conf.get("spark.sql.session.timeZone", None) | |||
safecheck = runner_conf.get("spark.sql.execution.pandas.arrowSafeTypeConversion", | |||
"false").lower() == 'true' | |||
ser = ArrowStreamPandasSerializer(timezone, safecheck) | |||
# NOTE: this is duplicated from wrap_grouped_map_pandas_udf | |||
assign_cols_by_name = runner_conf.get( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@BryanCutler, BTW, would you be willing to work on removing spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName
in Spark 3.0? I think this makes the codes complicated.
Also, would you mind working on upgrading minimum Arrow to 0.12.0 as well, as we discussed? (Probably it better be asked to dev mailing list first to be 100% sure).
If you're currently busy, I will take one or both.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Of course those should be separate JIRAs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, definitely! I could take those 2 tasks. I was thinking on holding off a little while to bump up the minimum Arrow version just to see if anything major came up in the meantime releases. 0.12.1 will be out in a couple days, but I don't think major bug fixes for us. Maybe wait just a little bit longer?
Haven't checked super closely yet but looks good to me. |
Thanks @felixcheung and @HyukjinKwon ! Btw, with this change the |
Test build #102847 has finished for PR 23900 at commit
|
@@ -557,8 +557,9 @@ def _create_from_pandas_with_arrow(self, pdf, schema, timezone): | |||
|
|||
# Create Arrow record batches | |||
safecheck = self._wrapped._conf.arrowSafeTypeConversion() | |||
col_by_name = True # col by name only applies to StructType columns, can't happen here |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Meaning the value of col_by_name
doesn't matter here, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, that's right. This will be removed when we take out that conf.
python/pyspark/serializers.py
Outdated
# Assign result columns by schema name if user labeled with strings, else use position | ||
struct_arrs = [] | ||
struct_names = [] | ||
if assign_cols_by_name and any(isinstance(name, basestring) for name in s.columns): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not all columns are labeled with string, but any one?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, this is copied from grouped map wrap. It didn't seem necessary to check all columns to be string. The only case that ends up weird is if the columns have a mix of strings and other types. I think that would be a little strange and I'm not sure that assigning by position is the right thing to do. So this would probably end up with raising a KeyError
so that the user can fix it.
@@ -1616,6 +1616,12 @@ def to_arrow_type(dt): | |||
if type(dt.elementType) == TimestampType: | |||
raise TypeError("Unsupported type in conversion to Arrow: " + str(dt)) | |||
arrow_type = pa.list_(to_arrow_type(dt.elementType)) | |||
elif type(dt) == StructType: | |||
if any(type(field.dataType) == StructType for field in dt): | |||
raise TypeError("Nested StructType not supported in conversion to Arrow") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is ArrayType(elementType = StructType) supported?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch, that should not be supported right now. I added a check and put that type in a test.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am curious why ArrayType(elementType = StructType)
support was removed from here? @BryanCutler
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cfmcgrady support wasn't removed, it was never allowed to have ArrayType(elementType = StructType)
- I don't think there was an explicit check before this. It might be possible to add this in the future, but it's a little tricky to represent this in Pandas efficiently, I believe.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for your reply.
Test build #102869 has finished for PR 23900 at commit
|
@@ -133,6 +133,8 @@ def returnType(self): | |||
"UDFs: returnType must be a StructType.") | |||
elif self.evalType == PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF: | |||
try: | |||
if isinstance(self._returnType_placeholder, StructType): | |||
raise TypeError |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm, @BryanCutler, sorry if I missed something but why do we throw a type error here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Grouped Agg UDFs don't allow a StructType
return yet, and before relied on the call to to_arrow_type
to raise an error. Since that no longer happens, need to raise it here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. Can you add some message while we're here? If this is going to be fixed soon, I am okay as is as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure. I will try it as a followup, but a message for now will be good. I just noticed that grouped map wasn't catching a nested struct type, so I need to fix that anyway.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yea, either way sounds good to me. I'll leave it to you.
"but got: %s" % str(type(s))) | ||
|
||
# Input partition and result pandas.DataFrame empty, make empty Arrays with struct | ||
if len(s) == 0 and len(s.columns) == 0: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I came across a case where there was an empty partition and when the udf processed it, the result is an empty Pandas DataFrame with no columns, see https://github.com/apache/spark/pull/23900/files#diff-d1bd0bd4ceeedd30cc219293a75ad90fR395
I figured it would be pretty confusing for the user to handle these kind of cases, and it's pretty simple to just check and add an empty struct when this happens, so that's what this check is for.
Test build #103019 has finished for PR 23900 at commit
|
Test build #103025 has finished for PR 23900 at commit
|
Thanks @HyukjinKwon @viirya and @felixcheung ! If no more comments, I'll merge this tomorrow. |
Yea, no objection. I will take a look when I find some time whether it's merged or not anyway. |
merged to master |
@@ -2868,6 +2869,15 @@ def pandas_udf(f=None, returnType=None, functionType=None): | |||
+----------+--------------+------------+ | |||
| 8| JOHN DOE| 22| | |||
+----------+--------------+------------+ | |||
>>> @pandas_udf("first string, last string") # doctest: +SKIP |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a nice feature!
I am wondering if we have a better way to handle the incorrect inputs. For example, if our end users specify incorrect return data types (e.g. @pandas_udf("first string, last int")
), do we issue a user-friend error message?
19/03/21 00:04:08 ERROR Executor: Exception in task 0.0 in stage 8.0 (TID 8)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/Users/xiaoli/IdeaProjects/sparkDelivery/python/lib/pyspark.zip/pyspark/worker.py", line 433, in main
process()
File "/Users/xiaoli/IdeaProjects/sparkDelivery/python/lib/pyspark.zip/pyspark/worker.py", line 428, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/Users/xiaoli/IdeaProjects/sparkDelivery/python/lib/pyspark.zip/pyspark/serializers.py", line 360, in dump_stream
self._assign_cols_by_name)
File "/Users/xiaoli/IdeaProjects/sparkDelivery/python/lib/pyspark.zip/pyspark/serializers.py", line 316, in _create_batch
for i, field in enumerate(t)]
File "/Users/xiaoli/IdeaProjects/sparkDelivery/python/lib/pyspark.zip/pyspark/serializers.py", line 287, in create_array
return pa.Array.from_pandas(s, mask=mask, type=t)
File "pyarrow/array.pxi", line 335, in pyarrow.lib.Array.from_pandas (/Users/travis/build/BryanCutler/arrow-dist/arrow/python/build/temp.macosx-10.6-intel-2.7/lib.cxx:30884)
return array(obj, mask=mask, type=type, memory_pool=memory_pool,
File "pyarrow/array.pxi", line 170, in pyarrow.lib.array (/Users/travis/build/BryanCutler/arrow-dist/arrow/python/build/temp.macosx-10.6-intel-2.7/lib.cxx:29224)
return _ndarray_to_array(values, mask, type, from_pandas, pool)
File "pyarrow/array.pxi", line 70, in pyarrow.lib._ndarray_to_array (/Users/travis/build/BryanCutler/arrow-dist/arrow/python/build/temp.macosx-10.6-intel-2.7/lib.cxx:28465)
check_status(NdarrayToArrow(pool, values, mask,
File "pyarrow/error.pxi", line 85, in pyarrow.lib.check_status (/Users/travis/build/BryanCutler/arrow-dist/arrow/python/build/temp.macosx-10.6-intel-2.7/lib.cxx:8570)
raise ArrowNotImplementedError(message)
ArrowNotImplementedError: No cast implemented from string to int32
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:453)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2.read(ArrowPythonRunner.scala:172)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2.read(ArrowPythonRunner.scala:122)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at org.apache.spark.sql.execution.python.ArrowEvalPythonExec$$anon$2.<init>(ArrowEvalPythonExec.scala:102)
at org.apache.spark.sql.execution.python.ArrowEvalPythonExec.evaluate(ArrowEvalPythonExec.scala:100)
at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:126)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:817)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:817)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:327)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:291)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:327)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:291)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:327)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:291)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:327)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:291)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:428)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1341)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:431)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
19/03/21 00:04:08 WARN TaskSetManager: Lost task 0.0 in stage 8.0 (TID 8, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/Users/xiaoli/IdeaProjects/sparkDelivery/python/lib/pyspark.zip/pyspark/worker.py", line 433, in main
process()
File "/Users/xiaoli/IdeaProjects/sparkDelivery/python/lib/pyspark.zip/pyspark/worker.py", line 428, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/Users/xiaoli/IdeaProjects/sparkDelivery/python/lib/pyspark.zip/pyspark/serializers.py", line 360, in dump_stream
self._assign_cols_by_name)
File "/Users/xiaoli/IdeaProjects/sparkDelivery/python/lib/pyspark.zip/pyspark/serializers.py", line 316, in _create_batch
for i, field in enumerate(t)]
File "/Users/xiaoli/IdeaProjects/sparkDelivery/python/lib/pyspark.zip/pyspark/serializers.py", line 287, in create_array
return pa.Array.from_pandas(s, mask=mask, type=t)
File "pyarrow/array.pxi", line 335, in pyarrow.lib.Array.from_pandas (/Users/travis/build/BryanCutler/arrow-dist/arrow/python/build/temp.macosx-10.6-intel-2.7/lib.cxx:30884)
return array(obj, mask=mask, type=type, memory_pool=memory_pool,
File "pyarrow/array.pxi", line 170, in pyarrow.lib.array (/Users/travis/build/BryanCutler/arrow-dist/arrow/python/build/temp.macosx-10.6-intel-2.7/lib.cxx:29224)
return _ndarray_to_array(values, mask, type, from_pandas, pool)
File "pyarrow/array.pxi", line 70, in pyarrow.lib._ndarray_to_array (/Users/travis/build/BryanCutler/arrow-dist/arrow/python/build/temp.macosx-10.6-intel-2.7/lib.cxx:28465)
check_status(NdarrayToArrow(pool, values, mask,
File "pyarrow/error.pxi", line 85, in pyarrow.lib.check_status (/Users/travis/build/BryanCutler/arrow-dist/arrow/python/build/temp.macosx-10.6-intel-2.7/lib.cxx:8570)
raise ArrowNotImplementedError(message)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, is it possible the mismatched types could generate wrong results?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, something has to be done. I was at the very least tried to document the casting
combinations.
Pandas UDF matrix:
spark/python/pyspark/sql/functions.py
Lines 3098 to 3131 in b67d369
# The following table shows most of Pandas data and SQL type conversions in Pandas UDFs that | |
# are not yet visible to the user. Some of behaviors are buggy and might be changed in the near | |
# future. The table might have to be eventually documented externally. | |
# Please see SPARK-25798's PR to see the codes in order to generate the table below. | |
# | |
# +-----------------------------+----------------------+----------+-------+--------+--------------------+--------------------+--------+---------+---------+---------+------------+------------+------------+-----------------------------------+-----------------------------------------------------+-----------------+--------------------+-----------------------------+-------------+-----------------+------------------+-----------+--------------------------------+ # noqa | |
# |SQL Type \ Pandas Value(Type)|None(object(NoneType))|True(bool)|1(int8)|1(int16)| 1(int32)| 1(int64)|1(uint8)|1(uint16)|1(uint32)|1(uint64)|1.0(float16)|1.0(float32)|1.0(float64)|1970-01-01 00:00:00(datetime64[ns])|1970-01-01 00:00:00-05:00(datetime64[ns, US/Eastern])|a(object(string))| 1(object(Decimal))|[1 2 3](object(array[int32]))|1.0(float128)|(1+0j)(complex64)|(1+0j)(complex128)|A(category)|1 days 00:00:00(timedelta64[ns])| # noqa | |
# +-----------------------------+----------------------+----------+-------+--------+--------------------+--------------------+--------+---------+---------+---------+------------+------------+------------+-----------------------------------+-----------------------------------------------------+-----------------+--------------------+-----------------------------+-------------+-----------------+------------------+-----------+--------------------------------+ # noqa | |
# | boolean| None| True| True| True| True| True| True| True| True| True| False| False| False| False| False| X| X| X| False| False| False| X| False| # noqa | |
# | tinyint| None| 1| 1| 1| 1| 1| X| X| X| X| 1| 1| 1| X| X| X| X| X| X| X| X| 0| X| # noqa | |
# | smallint| None| 1| 1| 1| 1| 1| 1| X| X| X| 1| 1| 1| X| X| X| X| X| X| X| X| X| X| # noqa | |
# | int| None| 1| 1| 1| 1| 1| 1| 1| X| X| 1| 1| 1| X| X| X| X| X| X| X| X| X| X| # noqa | |
# | bigint| None| 1| 1| 1| 1| 1| 1| 1| 1| X| 1| 1| 1| 0| 18000000000000| X| X| X| X| X| X| X| X| # noqa | |
# | float| None| 1.0| 1.0| 1.0| 1.0| 1.0| 1.0| 1.0| 1.0| 1.0| 1.0| 1.0| 1.0| X| X| X|1.401298464324817...| X| X| X| X| X| X| # noqa | |
# | double| None| 1.0| 1.0| 1.0| 1.0| 1.0| 1.0| 1.0| 1.0| 1.0| 1.0| 1.0| 1.0| X| X| X| X| X| X| X| X| X| X| # noqa | |
# | date| None| X| X| X|datetime.date(197...| X| X| X| X| X| X| X| X| datetime.date(197...| X| X| X| X| X| X| X| X| X| # noqa | |
# | timestamp| None| X| X| X| X|datetime.datetime...| X| X| X| X| X| X| X| datetime.datetime...| datetime.datetime...| X| X| X| X| X| X| X| X| # noqa | |
# | string| None| u''|u'\x01'| u'\x01'| u'\x01'| u'\x01'| u'\x01'| u'\x01'| u'\x01'| u'\x01'| u''| u''| u''| X| X| u'a'| X| X| u''| u''| u''| X| X| # noqa | |
# | decimal(10,0)| None| X| X| X| X| X| X| X| X| X| X| X| X| X| X| X| Decimal('1')| X| X| X| X| X| X| # noqa | |
# | array<int>| None| X| X| X| X| X| X| X| X| X| X| X| X| X| X| X| X| [1, 2, 3]| X| X| X| X| X| # noqa | |
# | map<string,int>| X| X| X| X| X| X| X| X| X| X| X| X| X| X| X| X| X| X| X| X| X| X| X| # noqa | |
# | struct<_1:int>| X| X| X| X| X| X| X| X| X| X| X| X| X| X| X| X| X| X| X| X| X| X| X| # noqa | |
# | binary| X| X| X| X| X| X| X| X| X| X| X| X| X| X| X| X| X| X| X| X| X| X| X| # noqa | |
# +-----------------------------+----------------------+----------+-------+--------+--------------------+--------------------+--------+---------+---------+---------+------------+------------+------------+-----------------------------------+-----------------------------------------------------+-----------------+--------------------+-----------------------------+-------------+-----------------+------------------+-----------+--------------------------------+ # noqa | |
# | |
# Note: DDL formatted string is used for 'SQL Type' for simplicity. This string can be | |
# used in `returnType`. | |
# Note: The values inside of the table are generated by `repr`. | |
# Note: Python 2 is used to generate this table since it is used to check the backward | |
# compatibility often in practice. | |
# Note: Pandas 0.19.2 and PyArrow 0.9.0 are used. | |
# Note: Timezone is Singapore timezone. | |
# Note: 'X' means it throws an exception during the conversion. | |
# Note: 'binary' type is only supported with PyArrow 0.10.0+ (SPARK-23555). |
The problem is, this matrix is different from regular PySpark UDF, and also our TypeCoercions
:
Regular PySpark UDF matrix:
spark/python/pyspark/sql/functions.py
Lines 2830 to 2860 in b67d369
# The following table shows most of Python data and SQL type conversions in normal UDFs that | |
# are not yet visible to the user. Some of behaviors are buggy and might be changed in the near | |
# future. The table might have to be eventually documented externally. | |
# Please see SPARK-25666's PR to see the codes in order to generate the table below. | |
# | |
# +-----------------------------+--------------+----------+------+-------+---------------+---------------+--------------------+-----------------------------+----------+----------------------+---------+--------------------+-----------------+------------+--------------+------------------+----------------------+ # noqa | |
# |SQL Type \ Python Value(Type)|None(NoneType)|True(bool)|1(int)|1(long)| a(str)| a(unicode)| 1970-01-01(date)|1970-01-01 00:00:00(datetime)|1.0(float)|array('i', [1])(array)|[1](list)| (1,)(tuple)| ABC(bytearray)| 1(Decimal)|{'a': 1}(dict)|Row(kwargs=1)(Row)|Row(namedtuple=1)(Row)| # noqa | |
# +-----------------------------+--------------+----------+------+-------+---------------+---------------+--------------------+-----------------------------+----------+----------------------+---------+--------------------+-----------------+------------+--------------+------------------+----------------------+ # noqa | |
# | boolean| None| True| None| None| None| None| None| None| None| None| None| None| None| None| None| X| X| # noqa | |
# | tinyint| None| None| 1| 1| None| None| None| None| None| None| None| None| None| None| None| X| X| # noqa | |
# | smallint| None| None| 1| 1| None| None| None| None| None| None| None| None| None| None| None| X| X| # noqa | |
# | int| None| None| 1| 1| None| None| None| None| None| None| None| None| None| None| None| X| X| # noqa | |
# | bigint| None| None| 1| 1| None| None| None| None| None| None| None| None| None| None| None| X| X| # noqa | |
# | string| None| u'true'| u'1'| u'1'| u'a'| u'a'|u'java.util.Grego...| u'java.util.Grego...| u'1.0'| u'[I@24a83055'| u'[1]'|u'[Ljava.lang.Obj...| u'[B@49093632'| u'1'| u'{a=1}'| X| X| # noqa | |
# | date| None| X| X| X| X| X|datetime.date(197...| datetime.date(197...| X| X| X| X| X| X| X| X| X| # noqa | |
# | timestamp| None| X| X| X| X| X| X| datetime.datetime...| X| X| X| X| X| X| X| X| X| # noqa | |
# | float| None| None| None| None| None| None| None| None| 1.0| None| None| None| None| None| None| X| X| # noqa | |
# | double| None| None| None| None| None| None| None| None| 1.0| None| None| None| None| None| None| X| X| # noqa | |
# | array<int>| None| None| None| None| None| None| None| None| None| [1]| [1]| [1]| [65, 66, 67]| None| None| X| X| # noqa | |
# | binary| None| None| None| None|bytearray(b'a')|bytearray(b'a')| None| None| None| None| None| None|bytearray(b'ABC')| None| None| X| X| # noqa | |
# | decimal(10,0)| None| None| None| None| None| None| None| None| None| None| None| None| None|Decimal('1')| None| X| X| # noqa | |
# | map<string,int>| None| None| None| None| None| None| None| None| None| None| None| None| None| None| {u'a': 1}| X| X| # noqa | |
# | struct<_1:int>| None| X| X| X| X| X| X| X| X| X|Row(_1=1)| Row(_1=1)| X| X| Row(_1=None)| Row(_1=1)| Row(_1=1)| # noqa | |
# +-----------------------------+--------------+----------+------+-------+---------------+---------------+--------------------+-----------------------------+----------+----------------------+---------+--------------------+-----------------+------------+--------------+------------------+----------------------+ # noqa | |
# | |
# Note: DDL formatted string is used for 'SQL Type' for simplicity. This string can be | |
# used in `returnType`. | |
# Note: The values inside of the table are generated by `repr`. | |
# Note: Python 2 is used to generate this table since it is used to check the backward | |
# compatibility often in practice. | |
# Note: 'X' means it throws an exception during the conversion. |
I lost the last discussion about whether we should allow such type coercions or not. But basically my guts say: If we allow, I think it will need a huge bunch of codes to maintain again (Arrow Type <> Pandas type <> Python type <> SparkSQL type), but if we disallow, it will break many existing apps.
One way is that we explicitly document that Pandas's type coercion is dependent on Arrow (apart from regular PySpark UDF), and throw an explicit exception.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And .. yes I think we should also throw better exceptions at the very least.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sometimes type casting is necessary, for example if your integer column has null values then Pandas will automatically upcast to floating point to represent the nulls as NaNs. If that column is returned then it doesn't make sense to keep it as floating point because Spark can handle the null values, so using integer return type will cause type casting, but won't cause any problems.
arrs = [] | ||
for s, t in series: | ||
if t is not None and pa.types.is_struct(t): | ||
if not isinstance(s, pd.DataFrame): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good @BryanCutler, and cc @ueshin FYI. Just out of curiosity, WDYT about putting those PySpark specific conversion logics into somewhere together, of course, in a separate PR and JIRA? Looks it's getting difficult to read (to me .. )
…das UDF This change adds support for returning StructType from a scalar Pandas UDF, where the return value of the function is a pandas.DataFrame. Nested structs are not supported and an error will be raised, child types can be any other type currently supported. Added additional unit tests to `test_pandas_udf_scalar` Closes apache#23900 from BryanCutler/pyspark-support-scalar_udf-StructType-SPARK-23836. Authored-by: Bryan Cutler <[email protected]> Signed-off-by: Bryan Cutler <[email protected]>
…das UDF This change adds support for returning StructType from a scalar Pandas UDF, where the return value of the function is a pandas.DataFrame. Nested structs are not supported and an error will be raised, child types can be any other type currently supported. Added additional unit tests to `test_pandas_udf_scalar` Closes apache#23900 from BryanCutler/pyspark-support-scalar_udf-StructType-SPARK-23836. Authored-by: Bryan Cutler <[email protected]> Signed-off-by: Bryan Cutler <[email protected]>
What changes were proposed in this pull request?
This change adds support for returning StructType from a scalar Pandas UDF, where the return value of the function is a pandas.DataFrame. Nested structs are not supported and an error will be raised, child types can be any other type currently supported.
How was this patch tested?
Added additional unit tests to
test_pandas_udf_scalar