
[SPARK-24624][SQL][PYTHON] Support mixture of Python UDF and Scalar Pandas UDF #21650

Closed
wants to merge 13 commits

Conversation

@icexelloss (Contributor) commented Jun 27, 2018

What changes were proposed in this pull request?

This PR adds support for mixing Python UDFs and Scalar Pandas UDFs, in the following two cases:

(1)

from pyspark.sql.functions import col, udf, pandas_udf

@udf('int')
def f1(x):
    return x + 1

@pandas_udf('int')
def f2(x):
    return x + 1

df = spark.range(0, 1).toDF('v') \
    .withColumn('foo', f1(col('v'))) \
    .withColumn('bar', f2(col('v')))

Query plan:

>>> df.explain(True)
== Parsed Logical Plan ==
'Project [v#2L, foo#5, f2('v) AS bar#9]
+- AnalysisBarrier
      +- Project [v#2L, f1(v#2L) AS foo#5]
         +- Project [id#0L AS v#2L]
            +- Range (0, 1, step=1, splits=Some(4))

== Analyzed Logical Plan ==
v: bigint, foo: int, bar: int
Project [v#2L, foo#5, f2(v#2L) AS bar#9]
+- Project [v#2L, f1(v#2L) AS foo#5]
   +- Project [id#0L AS v#2L]
      +- Range (0, 1, step=1, splits=Some(4))

== Optimized Logical Plan ==
Project [id#0L AS v#2L, f1(id#0L) AS foo#5, f2(id#0L) AS bar#9]
+- Range (0, 1, step=1, splits=Some(4))

== Physical Plan ==
*(2) Project [id#0L AS v#2L, pythonUDF0#13 AS foo#5, pythonUDF0#14 AS bar#9]
+- ArrowEvalPython [f2(id#0L)], [id#0L, pythonUDF0#13, pythonUDF0#14]
   +- BatchEvalPython [f1(id#0L)], [id#0L, pythonUDF0#13]
      +- *(1) Range (0, 1, step=1, splits=4)

(2)

from pyspark.sql.functions import udf, pandas_udf
@udf('int')
def f1(x):
    return x + 1

@pandas_udf('int')
def f2(x):
    return x + 1

df = spark.range(0, 1).toDF('v')
df = df.withColumn('foo', f2(f1(df['v'])))

Query plan:

>>> df.explain(True)
== Parsed Logical Plan ==
Project [v#21L, f2(f1(v#21L)) AS foo#46]
+- AnalysisBarrier
      +- Project [v#21L, f1(f2(v#21L)) AS foo#39]
         +- Project [v#21L, <lambda>(<lambda>(v#21L)) AS foo#32]
            +- Project [v#21L, <lambda>(<lambda>(v#21L)) AS foo#25]
               +- Project [id#19L AS v#21L]
                  +- Range (0, 1, step=1, splits=Some(4))

== Analyzed Logical Plan ==
v: bigint, foo: int
Project [v#21L, f2(f1(v#21L)) AS foo#46]
+- Project [v#21L, f1(f2(v#21L)) AS foo#39]
   +- Project [v#21L, <lambda>(<lambda>(v#21L)) AS foo#32]
      +- Project [v#21L, <lambda>(<lambda>(v#21L)) AS foo#25]
         +- Project [id#19L AS v#21L]
            +- Range (0, 1, step=1, splits=Some(4))

== Optimized Logical Plan ==
Project [id#19L AS v#21L, f2(f1(id#19L)) AS foo#46]
+- Range (0, 1, step=1, splits=Some(4))

== Physical Plan ==
*(2) Project [id#19L AS v#21L, pythonUDF0#50 AS foo#46]
+- ArrowEvalPython [f2(pythonUDF0#49)], [id#19L, pythonUDF0#49, pythonUDF0#50]
   +- BatchEvalPython [f1(id#19L)], [id#19L, pythonUDF0#49]
      +- *(1) Range (0, 1, step=1, splits=4)

How was this patch tested?

New tests are added to BatchEvalPythonExecSuite and ScalarPandasUDFTests.

@icexelloss force-pushed the SPARK-24624-mix-udf branch from 6b47b69 to be3b99c on June 27, 2018 22:43
@icexelloss (Contributor Author)

This PR took me a while because I am not very familiar with Catalyst rules. I think in the end the change is relatively simple, but I would appreciate a more careful review from people who are familiar with Catalyst.

cc @BryanCutler @gatorsmile @HyukjinKwon @ueshin

def apply(plan: SparkPlan): SparkPlan = plan transformUp {
  // AggregateInPandasExec and FlatMapGroupsInPandasExec can be evaluated directly
  // in the Python worker, therefore we don't need to extract the UDFs.
  case plan: FlatMapGroupsInPandasExec => plan
Contributor Author

This is no longer needed because this rule will only extract Python UDFs and Scalar Pandas UDFs and ignore other types of UDFs.

@viirya (Member) commented Jun 27, 2018

@icexelloss Can you also show the query plan of the examples in the PR description? Thanks.

@maropu (Member) commented Jun 27, 2018

nit: Also, can you put [SQL][PYTHON] in the title?

@SparkQA commented Jun 28, 2018

Test build #92401 has finished for PR 21650 at commit be3b99c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jun 28, 2018

Test build #92400 has finished for PR 21650 at commit 6b47b69.

  • This patch passes all tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@icexelloss changed the title from [SPARK-24624] Support mixture of Python UDF and Scalar Pandas UDF to [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF and Scalar Pandas UDF on Jun 28, 2018
@icexelloss (Contributor Author)

@viirya I have added the query plan output. @maropu I updated the PR title.

Thanks!

@BryanCutler (Member) commented Jun 28, 2018

Would you mind changing case (1) in your description? It threw me off a little as they looked independent at first glance. Maybe something like:

df = spark.range(0, 1).toDF('v') \
    .withColumn('foo', f1(df['v'])) \
    .withColumn('bar', f2(df['v']))

Also, are there any cases you know of that still aren't allowed?

df3 = df3.withColumn('f4_f2_f1', df['v'] + 1011)
df3 = df3.withColumn('f4_f3_f1', df['v'] + 1101)
df3 = df3.withColumn('f4_f3_f2', df['v'] + 1110)
df3 = df3.withColumn('f4_f3_f2_f1', df['v'] + 1111)
Member

So df3 holds the expected values?

Contributor Author

That's right. I can add a comment to make it clearer.

* Collect evaluable UDFs from the current node.
*
* This function collects Python UDFs or Scalar Python UDFs from expressions of the input node,
* and returns a list of UDFs of the same eval type.
Member

What happens if the user tries to mix a non-scalar UDF?

Contributor Author

Hmm... it currently throws an exception in the codegen stage (because non-scalar UDFs are not extracted by this rule).

We should probably throw a better exception, but I need to think a bit about how to do it.

Contributor Author

I tried this on master and got the same exception:

>>> foo = pandas_udf(lambda x: x, 'v int', PandasUDFType.GROUPED_MAP)
>>> df.select(foo(df['v'])).show()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/icexelloss/workspace/upstream/spark/python/pyspark/sql/dataframe.py", line 353, in show
    print(self._jdf.showString(n, 20, vertical))
  File "/Users/icexelloss/workspace/upstream/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/Users/icexelloss/workspace/upstream/spark/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/Users/icexelloss/workspace/upstream/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o257.showString.
: java.lang.UnsupportedOperationException: Cannot evaluate expression: <lambda>(input[0, bigint, false])
	at org.apache.spark.sql.catalyst.expressions.Unevaluable$class.doGenCode(Expression.scala:261)
	at org.apache.spark.sql.catalyst.expressions.PythonUDF.doGenCode(PythonUDF.scala:50)
	at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:108)
	at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:105)
	at scala.Option.getOrElse(Option.scala:121)
        ...

Therefore, this PR doesn't change that behavior: neither master nor this PR extracts non-scalar UDFs in the expression.

Member

Yeah, that's not a very informative exception but we can fix that later. I made https://issues.apache.org/jira/browse/SPARK-24735 to track.

      case _ =>
        throw new IllegalArgumentException("Can not mix vectorized and non-vectorized UDFs")
      case (vectorizedUdfs, plainUdfs) =>
        throw new AnalysisException(
Member

Why change the exception type? Can you make a test that causes this?

Contributor Author

This is because we shouldn't reach here (otherwise it's a bug). I don't know what the best exception type is here, though.

*
* If expressions contain both UDF eval types, this function will only return Python UDFs.
*
* The caller should call this function multiple times until all evaluable UDFs are collected.
Member

So this will pipeline UDFs of the same eval type so that they can be processed together in the same call to the Python worker?

For example, if we have pandas_udf, pandas_udf, udf, udf, then both pandas_udfs will be sent together to the worker, then both udfs together; the Python runner gets executed twice.

On the other hand, if we have pandas_udf, udf, pandas_udf, udf, then each one has to be executed on its own, and the Python runner gets executed 4 times. Is that right?

Contributor Author

That's correct.
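To illustrate the two orderings discussed above (a sketch, not from the PR; the UDF names are made up and the resulting plans can be inspected with explain()):

from pyspark.sql.functions import udf, pandas_udf

@udf('long')
def plus_one(x):
    return x + 1

@pandas_udf('long')
def pd_plus_one(s):
    return s + 1

df = spark.range(3)

# Same eval types grouped together: one ArrowEvalPython pass and one
# BatchEvalPython pass in the physical plan.
df.select(pd_plus_one('id'), pd_plus_one('id'), plus_one('id'), plus_one('id')).explain()

# Chained alternating eval types: each UDF feeds the next, so per the
# discussion every switch of eval type adds another round trip to the
# Python worker.
df.select(pd_plus_one(plus_one(pd_plus_one(plus_one('id'))))).explain()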

@SparkQA commented Jun 29, 2018

Test build #92443 has finished for PR 21650 at commit 674e361.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

assert type(x) == int
return x + 1

def f2(x):
Member

Seems like this is neither @udf nor @pandas_udf, is it on purpose? If so, could you add a comment to explain why?

Contributor Author

Yes, the purpose is to test mixing udf, pandas_udf, and SQL expressions. I will add comments to make it clearer.

Contributor Author

Added comments in test
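For reference, a minimal sketch of the pattern being tested (illustrative names, not the exact test code):

from pyspark.sql.functions import udf, pandas_udf

@udf('int')
def f1(x):          # row-at-a-time Python UDF
    return x + 1

def f2(x):          # plain function: builds a SQL expression, no UDF at all
    return x + 10

@pandas_udf('int')
def f3(x):          # Scalar Pandas (vectorized) UDF
    return x + 100

df = spark.range(0, 1).toDF('v')
df.withColumn('out', f3(f2(f1(df['v'])))).show()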

@@ -166,8 +190,9 @@ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper {
        ArrowEvalPythonExec(vectorizedUdfs, child.output ++ resultAttrs, child)
      case (vectorizedUdfs, plainUdfs) if vectorizedUdfs.isEmpty =>
        BatchEvalPythonExec(plainUdfs, child.output ++ resultAttrs, child)
      case _ =>
        throw new IllegalArgumentException("Can not mix vectorized and non-vectorized UDFs")
      case (vectorizedUdfs, plainUdfs) =>
Member

case _ => should work?

Contributor Author

Oh yes, let me revert.

@@ -97,6 +103,64 @@ class BatchEvalPythonExecSuite extends SparkPlanTest with SharedSQLContext {
}
assert(qualifiedPlanNodes.size == 1)
}

private def collectPythonExec(spark: SparkPlan): Seq[BatchEvalPythonExec] = spark.collect {
Member

plan would be better than spark?

Contributor Author

Yes! I meant to call it plan but apparently made a mistake :(

case b: BatchEvalPythonExec => b
}

private def collectPandasExec(spark: SparkPlan): Seq[ArrowEvalPythonExec] = spark.collect {
Member

ditto.

@SparkQA commented Jun 29, 2018

Test build #92482 has finished for PR 21650 at commit ce5e7f5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@BryanCutler (Member)

I had an idea for a slightly different approach. Would it be possible to "promote" the regular udf to a pandas_udf? By this I mean wrap the function using apply() so that it takes pd.Series as inputs and returns another pd.Series. Then we can send the entire mix of udfs and pandas_udfs to the worker in one shot, instead of in separate evaluations. Since the user is already using pandas_udfs, we know that the worker supports it, and I think the performance would be much better. Is there any downside or issue with doing it this way?
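A rough sketch of that promotion idea (hypothetical helper, single-argument case only; not part of this PR):

from pyspark.sql.functions import pandas_udf

def promote(f, return_type):
    # Lift a row-at-a-time function into a Series-at-a-time one via
    # pd.Series.apply, then register the wrapper as a scalar pandas_udf.
    return pandas_udf(lambda s: s.apply(f), return_type)

plus_one = promote(lambda x: x + 1, 'long')  # usable like any pandas_udf column function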

@icexelloss (Contributor Author) commented Jul 9, 2018

@BryanCutler I think your suggestion would change behavior. ArrowEvalExec and BatchEvalExec still differ in corner cases, for example type coercion (ArrowEvalExec supports type coercion but BatchEvalExec doesn't) and timestamp type (a regular UDF expects a Python datetime for a timestamp, while a pandas UDF expects a pd.Timestamp).

I think this is probably a good future improvement but not great for this Jira because of the behavior change. WDYT?
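For example, for the same TimestampType column the two UDF kinds see different Python types (an illustrative sketch, not from the PR):

from pyspark.sql.functions import udf, pandas_udf

@udf('string')
def row_type(t):
    return type(t).__name__              # datetime.datetime, one row at a time

@pandas_udf('string')
def series_type(t):
    # t is a pd.Series whose elements are pd.Timestamp values
    return t.apply(lambda v: type(v).__name__)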

@BryanCutler (Member)

I think the previous behavior was to not allow mixing pandas and regular udfs, but you're probably right that there are some cases where data could be handled differently. I'll try to look at this more in depth today.

@gatorsmile (Member)

ping @BryanCutler Any update about this PR?

    if (pythonUDFs.isEmpty) {
      plan.expressions.flatMap(collectEvaluableUDF(_, PythonEvalType.SQL_SCALAR_PANDAS_UDF))
    } else {
      pythonUDFs
Member

I think it would be better to loop through the expressions and find the first scalar Python UDF, either SQL_BATCHED_UDF or SQL_SCALAR_PANDAS_UDF, and then collect the rest of that type. This is really what is happening here, so I think it would be more straightforward to do it in a single loop instead of two flatMaps.

Contributor Author

@icexelloss commented Jul 23, 2018

What you said makes sense, and that was actually my first attempt, but it ended up being pretty complicated. The issue is that it is hard to find the UDFs in one traversal of the expression tree, because we need to pass the evalType down to every subtree, and the result of one subtree can affect the result of another (i.e., if we find one type of UDF in one subtree, we need to pass that type to all other subtrees because they must agree on the evalType). Because the code is recursive in nature, this makes it pretty complicated to pass the correct eval type in all places.

Another way is to do two traversals: in the first we look for the eval type, and in the second we look for UDFs of that eval type. This isn't much different from what I have now in terms of efficiency, though. I actually tried these approaches and found the current way to be the easiest to implement, the simplest logically, and the least likely to have bugs.

WDYT?

  private def collectEvaluatableUDF(expr: Expression): Seq[PythonUDF] = expr match {
    case udf: PythonUDF if PythonUDF.isScalarPythonUDF(udf) && canEvaluateInPython(udf) => Seq(udf)
    case e => e.children.flatMap(collectEvaluatableUDF)
  private def collectEvaluableUDF(expr: Expression, evalType: Int): Seq[PythonUDF] = expr match {
Member

It's a little confusing to have this function named so similarly to the one below; maybe you can combine them if just doing a single loop (see the other comment).

@@ -167,7 +191,8 @@ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper {
      case (vectorizedUdfs, plainUdfs) if vectorizedUdfs.isEmpty =>
        BatchEvalPythonExec(plainUdfs, child.output ++ resultAttrs, child)
      case _ =>
        throw new IllegalArgumentException("Can not mix vectorized and non-vectorized UDFs")
        throw new AnalysisException(
          "Mixed Python and Scalar Pandas UDFs are not expected here")
Member

Change this to "Expected either Scalar Pandas UDFs or Batched UDFs but got both"

@@ -97,6 +103,64 @@ class BatchEvalPythonExecSuite extends SparkPlanTest with SharedSQLContext {
}
assert(qualifiedPlanNodes.size == 1)
}

private def collectPythonExec(plan: SparkPlan): Seq[BatchEvalPythonExec] = plan.collect {
Member

rename to collectBatchExec

case b: BatchEvalPythonExec => b
}

private def collectPandasExec(plan: SparkPlan): Seq[ArrowEvalPythonExec] = plan.collect {
Member

rename to collectArrowExec

import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.BooleanType

class BatchEvalPythonExecSuite extends SparkPlanTest with SharedSQLContext {
import testImplicits.newProductEncoder
import testImplicits.localSeqToDatasetHolder

val pythonUDF = new MyDummyPythonUDF
Member

pythonUDF -> pythonBatchedUDF

@@ -23,21 +23,27 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.api.python.{PythonEvalType, PythonFunction}
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, GreaterThan, In}
import org.apache.spark.sql.execution.{FilterExec, InputAdapter, SparkPlanTest, WholeStageCodegenExec}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.BooleanType

class BatchEvalPythonExecSuite extends SparkPlanTest with SharedSQLContext {
Member

I don't think your tests should be in this suite since it is just for BatchEvalPythonExec. How about ExtractPythonUDFsSuite?


df = self.spark.range(0, 10).toDF('v1')
df = df.withColumn('v2', udf(lambda x: x + 1, 'int')(df['v1']))
df = df.withColumn('v3', pandas_udf(lambda x: x + 2, 'int')(df['v1']))
Member

could you just chain the withColumn calls here? I think it's clearer than reassigning the df each time
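For example (a sketch; df['v1'] becomes col('v1') because df is not bound mid-chain):

from pyspark.sql.functions import col, udf, pandas_udf

df = (self.spark.range(0, 10).toDF('v1')
      .withColumn('v2', udf(lambda x: x + 1, 'int')(col('v1')))
      .withColumn('v3', pandas_udf(lambda x: x + 2, 'int')(col('v1'))))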

@@ -5471,6 +5598,22 @@ def foo(_):
self.assertEqual(r.a, 'hi')
self.assertEqual(r.b, 1)

def test_mixed_udf(self):
Member

test_mixed_udf -> test_mixed_scalar_udfs_followed_by_grouby_apply

df2 = df2.withColumn('f3_f1_f2', df['v'] + 111)
df2 = df2.withColumn('f3_f2_f1', df['v'] + 111)

self.assertEquals(df2.collect(), df1.collect())
Member

I think it would be better to combine this test with the one above and construct it as a list of cases that you could loop over, instead of so many blocks of withColumns. Something like:

class TestCase():
    def __init__(self, col_name, col_expected, col_projection, col_udf_expression, col_sql_expression):
        ...

cases = [
    TestCase('f4_f3_f2_f1', df['v'] + 1111, f4(df1['f3_f2_f1']), f4(f3(f2(f1(df['v']))), f4(f3(f1(df['v']) + 10)))
    ...]

expected_df = df

for case in cases:
    expected_df = expected_df.withColumn(case.col_name, case.col_expected)
    ....

self.assertEquals(expected_df.collect(), projection_df.collect())

Contributor Author

Sorry, could you please elaborate a bit? e.g.

TestCase('f4_f3_f2_f1', df['v'] + 1111, f4(df1['f3_f2_f1']), f4(f3(f2(f1(df['v']))), f4(f3(f1(df['v']) + 10)))

How is df1['f3_f2_f1'] defined in this test case?

Contributor Author

I chained the withColumn calls together instead of reassigning DataFrames. How does it look now?

@icexelloss force-pushed the SPARK-24624-mix-udf branch from ce5e7f5 to 78f2ebf on July 23, 2018 16:07
@SparkQA commented Jul 23, 2018

Test build #93451 has finished for PR 21650 at commit 4c9c007.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jul 23, 2018

Test build #93450 has finished for PR 21650 at commit 78f2ebf.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@icexelloss (Contributor Author)

@BryanCutler I've addressed most of your comments and explained the ones that I didn't change. Do you mind taking another look? Thanks!

.withColumn('f3', f3(col('v'))) \
.withColumn('f4', f4(col('v'))) \
.withColumn('f2_f1', f2(col('f1'))) \
.withColumn('f3_f1', f3(col('f1'))) \
Member

This looks like it's testing udf + udf.

Contributor Author

Yeah, the way the test is written, I am trying to cover many combinations, so some combinations might not be mixed UDFs. Do you prefer that I remove these cases?

.withColumn('f1_f3', f1(f3(df['v']))) \
.withColumn('f2_f1', f2(f1(df['v']))) \
.withColumn('f2_f3', f2(f3(df['v']))) \
.withColumn('f3_f1', f3(f1(df['v']))) \
Member

Looks like the combinations of f1 and f3 duplicate a few tests in test_mixed_udf, for instance f4_f3.

Contributor Author

Yeah, the way the test is written, I am trying to cover many combinations, so there are some duplicate cases. Do you prefer that I remove these?

Member

Yea... I know it's still minor since the elapsed time will be virtually the same, but build/test time has recently been an issue, and I wonder if there's a better way than just avoiding duplicated tests for now.

Member

It was discussed here #21845

Contributor Author

I see. I don't think it's necessary (we would only remove a few cases and, like you said, the test time is virtually the same), and keeping them helps the readability of the tests (so it doesn't look like some test cases were missed).

But if that's the preferred practice, I can remove the duplicate cases in the next commit.

Member

I am okay with leaving it here too, since it's clear they are virtually the same, but let's remove duplicated or non-orthogonal tests next time.

Contributor Author

Gotcha. I will keep that in mind next time.

def test_mixed_scalar_udfs_followed_by_grouby_apply(self):
# Test Pandas UDF and scalar Python UDF followed by groupby apply
from pyspark.sql.functions import udf, pandas_udf, PandasUDFType
import pandas as pd
Member

Not a big deal at all really, but I would swap the import order (third-party, then pyspark).

assert type(x) == int
return x + 1

def f2(x):
Member

Ah, I see why it looks confusing. Can we add an assert here too (to check that it's a Column)?

Contributor Author

Added
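e.g. something along these lines (a sketch, not the exact test code):

from pyspark.sql import Column

def f2(x):
    # f2 is a plain function, so it receives a Column expression, not data
    assert isinstance(x, Column)
    return x + 10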

    // Python UDF can't be evaluated directly in JVM
    case children => !children.exists(hasPythonUDF)
  private def canEvaluateInPython(e: PythonUDF, evalType: Int): Boolean = {
    if (e.evalType != evalType) {
Member

Can we rename this function or write a comment? Both a Scalar Vectorized UDF and a normal UDF can each be evaluated in Python, but this returns false in that case.

@HyukjinKwon (Member)

I'm okay with the #21650 (comment) approach too, but it should be really simplified. Either way LGTM.

* type will be set to the eval type of the expression.
*
*/
private def canEvaluateInPython(e: PythonUDF, lazyEvalType: LazyEvalType): Boolean = {
Contributor Author

@BryanCutler I rewrote this function using mutable state based on your suggestion. It's not quite the same as your code, so please take a look and let me know if this looks better now. Thanks!

Member

The one method seems overly complicated, so I prefer the code from my suggestion.

Contributor Author

@icexelloss commented Jul 25, 2018

In your code:

  private def canEvaluateInPython(e: PythonUDF, firstEvalType: FirstEvalType): Boolean = {
    if (firstEvalType.isEvalTypeSet() && e.evalType != firstEvalType.evalType) {
      false
    } else {
      firstEvalType.evalType = e.evalType
      e.children match {
        // single PythonUDF child could be chained and evaluated in Python
        case Seq(u: PythonUDF) => canEvaluateInPython(u, firstEvalType)
        // Python UDF can't be evaluated directly in JVM
        case children => !children.exists(hasScalarPythonUDF)
      }
    }
  }

I think the confusing part here is that the value of firstEvalType.evalType keeps changing while we are traversing the tree, and we could be carrying the value across independent subtrees (i.e., after finishing one subtree, firstEvalType can be set to Scalar Pandas even though we didn't find an evaluable UDF, and we never reset it, so when we visit another subtree we could get wrong results). The fact that firstEvalType keeps changing as we traverse the tree seems very error-prone to me.

Member

I'm not sure I follow how this could get wrong results. firstEvalType.evalType = e.evalType is called only if the eval type is not set, or if it is set and equals the current eval type. In the latter case it does assign the same value again, but that's fine. If there is some case where this fails, can you add it as a test?

Contributor Author

@icexelloss commented Jul 25, 2018

Bryan, I tried to apply your implementation and this simple test fails:

import pandas as pd
from pyspark.sql.functions import udf, pandas_udf

@udf('int')
def f1(x):
    assert type(x) == int
    return x + 1

@pandas_udf('int')
def f2(x):
    assert type(x) == pd.Series
    return x + 10

df = self.spark.range(0, 1).toDF('v')
df_chained_1 = df.withColumn('f2_f1', f2(f1(df['v'])))
expected_chained_1 = df.withColumn('f2_f1', df['v'] + 11)
self.assertEquals(expected_chained_1.collect(), df_chained_1.collect())

Do you mind trying this too? Hopefully I didn't do something silly here..

Member

Is the above test part of sql/tests.py?

Contributor Author

Yes it's in the most recent commit.

Member

@BryanCutler commented Jul 25, 2018

Ok, I think I see the problem. Since there was a map over plan.expressions, a new FirstEvalType object was being created for each expression. Changing this to the following corrected the failure:

val setEvalType = new FirstEvalType
val udfs = plan.expressions.flatMap(collectEvaluableUDFs(_, setEvalType))

I updated my above code to this, does that look correct now?

Contributor Author

@icexelloss commented Jul 26, 2018

I applied your new code but the test I mentioned above still fails.

I think the issue could be that when visiting f2(f1(col('v'))), firstEvalType is set to Scalar Pandas first and isn't set to Batched SQL later, so f1 is not extracted. It's possible that my code is still different from yours somehow.

But similar to #21650 (comment), I think the state machine of firstEvalType here is fairly complicated with your suggested implementation (i.e., what is the expected state of the eval type holder before and after canEvaluateInPython, and what are the invariants of the algorithm), and I found myself thinking pretty hard to prove the state machine correct in all cases. If we want to go with this implementation, we need to think about it carefully and explain it in the code...

The lazyEvalType implementation is better IMHO because the state machine is simpler: lazyEvalType is empty until we find the first evaluable UDF, and the value doesn't change once it's set.

The first implementation (two passes, immutable state) is probably the simplest in terms of the mental complexity of the algorithm but is less efficient.

I am OK with either the immutable state or the lazy state. I think @HyukjinKwon prefers the immutable-state one. @BryanCutler WDYT?

@@ -94,36 +95,94 @@ object ExtractPythonUDFFromAggregate extends Rule[LogicalPlan] {
*/
object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper {

private def hasPythonUDF(e: Expression): Boolean = {
private case class LazyEvalType(var evalType: Int = -1) {
Member

Hmmmm, looks messier than I thought... the previous one looks a bit better to me. WDYT @BryanCutler?

Member

I'm not too fond of the name LazyEvalType; it makes it sound like something else. Maybe CurrentEvalType?

Contributor Author

Yeah, the idea of LazyEvalType is a container object that can be set only once. Maybe the name LazyEvalType is confusing. I don't think CurrentEvalType is accurate either, because the original idea is that we don't change the value once it's set. Maybe call it EvalTypeHolder and add docs to explain?

      false
    } else {
      e.children match {
        case Seq(u: PythonUDF) => canEvaluateInPython(u, lazyEvalType)
Member

There are 2 paths for recursion here, which is probably not a good idea. This method is much more complicated now and a little difficult to follow.

@SparkQA commented Jul 25, 2018

Test build #93546 has finished for PR 21650 at commit 2bc906d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon (Member)

Ehh... @BryanCutler, WDYT about just doing the previous one for now? The approach you suggested sounds efficient of course, but this isn't a hot path, so I think the previous way is fine too, since it's a bit cleaner (though a bit less efficient), and partly because the code freeze is close.

@BryanCutler (Member)

Ehh... @BryanCutler, WDYT about just doing the previous one for now? The approach you suggested sounds efficient of course, but this isn't a hot path, so I think the previous way is fine too, since it's a bit cleaner (though a bit less efficient), and partly because the code freeze is close.

I didn't make the suggestion for performance; it was because it took me a while, looking at the previous code, before I realized the intent was to find the first evaluable udf and then all others matching that eval type. I think the previous code kind of masked that and made it more complicated to follow.

I wasn't really sure how the expression tree was evaluated, so my suggestion didn't handle chained expressions. The problem was that the eval type was being set while checking the children nodes; instead it should only be set after all children are determined to be of the same type. I'll update the above code again, which passes all tests as far as I can tell. I still prefer this approach, but I'm not a sql expert ;)

@HyukjinKwon (Member)

Hm, then how about giving it a try in a follow-up, @BryanCutler, if you see some value in it?

@icexelloss (Contributor Author)

@HyukjinKwon I think Bryan's implementation looks promising. Let me take a look.

@icexelloss (Contributor Author)

@BryanCutler @HyukjinKwon I updated the PR based on Bryan's suggestion. Please take a look and let me know if you have further comments.

Thanks!

@@ -94,36 +95,61 @@ object ExtractPythonUDFFromAggregate extends Rule[LogicalPlan] {
*/
object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper {

private def hasPythonUDF(e: Expression): Boolean = {
private case class EvalTypeHolder(private var evalType: Int = -1) {
Member

@HyukjinKwon commented Jul 27, 2018

How about this:

  private type EvalType = Int
  private type EvalTypeChecker = EvalType => Boolean

  private def collectEvaluableUDFsFromExpressions(expressions: Seq[Expression]): Seq[PythonUDF] = {
    // Eval type checker is set in the middle of checking because once it's found,
    // the same eval type should be checked .. blah blah
    var evalChecker: Option[EvalTypeChecker] = None

    def collectEvaluableUDFs(expr: Expression): Seq[PythonUDF] = expr match {
      case udf: PythonUDF if PythonUDF.isScalarPythonUDF(udf) && canEvaluateInPython(udf)
        && evalChecker.isEmpty =>
        evalChecker = Some((otherEvalType: EvalType) => otherEvalType == udf.evalType)
        collectEvaluableUDFs(expr)
      case udf: PythonUDF if PythonUDF.isScalarPythonUDF(udf) && canEvaluateInPython(udf)
        && evalChecker.get(udf.evalType) =>
        Seq(udf)
      case e => e.children.flatMap(collectEvaluableUDFs)
    }

    expressions.flatMap(collectEvaluableUDFs)
  }

  def apply(plan: SparkPlan): SparkPlan = plan transformUp {
    case plan: SparkPlan => extract(plan)
  }

  /**
   * Extract all the PythonUDFs from the current operator and evaluate them before the operator.
   */
  private def extract(plan: SparkPlan): SparkPlan = {
    val udfs = collectEvaluableUDFsFromExpressions(plan.expressions)

Contributor Author

I see... you use a var and a nested function definition to remove the need for a holder object.

IMHO I usually find nested function definitions, and functions that refer to variables outside their definition scope, hard to read, but that could be my personal preference.

Another thing I like about the current impl is that the EvalTypeHolder class ensures its value is never changed once it's set, so I think that's more robust.

That being said, I am OK with your suggestion too if you insist or if @BryanCutler also prefers it.

Member

Yup, I do avoid nested functions, but here is where it's needed. If it's clear when it's set and unset within a function, I think the shorter one is fine.

Contributor Author

Ok, I will update the code then.

@SparkQA commented Jul 27, 2018

Test build #93668 has finished for PR 21650 at commit b25936d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jul 27, 2018

Test build #93667 has finished for PR 21650 at commit 6b22fea.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

      case udf: PythonUDF if PythonUDF.isScalarPythonUDF(udf) && canEvaluateInPython(udf)
          && evalTypeChecker.isEmpty =>
        evalTypeChecker = Some((otherEvalType: EvalType) => otherEvalType == udf.evalType)
        Seq(udf)
Contributor Author

@icexelloss commented Jul 27, 2018

@HyukjinKwon In your code this line was collectEvaluableUDFs(expr). I think we should just return Seq(udf) to avoid checking the expression twice.

@SparkQA commented Jul 27, 2018

Test build #93688 has finished for PR 21650 at commit f3a45a5.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@icexelloss (Contributor Author)

retest please

@SparkQA commented Jul 27, 2018

Test build #93686 has finished for PR 21650 at commit 8e995e8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon (Member)

LGTM.

Merged to master.

@asfgit closed this in e875209 on Jul 28, 2018
@icexelloss (Contributor Author)

Thanks @HyukjinKwon @BryanCutler for the review!
