[SPARK-27463][PYTHON] Support Dataframe Cogroup via Pandas UDFs #24981
Conversation
…7463-poc
# Conflicts:
#	core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
#	python/pyspark/rdd.py
#	python/pyspark/worker.py
protected override def writeIteratorToStream(dataOut: DataOutputStream): Unit = {
  while (inputIterator.hasNext) {
    dataOut.writeInt(SpecialLengths.START_ARROW_STREAM)
I need this so that the Python side knows whether it should expect another group. I'm using SpecialLengths.START_ARROW_STREAM to signal that another group is coming and SpecialLengths.END_OF_DATA_SECTION to indicate that we've finished sending all the Arrow data.
These SpecialLengths have slightly different meanings elsewhere, and I think it gets confusing trying to sort out how they are used. I think it would be clearer if we just say we are sending an integer before the data that indicates how many groups to read, with a value of 0 representing end-of-stream. So we would send 2 before writing the left and right groups, and if we end up sending more groups in the future, the protocol wouldn't change.
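(For illustration only, a rough sketch of what this count-prefixed framing could look like on the Python reading side. The class name and exact stream handling are assumptions, not code from this PR; only `read_int` from pyspark.serializers and `pyarrow.ipc.open_stream` are existing APIs.)

```python
# Hedged sketch: a reader that consumes count-prefixed groups as suggested above.
import pyarrow as pa
from pyspark.serializers import Serializer, read_int


class CogroupArrowReader(Serializer):  # hypothetical name, illustration only
    def load_stream(self, stream):
        while True:
            num_groups = read_int(stream)   # integer written by the JVM before each cogroup
            if num_groups == 0:             # 0 signals end-of-stream
                return
            # For cogroup this would be 2 (left, then right); sending more groups later
            # would require no protocol change.
            yield [list(pa.ipc.open_stream(stream)) for _ in range(num_groups)]
```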
Yes, that makes sense. Changed.
ok to test
Test build #106961 has finished for PR 24981 at commit
Test build #106970 has finished for PR 24981 at commit
Test build #107125 has finished for PR 24981 at commit
…7463-poc-arrow-stream
# Conflicts:
#	core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
#	python/pyspark/rdd.py
#	python/pyspark/worker.py
#	sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala
#	sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
Test build #107129 has finished for PR 24981 at commit
while (inputIterator.hasNext) {
  dataOut.writeInt(SpecialLengths.START_ARROW_STREAM)
  val (nextLeft, nextRight) = inputIterator.next()
  writeGroup(nextLeft, leftSchema, dataOut)
I see. In this implementation we are writing out a complete Arrow stream for each group. I am OK with this one, but I am a little concerned about performance. I'd like to understand the performance difference between the two POCs. Is it possible to do a microbenchmark of maybe 100M of data with very small to very large groups?
e.g. comparing the performance when:
- each group has 1 row
- each group has 10 rows
- each group has 100 rows
- ...
- the dataframe contains a single group

@BryanCutler @HyukjinKwon WDYT?
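(A rough sketch of the kind of microbenchmark being asked for is below; the helper name, the key-distribution scheme, and using count() to force evaluation are assumptions for illustration, not part of this PR.)

```python
# Illustrative microbenchmark sketch: time a cogrouped apply while varying group size.
import time


def time_cogroup_apply(spark, cogroup_udf, num_rows, rows_per_group):
    num_groups = max(1, num_rows // rows_per_group)
    # Spread num_rows rows over num_groups keys so each group has roughly rows_per_group rows.
    df1 = spark.range(num_rows).selectExpr("id % {} AS key".format(num_groups), "id AS v1")
    df2 = spark.range(num_rows).selectExpr("id % {} AS key".format(num_groups), "id AS v2")
    start = time.time()
    df1.groupby("key").cogroup(df2.groupby("key")).apply(cogroup_udf).count()  # force evaluation
    return time.time() - start


# Example: compare 1-row groups against a single large group.
# for rows_per_group in (1, 10, 100, 1000000):
#     print(rows_per_group, time_cogroup_apply(spark, my_udf, 1000000, rows_per_group))
```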
Yeah, we should at least have a rough estimate of the performance.
If it's easy to run some numbers comparing the two POCs, that would be nice to see, but I think there would have to be a significant difference to justify breaking from the Arrow stream protocol. I would rather stick with this PR for now and leave performance improvements for follow-ups.
I don't think it should be too bad to test; I'll try to get one done in the next few days. That said, I'll tidy up this code first as Bryan suggests.
@d80tb7, sorry, there have been some changes in Pandas UDFs that caused some conflicts, as well as a design discussion about Pandas UDFs in general. But I think we can go ahead with cogroup separately for now. I am still positive about cogroup in general. Mind resolving the conflicts and going ahead?
Great work @d80tb7! Apologies for taking so long to review. I think it is going to be a bit of a moving target with some of the recent proposals about pandas_udf types, but we should be able to get all the plumbing sorted out first. If the proposed changes get merged first, I don't think there will need to be much updating here, probably just changes in the pandas_udf declaration. WDYT?
Overall, this looks pretty good to me. I think we can clean up some things with the serializer in Python, and I'll have to take a closer look at the Scala later.
python/pyspark/serializers.py
@@ -401,6 +427,22 @@ def __repr__(self):
        return "ArrowStreamPandasUDFSerializer"


class InterleavedArrowStreamPandasSerializer(ArrowStreamPandasUDFSerializer):
Could we call this PandasCogroupSerializer or something like that, so it is obvious what it is being used for?
...lyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala
sql/core/src/main/scala/org/apache/spark/sql/execution/python/BaseArrowPythonRunner.scala
protected def executePython[T]
    (data: Iterator[T], runner: BasePythonRunner[T, ColumnarBatch]): Iterator[InternalRow] = {
I think this file has some formatting issues also
...core/src/main/scala/org/apache/spark/sql/execution/python/InterleavedArrowPythonRunner.scala
>>> df2 = spark.createDataFrame(
...     [(20000101, 1, "x"), (20000101, 2, "y")],
...     ("time", "id", "v2"))
>>> @pandas_udf("time int, id int, v1 double, v2 string", PandasUDFType.COGROUPED_MAP)
We should skip this test and run the doctests:
- add this to dev/sparktestsupport/modules.py at pyspark_sql
- add:
  def main(): doctest.testmod(...) ...
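(For reference, the usual PySpark doctest entry point looks roughly like the sketch below; the module name pyspark.sql.cogroup is an assumption based on the file this PR adds, and details may differ from what was actually committed.)

```python
import doctest
import sys


def _test():
    from pyspark.sql import SparkSession
    import pyspark.sql.cogroup  # assumed module name for the new file

    globs = pyspark.sql.cogroup.__dict__.copy()
    spark = SparkSession.builder \
        .master("local[4]") \
        .appName("sql.cogroup tests") \
        .getOrCreate()
    globs['spark'] = spark
    (failure_count, test_count) = doctest.testmod(
        pyspark.sql.cogroup, globs=globs,
        optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE)
    spark.stop()
    if failure_count:
        sys.exit(-1)


if __name__ == "__main__":
    _test()
```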
We are currently skipping all doctests for Pandas UDFs, right? We could add the module but would then need to skip each test individually, which might be more consistent with the rest of PySpark.
... ("time", "id", "v1")) | ||
>>> df2 = spark.createDataFrame( | ||
... [(20000101, 1, "x"), (20000101, 2, "y")], | ||
... ("time", "id", "v2")) |
indentation nit
""" | ||
Tests below use pd.DataFrame.assign that will infer mixed types (unicode/str) for column names | ||
from kwargs w/ Python 2, so need to set check_column_type=False and avoid this check |
I think we should leave this as a comment, not a string. Since it's not at the top of the module, it isn't a docstring either.
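(In other words, something along these lines; a minimal sketch of the suggested change, with the string literal turned into plain comments.)

```python
import sys

# Tests below use pd.DataFrame.assign that will infer mixed types (unicode/str) for column
# names from kwargs w/ Python 2, so need to set check_column_type=False and avoid this check.
if sys.version < '3':
    _check_column_type = False
```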
from kwargs w/ Python 2, so need to set check_column_type=False and avoid this check
"""
if sys.version < '3':
    _check_column_type = False
nit
_check_column_type = sys.version >= '3'
>>> df2 = spark.createDataFrame(
...     [(20000101, 1, "x"), (20000101, 2, "y")],
...     ("time", "id", "v2"))
>>> @pandas_udf("time int, id int, v1 double, v2 string", PandasUDFType.COGROUPED_MAP)
Can we document the case where the function takes three arguments (i.e., when it includes the grouping key)?
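(For illustration, a sketch of what the three-argument form might look like. The assumption here is that when the wrapped function takes three parameters, the first is the grouping key as a tuple of the grouping column values; the decorator and schema follow the docstring excerpts quoted above.)

```python
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType


@pandas_udf("time int, id int, v1 double, v2 string", PandasUDFType.COGROUPED_MAP)
def asof_join_with_key(key, left, right):
    # key would be, e.g., (1,) when the data is grouped by "id"
    return pd.merge_asof(left, right, on="time", by="id")
```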
While it looks good in general, let me leave some comments from my review, just for recording purposes. This comment (#24981 (comment)) at least seems critical, so let me make a quick follow-up. If the tests cannot pass, technically we might have to revert this PR. For clarification, I have no objection to the fact that it was merged.
Let me review further tomorrow in KST.
### What changes were proposed in this pull request?

This is a followup for #24981. It seems we mistakenly didn't add `test_pandas_udf_cogrouped_map` into `modules.py`, so we don't have official test results against that PR.

```
...
Starting test(python3.6): pyspark.sql.tests.test_pandas_udf
...
Starting test(python3.6): pyspark.sql.tests.test_pandas_udf_grouped_agg
...
Starting test(python3.6): pyspark.sql.tests.test_pandas_udf_grouped_map
...
Starting test(python3.6): pyspark.sql.tests.test_pandas_udf_scalar
...
Starting test(python3.6): pyspark.sql.tests.test_pandas_udf_window
Finished test(python3.6): pyspark.sql.tests.test_pandas_udf (21s)
...
Finished test(python3.6): pyspark.sql.tests.test_pandas_udf_grouped_map (49s)
...
Finished test(python3.6): pyspark.sql.tests.test_pandas_udf_window (58s)
...
Finished test(python3.6): pyspark.sql.tests.test_pandas_udf_scalar (82s)
...
Finished test(python3.6): pyspark.sql.tests.test_pandas_udf_grouped_agg (105s)
...
```

If the tests fail, we should revert that PR.

### Why are the changes needed?

The relevant tests should be run.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Jenkins tests.

Closes #25890 from HyukjinKwon/SPARK-28840.

Authored-by: HyukjinKwon <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
@@ -0,0 +1,98 @@
#
+1 to adding it to pyspark/sql/__init__.py and including it in __all__, since this is what group.py does.
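(Something like the following; an abbreviated sketch only, since the real __all__ list in pyspark/sql/__init__.py has more entries, and the cogroup module/class names are taken from this PR.)

```python
# Abbreviated sketch of pyspark/sql/__init__.py with the new class exported,
# mirroring how GroupedData from group.py is exposed. Not the complete list.
from pyspark.sql.group import GroupedData
from pyspark.sql.cogroup import CoGroupedData

__all__ = [
    "SparkSession", "DataFrame", "GroupedData", "CoGroupedData",
    # ... remaining existing exports unchanged ...
]
```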
/**
 * Common functionality for a udf runner that exchanges data with Python worker via Arrow stream.
 */
abstract class BaseArrowPythonRunner[T](
Why did we do such refactoring in this PR? It would be better done separately; otherwise, it's hard to follow the changes.
One thing we should consider is that the R vectorized code path mirrors the Python side. We should think about that before generalizing it; my goal was to deduplicate both code paths.
I think the changes were pretty straightforward to support the feature, mainly to be able to read back results the same way as grouped map UDFs. I didn't consider this to be major refactoring, so making a complete copy of the Python runner seemed a little excessive. Otherwise I would agree to keep things separate.

> We should think about that before generalizing it - my goal was that deduplicating both code paths.

This sounds like a good idea; it shouldn't really matter whether we are writing to Python or R, and it would be good to deduplicate.
@@ -47,8 +47,8 @@ import org.apache.spark.sql.types.{NumericType, StructType}
 */
@Stable
class RelationalGroupedDataset protected[sql](
    df: DataFrame,
    groupingExprs: Seq[Expression],
    val df: DataFrame,
This is non-trivial since it exposes another API. It would be better to make it private[spark] to hide it; otherwise, we will have to maintain compatibility.
/**
 * Base functionality for plans which execute grouped python udfs.
 */
abstract class BasePandasGroupExec(
Hey, I am pretty against this refactoring. There is similar duplicated code in the R vectorization path too (which I added), and I intentionally haven't refactored it yet. Plus, I don't think it's a good idea to have both refactoring and feature implementation in one PR.
Thanks for the review @HyukjinKwon. I'm happy to prepare a PR with the changes you requested; please just let me know how to proceed. Specifically:

Many thanks, Chris
@d80tb7, can you address the other comments, except the refactoring? Let me take a closer look and see if we can avoid introducing the hierarchy, if you don't mind. BTW, I am now doing reserve force training for 3 more days from now (yes, in Korea we need to do it for some days as mandatory :D), so my progress might be a bit slow.
Hi @HyukjinKwon, I've raised #25939 to address the comments here (excluding those around the class hierarchy). Let me know what you think. Chris
… cleanup of cogroup pandas UDF

Follow-up from #24981 incorporating some comments from HyukjinKwon. Specifically:

- Adding `CoGroupedData` to `pyspark/sql/__init__.py __all__` so that documentation is generated.
- Added pydoc, including an example, for the use case whereby the user supplies a cogrouping function including a key.
- Added the boilerplate for doctests to cogroup.py. Note that cogroup.py only contains the apply() function, which has doctests disabled as per the other Pandas UDFs.
- Restricted the newly exposed RelationalGroupedDataset constructor parameters to access only by the sql package.
- Some minor formatting tweaks.

This was tested by running the appropriate unit tests. I'm unsure how to check that my change will cause the documentation to be generated correctly, but if someone can describe how to do this I'd be happy to check.

Closes #25939 from d80tb7/SPARK-27463-fixes.

Authored-by: Chris Martin <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
This PR adds some extra documentation for the new cogrouped map Pandas UDFs. Specifically:

- Updated the usage guide for the new `COGROUPED_MAP` Pandas UDFs added in #24981
- Updated the docstring for pandas_udf to include the COGROUPED_MAP type, as suggested by HyukjinKwon in #25939

Closes #26110 from d80tb7/SPARK-29126-cogroup-udf-usage-guide.

Authored-by: Chris Martin <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
What changes were proposed in this pull request?
Adds a new cogroup Pandas UDF. This allows two grouped DataFrames to be cogrouped together and a (pandas.DataFrame, pandas.DataFrame) -> pandas.DataFrame UDF applied to each cogroup.
Example usage
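(The original example rendering is not preserved above. The sketch below reconstructs the kind of usage the PR describes, reusing the asof-join data and decorator from the docstring excerpts quoted earlier in this thread; treat the exact values and call chain as illustrative, and assume an active `spark` session.)

```python
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType

# Two DataFrames to be grouped by "id" and cogrouped together.
df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ("time", "id", "v1"))
df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")],
    ("time", "id", "v2"))

@pandas_udf("time int, id int, v1 double, v2 string", PandasUDFType.COGROUPED_MAP)
def asof_join(left, right):
    # Each cogroup arrives as a pair of pandas DataFrames.
    return pd.merge_asof(left, right, on="time", by="id")

df1.groupby("id").cogroup(df2.groupby("id")).apply(asof_join).show()
```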
How was this patch tested?
Added unit test test_pandas_udf_cogrouped_map.