[SPARK-27240][PYTHON] Use pandas DataFrame for struct type argument in Scalar Pandas UDF. #24177
Conversation
python/pyspark/sql/types.py (Outdated)
# TODO: remove version check once minimum pyarrow version is 0.11.0
if LooseVersion(pa.__version__) < LooseVersion("0.11.0"):
    raise TypeError("Unsupported type in conversion from Arrow: " + str(at) +
                    "\nPlease install pyarrow >= 0.11.0 for StructType support.")
Currently this only supports pyarrow >= 0.11, since I couldn't find a way to reconstruct a pandas DataFrame from pyarrow.lib.StructArray.
python/pyspark/serializers.py (Outdated)
if self._df_for_struct and type(data_type) == StructType:
    import pandas as pd
    import pyarrow as pa
    column_arrays = zip(*[[chunk.field(i)
pyarrow.lib.StructArray.field() is only available in pyarrow >= 0.11.
Test build #103811 has finished for PR 24177 at commit
(Seems we forgot to file a JIRA.)
@HyukjinKwon Thanks!
@@ -253,7 +253,9 @@ def read_udfs(pickleSer, infile, eval_type):
        "spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName", "true")\
        .lower() == "true"

    ser = ArrowStreamPandasUDFSerializer(timezone, safecheck, assign_cols_by_name)
    df_for_struct = eval_type == PythonEvalType.SQL_SCALAR_PANDAS_UDF
It seems hard to tell why, when eval_type is PythonEvalType.SQL_SCALAR_PANDAS_UDF, df_for_struct should be true. Maybe a well-explained comment here would be better.
Sure, will add a comment.
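As an illustrative sketch of what such a comment might explain — the constant values below are hypothetical stand-ins, not the real PythonEvalType values from pyspark:

```python
# Hypothetical stand-ins for the PythonEvalType constants in pyspark.
SQL_SCALAR_PANDAS_UDF = 200
SQL_GROUPED_MAP_PANDAS_UDF = 201

def df_for_struct(eval_type):
    # Scalar Pandas UDFs receive each argument column separately, so a
    # StructType column must be materialized as its own pandas DataFrame.
    # Grouped-map UDFs already merge all columns into a single DataFrame,
    # so they don't need this special handling.
    return eval_type == SQL_SCALAR_PANDAS_UDF
```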
                  for arrays, field in zip(column_arrays, data_type)]
        s = _check_dataframe_localize_timestamps(pd.concat(series, axis=1), self._timezone)
    else:
        s = super(ArrowStreamPandasUDFSerializer, self).arrow_to_pandas(arrow_column, data_type)
Will this create a new serializer each time arrow_to_pandas is called?
No, this just calls the superclass's method.
Test build #103820 has finished for PR 24177 at commit
Thanks for the PR @ueshin! If I understand correctly, this change means that any non-nested StructType column from Spark will be converted to a pandas DataFrame for input to a pandas_udf? So if a pandas_udf had two arguments, one being a LongType and one being a StructType, then the user would see one pandas Series and one pandas DataFrame as the function input?
That behavior sounds reasonable to me, but I think it is a little different for grouped map UDFs, which merge all columns into a single pandas DataFrame, and I'm not sure how those would handle a StructType column. I'm just wondering if this difference might end up confusing the user, WDYT?
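To make the scalar case concrete, here is a minimal pandas-only sketch of what such a UDF body would see under this change. The function and column names are illustrative; Spark itself is not involved here:

```python
import pandas as pd

# Simulates a Scalar Pandas UDF taking a LongType column and a StructType
# column: the former arrives as a Series, the latter (after this change)
# as a DataFrame with one column per struct field.
def add_struct_fields(v: pd.Series, s: pd.DataFrame) -> pd.Series:
    return v + s["x"] + s["y"]

v = pd.Series([1, 2, 3])
s = pd.DataFrame({"x": [10, 20, 30], "y": [100, 200, 300]})
result = add_struct_fields(v, s)  # -> [111, 222, 333]
```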
python/pyspark/serializers.py (Outdated)
import pyarrow as pa
column_arrays = zip(*[[chunk.field(i)
                       for i in range(chunk.type.num_children)]
                      for chunk in arrow_column.data.iterchunks()])
It might be best to avoid dealing with array chunks and keep this high level if possible. Would it be possible to build the pandas DataFrame by flattening the Arrow column, building a table from those arrays, and then converting that to pandas? Something like this, I think:
pdf = pa.Table.from_arrays(arrow_column.flatten()).to_pandas()
I'm not sure if the column names in the pdf would end up as expected though...
arrow_column.flatten() is great! Then we can support pyarrow >= 0.10.
from pyspark.sql.types import StructType, \
    _arrow_column_to_pandas, _check_dataframe_localize_timestamps

if self._df_for_struct and type(data_type) == StructType:
Does this need to check for a nested struct?
I don't think so. We can't construct a pandas DataFrame containing a nested DataFrame.
Or am I missing what you mean?
I was wondering: if data_type has a nested struct, is an error raised before it gets here? That could be addressed as a follow-up. I'm not sure if there is a test for it, but I'll check.
Yes, you were virtually referring
@BryanCutler I'm sorry, but I couldn't figure out what you meant.
Test build #103892 has finished for PR 24177 at commit
Sorry, I think I wrote that a little too hastily and it might not have made much sense. Yes, I was referring to
LGTM
Merged to master, thanks @ueshin!
A late LGTM as well :D
Closes apache#24177 from ueshin/issues/SPARK-27240/structtype_argument. Authored-by: Takuya UESHIN <[email protected]> Signed-off-by: Bryan Cutler <[email protected]>
What changes were proposed in this pull request?
We now support returning a pandas DataFrame for struct type in a Scalar Pandas UDF.
If we chain another Pandas UDF after a Scalar Pandas UDF that returns a pandas DataFrame, the argument of the chained UDF will be a pandas DataFrame, but currently we don't support a pandas DataFrame as an argument of a Scalar Pandas UDF. That means there is an inconsistency between the chained UDF and the single UDF.
To be consistent, we should support taking a pandas DataFrame for a struct type argument in a Scalar Pandas UDF.
Currently pyarrow >= 0.11 is supported.
How was this patch tested?
Modified and added some tests.
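The chaining inconsistency described above can be sketched with plain pandas (no Spark involved; the function and field names are illustrative): the first UDF produces a struct as a DataFrame, and with this change the second UDF can consume that same shape.

```python
import pandas as pd

# First UDF: returns a StructType result, which reaches Python as a
# pandas DataFrame with one column per struct field.
def split_value(v: pd.Series) -> pd.DataFrame:
    return pd.DataFrame({"half": v // 2, "rest": v % 2})

# Second (chained) UDF: with this change it can take the struct argument
# as a pandas DataFrame, matching what split_value returned.
def recombine(s: pd.DataFrame) -> pd.Series:
    return s["half"] * 2 + s["rest"]

v = pd.Series([5, 8])
out = recombine(split_value(v))  # round-trips back to the original values
```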