[SPARK-14215] [SQL] [PYSPARK] Support chained Python UDFs #12014
Conversation
Test build #54364 has finished for PR 12014 at commit
cc @cloud-fan
@davies I think the JIRA number should be SPARK-14215: https://issues.apache.org/jira/browse/SPARK-14215
@hvanhovell Corrected, thanks!
Test build #54365 has finished for PR 12014 at commit
@@ -1648,6 +1648,14 @@ def sort_array(col, asc=True):

# ---------------------------- User Defined Function ----------------------------------

def _wrap_function(sc, func, returnType):
what's the point of creating a new _wrap_function here? To decrease the size of the serialized Python function?
Oh I see, we want to chain the functions on the Python worker side.
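A rough sketch of that idea, using a hypothetical helper name (an illustration of function composition, not the exact code added on the worker side):

# compose two row-level functions so the output of f feeds g
def chain(f, g):
    return lambda *args: g(f(*args))

# e.g. chaining two unary UDF lambdas into a single callable
add_one = lambda x: x + 1
double = lambda x: x * 2
chained = chain(add_one, double)
assert chained(3) == 8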
Test build #2705 has finished for PR 12014 at commit
Overall LGTM
row_based = read_int(infile)
num_commands = read_int(infile)
if row_based:
    profiler = None  # profiling is not supported for UDF
profiler seems to need to be defined before this if block. The code refers to profiler later, outside of this block.
The other branch also has profiler, so I think it's fine.
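For context, a minimal sketch of the control flow being discussed (an assumed shape, not the actual worker.py code): both branches assign profiler, so later code can refer to it either way.

row_based = True  # illustrative value
if row_based:
    profiler = None       # profiling is not supported for UDFs
else:
    profiler = object()   # placeholder for the profiler created on the non-UDF path
print(profiler)           # defined regardless of which branch ran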
Test build #2707 has finished for PR 12014 at commit
    case udf: PythonUDF if canEvaluate(udf) => udf
  }
}

def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
  // Skip EvaluatePython nodes.
  case plan: EvaluatePython => plan

  case plan: LogicalPlan if plan.resolved =>
    // Extract any PythonUDFs from the current operator.
We should update the comments to explain our new strategy of extracting and evaluating Python UDFs.
Updated.
Merging this into master (the last commit only added comments).
Test build #54467 has finished for PR 12014 at commit
@davies It sounds like you used the wrong JIRA number.
@gatorsmile Corrected in the PR, but the notification email is not updated.
What changes were proposed in this pull request?
This PR brings support for chained Python UDFs, for example:
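A minimal illustration of the kind of expression this enables (the UDF names and data are hypothetical sketches, not the exact example from the PR):

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)
df = sqlContext.range(3).selectExpr("id AS a")

# two hypothetical Python UDFs
add_one = udf(lambda x: x + 1, IntegerType())
double = udf(lambda x: x * 2, IntegerType())

# one Python UDF applied directly to the output of another
df.select(double(add_one(df["a"]))).show()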
Also, directly chained unary Python UDFs are put into a single batch of Python UDFs; others may require multiple batches. For example:
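Continuing the hypothetical sketch above (the batching behavior noted in the comments reflects the description in this PR, not verified output):

# Directly chained unary UDFs: the whole chain can be evaluated in one Python batch.
df.select(double(add_one(df["a"])))

# A UDF whose input mixes another UDF's result with other expressions
# may require more than one batch.
df.select(double(add_one(df["a"]) + 3))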
TODO: will support multiple unrelated Python UDFs in one batch (another PR).
How was this patch tested?
Added new unit tests for chained UDFs.
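A rough sketch of what such a test could look like (hypothetical names and values; the actual tests added in the PR may differ):

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

def test_chained_udf(sqlContext):
    add_one = udf(lambda x: x + 1, IntegerType())
    double = udf(lambda x: x * 2, IntegerType())
    df = sqlContext.range(1).selectExpr("id AS a")
    [row] = df.select(double(add_one(df["a"]))).collect()
    assert row[0] == 2  # (0 + 1) * 2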