
[SPARK-14215] [SQL] [PYSPARK] Support chained Python UDFs #12014

Closed · 4 commits

Conversation

@davies (Contributor) commented Mar 28, 2016

What changes were proposed in this pull request?

This PR adds support for chained Python UDFs, for example:

select udf1(udf2(a))
select udf1(udf2(a) + 3)
select udf1(udf2(a) + udf3(b)) 

Directly chained unary Python UDFs are also evaluated in a single batch of Python UDFs; other cases may require multiple batches.

For example,

>>> sqlContext.sql("select double(double(1))").explain()
== Physical Plan ==
WholeStageCodegen
:  +- Project [pythonUDF#10 AS double(double(1))#9]
:     +- INPUT
+- !BatchPythonEvaluation double(double(1)), [pythonUDF#10]
   +- Scan OneRowRelation[]
>>> sqlContext.sql("select double(double(1) + double(2))").explain()
== Physical Plan ==
WholeStageCodegen
:  +- Project [pythonUDF#19 AS double((double(1) + double(2)))#16]
:     +- INPUT
+- !BatchPythonEvaluation double((pythonUDF#17 + pythonUDF#18)), [pythonUDF#17,pythonUDF#18,pythonUDF#19]
   +- !BatchPythonEvaluation double(2), [pythonUDF#17,pythonUDF#18]
      +- !BatchPythonEvaluation double(1), [pythonUDF#17]
         +- Scan OneRowRelation[]

TODO: support multiple unrelated Python UDFs in one batch (another PR).

How was this patch tested?

Added new unit tests for chained UDFs.

@davies (Contributor, Author) commented Mar 28, 2016

cc @marmbrus @rxin

@SparkQA commented Mar 28, 2016

Test build #54364 has finished for PR 12014 at commit 024a822.

  • This patch fails Python style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@davies (Contributor, Author) commented Mar 28, 2016

cc @cloud-fan

@hvanhovell (Contributor):

@davies I think the JIRA number should be SPARK-14215: https://issues.apache.org/jira/browse/SPARK-14215

@davies changed the title from "[SPARK-14125] [SQL] [PYSPARK] Support chained Python UDFs" to "[SPARK-14215] [SQL] [PYSPARK] Support chained Python UDFs" on Mar 28, 2016
@davies (Contributor, Author) commented Mar 28, 2016

@hvanhovell Corrected, thanks!

@SparkQA commented Mar 28, 2016

Test build #54365 has finished for PR 12014 at commit b741073.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -1648,6 +1648,14 @@ def sort_array(col, asc=True):

# ---------------------------- User Defined Function ----------------------------------

def _wrap_function(sc, func, returnType):
Contributor:

What's the point of creating a new `_wrap_function` here? To decrease the size of the serialized Python function?

Contributor:

Oh I see, we want to chain the functions at the Python worker side.
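Judging from the worker-side diff in this PR (which reads a `num_commands` count), chaining at the worker amounts to folding the deserialized functions into one callable. A hypothetical sketch of that folding (not the actual worker code; `chain` and `compose_udfs` are illustrative names):

```python
from functools import reduce

def chain(f, g):
    # Apply g first, then f, matching udf1(udf2(...)).
    return lambda *a: f(g(*a))

def compose_udfs(funcs):
    # funcs are listed outermost-first; fold them into a single callable
    # so the whole chain runs in one round trip to the Python worker.
    return reduce(chain, funcs)

double = lambda x: x * 2
inc = lambda x: x + 1
composed = compose_udfs([double, inc])  # computes double(inc(x))
```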

@SparkQA commented Mar 29, 2016

Test build #2705 has finished for PR 12014 at commit b741073.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor):

Overall LGTM

row_based = read_int(infile)
num_commands = read_int(infile)
if row_based:
profiler = None # profiling is not supported for UDF
Member:

It seems profiler needs to be defined before this if block; the code refers to profiler later, outside the block.

Contributor (Author):

The other branch also defines profiler, so I think it's fine.

@SparkQA commented Mar 29, 2016

Test build #2707 has finished for PR 12014 at commit b741073.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

case udf: PythonUDF if canEvaluate(udf) => udf
}
}

def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
// Skip EvaluatePython nodes.
case plan: EvaluatePython => plan

case plan: LogicalPlan if plan.resolved =>
// Extract any PythonUDFs from the current operator.
Contributor:

We should update the comments to explain our new strategy for extracting and evaluating Python UDFs.

Contributor (Author):

Updated.
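The extraction strategy under discussion can be illustrated with a toy pure-Python sketch (expression encodings and function names are illustrative, not Spark's actual code): walk the expression tree, collapse directly nested UDFs into one chain, and pull out a chain only once its inputs contain no other UDFs; each pass corresponds to one BatchPythonEvaluation node in the plans shown in the PR description.

```python
# Toy expressions: ("udf", name, child) | ("+", l, r) | ("lit", v) | ("ref", i)

def has_udf(e):
    return e[0] == "udf" or (e[0] == "+" and (has_udf(e[1]) or has_udf(e[2])))

def extract_batches(expr):
    """Repeatedly pull out one evaluable UDF chain per pass; each pass
    stands in for one BatchPythonEvaluation node."""
    batches, counter = [], [0]

    def pull_one(e, batch):
        if batch:                       # one chain per batch for now (see TODO)
            return e
        if e[0] == "udf":
            names, cur = [], e
            while cur[0] == "udf":      # collapse directly chained UDFs
                names.append(cur[1])
                cur = cur[2]
            if not has_udf(cur):        # inputs are UDF-free: evaluable now
                counter[0] += 1
                batch.append((names, cur))
                return ("ref", counter[0])
            return ("udf", e[1], pull_one(e[2], batch))
        if e[0] == "+":
            left = pull_one(e[1], batch)
            return ("+", left, pull_one(e[2], batch))
        return e

    while has_udf(expr):
        batch = []
        expr = pull_one(expr, batch)
        batches.append(batch)
    return batches

# select double(double(1)) -> one batch holding the composed chain
one = extract_batches(("udf", "double", ("udf", "double", ("lit", 1))))
# select double(double(1) + double(2)) -> three batches, as in the plan
three = extract_batches(
    ("udf", "double",
     ("+", ("udf", "double", ("lit", 1)), ("udf", "double", ("lit", 2)))))
```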

@davies (Contributor, Author) commented Mar 29, 2016

Merging this into master (the last commit only added comments).

@asfgit closed this in a7a93a1 on Mar 29, 2016
@SparkQA commented Mar 29, 2016

Test build #54467 has finished for PR 12014 at commit c57e8a4.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile (Member):

@davies It sounds like you used the wrong JIRA number.

@davies (Contributor, Author) commented Mar 29, 2016

@gatorsmile Corrected in the PR, but the notification email is not updated.
