[SPARK-24624][SQL][PYTHON] Support mixture of Python UDF and Scalar Pandas UDF #21650
Conversation
This PR took me a while to get to because I am not very familiar with Catalyst rules. I think in the end the change is relatively simple, but I would appreciate a more careful review from people who are familiar with Catalyst.
def apply(plan: SparkPlan): SparkPlan = plan transformUp {
  // AggregateInPandasExec and FlatMapGroupsInPandas can be evaluated directly in python worker
  // Therefore we don't need to extract the UDFs
  case plan: FlatMapGroupsInPandasExec => plan
This is no longer needed because this rule will only extract Python UDFs and Scalar Pandas UDFs and ignore other types of UDFs.
@icexelloss Can you also show the query plan of the examples in the PR description? Thanks.
nit: Also, can you put
Test build #92401 has finished for PR 21650 at commit
Test build #92400 has finished for PR 21650 at commit
Would you mind changing case (1) in your description? It threw me off a little as they looked independent at first glance. Maybe something like:
Also, are there any cases you know of that still aren't allowed?
python/pyspark/sql/tests.py
Outdated
df3 = df3.withColumn('f4_f2_f1', df['v'] + 1011)
df3 = df3.withColumn('f4_f3_f1', df['v'] + 1101)
df3 = df3.withColumn('f4_f3_f2', df['v'] + 1110)
df3 = df3.withColumn('f4_f3_f2_f1', df['v'] + 1111)
So df3 holds the expected values?
That's right. I can add a comment to make it clearer.
 * Collect evaluable UDFs from the current node.
 *
 * This function collects Python UDFs or Scalar Python UDFs from expressions of the input node,
 * and returns a list of UDFs of the same eval type.
What happens if the user tries to mix a non-scalar UDF?
Hmm.. it currently will throw an exception in the codegen stage (because a non-scalar UDF will not be extracted by this rule).
We should probably throw a better exception, but I need to think a bit about how to do it.
I tried this on master and got the same exception:
>>> foo = pandas_udf(lambda x: x, 'v int', PandasUDFType.GROUPED_MAP)
>>> df.select(foo(df['v'])).show()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/Users/icexelloss/workspace/upstream/spark/python/pyspark/sql/dataframe.py", line 353, in show
print(self._jdf.showString(n, 20, vertical))
File "/Users/icexelloss/workspace/upstream/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
File "/Users/icexelloss/workspace/upstream/spark/python/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/Users/icexelloss/workspace/upstream/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o257.showString.
: java.lang.UnsupportedOperationException: Cannot evaluate expression: <lambda>(input[0, bigint, false])
at org.apache.spark.sql.catalyst.expressions.Unevaluable$class.doGenCode(Expression.scala:261)
at org.apache.spark.sql.catalyst.expressions.PythonUDF.doGenCode(PythonUDF.scala:50)
at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:108)
at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:105)
at scala.Option.getOrElse(Option.scala:121)
...
Therefore, this PR doesn't change that behavior: neither master nor this PR extracts non-scalar UDFs in the expression.
Yeah, that's not a very informative exception but we can fix that later. I made https://issues.apache.org/jira/browse/SPARK-24735 to track.
case _ =>
  throw new IllegalArgumentException("Can not mix vectorized and non-vectorized UDFs")
case (vectorizedUdfs, plainUdfs) =>
  throw new AnalysisException(
Why change the exception type? Can you make a test that causes this?
This is because we shouldn't reach here (otherwise it's a bug). I don't know what the best exception type is here, though.
 *
 * If expressions contain both UDF eval types, this function will only return Python UDFs.
 *
 * The caller should call this function multiple times until all evaluable UDFs are collected.
So this will pipeline UDFs of the same eval type so that they can be processed together in the same call to the Python worker?
For example, if we have pandas_udf, pandas_udf, udf, udf, then both pandas_udfs will be sent together to the worker, then both udfs together - the Python runner gets executed twice.
On the other hand, if we have pandas_udf, udf, pandas_udf, udf, then each one will have to be executed one at a time, and the Python runner gets executed 4 times. Is that right?
That's correct.
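To make the pipelining concrete, here is a minimal sketch of the two orderings discussed above (the UDF names and bodies are illustrative, not from the PR):

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, pandas_udf

spark = SparkSession.builder.getOrCreate()
df = spark.range(3).toDF('v')

plus_one = udf(lambda x: x + 1, 'long')           # plain Python UDF (batched eval)
plus_ten = pandas_udf(lambda s: s + 10, 'long')   # Scalar Pandas UDF (Arrow eval)

# Adjacent UDFs of the same eval type: per the exchange above, the two pandas
# UDFs are extracted into one ArrowEvalPythonExec and the two plain UDFs into
# one BatchEvalPythonExec, so the Python runner is invoked twice.
df.select(plus_one(plus_one(plus_ten(plus_ten(df['v']))))).show()

# Alternating eval types: each UDF is extracted separately, so the Python
# runner is invoked four times.
df.select(plus_ten(plus_one(plus_ten(plus_one(df['v']))))).show()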
Test build #92443 has finished for PR 21650 at commit
    assert type(x) == int
    return x + 1

def f2(x):
Seems like this is neither @udf nor @pandas_udf, is it on purpose? If so, could you add a comment to explain why?
Yes, the purpose is to test mixing udf, pandas_udf and sql expression. I will add comments to make it clearer.
Added comments in test
@@ -166,8 +190,9 @@ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper {
      ArrowEvalPythonExec(vectorizedUdfs, child.output ++ resultAttrs, child)
    case (vectorizedUdfs, plainUdfs) if vectorizedUdfs.isEmpty =>
      BatchEvalPythonExec(plainUdfs, child.output ++ resultAttrs, child)
    case _ =>
      throw new IllegalArgumentException("Can not mix vectorized and non-vectorized UDFs")
    case (vectorizedUdfs, plainUdfs) =>
case _ => should work?
Oh yes, let me revert.
@@ -97,6 +103,64 @@ class BatchEvalPythonExecSuite extends SparkPlanTest with SharedSQLContext {
    }
    assert(qualifiedPlanNodes.size == 1)
  }

  private def collectPythonExec(spark: SparkPlan): Seq[BatchEvalPythonExec] = spark.collect {
plan would be better than spark?
Yes! I meant to call it plan but apparently made a mistake :(
    case b: BatchEvalPythonExec => b
  }

  private def collectPandasExec(spark: SparkPlan): Seq[ArrowEvalPythonExec] = spark.collect {
ditto.
Test build #92482 has finished for PR 21650 at commit
I had an idea of a slightly different approach.. Would it be possible to "promote" the regular
@BryanCutler I think your suggestion would change the behavior. ArrowEvalExec and BatchEvalExec are still different when it comes to corner cases, for example, type coercion (ArrowEvalExec supports type coercion but BatchEvalExec doesn't) and timestamp type (a regular UDF expects a Python datetime for timestamps and a pandas UDF expects pd.Timestamp). I think this is probably a good future improvement but not great for this Jira because of the behavior change. WDYT?
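To illustrate the timestamp difference mentioned above, a hedged sketch (the column and helper names are made up):

import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, pandas_udf

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(datetime.datetime(2018, 7, 1, 12, 0),)], ['ts'])

# A plain Python UDF receives datetime.datetime for a timestamp column...
plain_type = udf(lambda t: type(t).__name__, 'string')

# ...while a Scalar Pandas UDF receives a pandas Series of pd.Timestamp values.
pandas_type = pandas_udf(lambda s: s.map(lambda t: type(t).__name__), 'string')

df.select(plain_type('ts'), pandas_type('ts')).show()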
I think the previous behavior was to not allow mixing pandas and regular udfs, but you're probably right that there are some cases where data could be handled differently. I'll try to look at this more in depth today.
ping @BryanCutler Any update on this PR?
if (pythonUDFs.isEmpty) {
  plan.expressions.flatMap(collectEvaluableUDF(_, PythonEvalType.SQL_SCALAR_PANDAS_UDF))
} else {
  pythonUDFs
I think it would be better to loop through the expressions and find the first scalar python udf, either SQL_BATCHED_UDF or SQL_SCALAR_PANDAS_UDF, and then collect the rest of that type. This is really what is happening here, so I think it would be more straightforward to do this in a single loop instead of 2 flatMaps.
What you said makes sense and that was actually my first attempt, but it ended up being pretty complicated. The issue is that it is hard to do one traversal of the expression tree to find the UDFs, because we need to pass the evalType to every subtree and the result of one subtree can affect the result of another (i.e., if we find one type of UDF in one subtree, we need to pass that type to all other subtrees because they must agree on the evalType). Because the code is recursive in nature, this makes it pretty complicated to pass the correct eval type in all places.
Another way is to do two traversals: in the first traversal we look for the eval type, and in the second traversal we look for UDFs of that eval type. But this isn't much different from what I have now in terms of efficiency, and I find the current logic simpler and less likely to have bugs. I actually tried these approaches and found the current way to be the easiest to implement and least likely to have bugs.
WDYT?
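For readers following along, a hedged Python model of the two-traversal alternative described above (the real implementation is a Catalyst rule in Scala; Node stands in for Catalyst expressions, and the canEvaluateInPython check is omitted):

class Node:
    def __init__(self, eval_type=None, children=()):
        self.eval_type = eval_type      # None for non-UDF expressions
        self.children = list(children)

def first_eval_type(expr):
    # Pass 1: find the eval type of the first UDF in the tree, if any.
    if expr.eval_type is not None:
        return expr.eval_type
    for child in expr.children:
        found = first_eval_type(child)
        if found is not None:
            return found
    return None

def collect_udfs(expr, eval_type):
    # Pass 2: collect only the UDFs whose eval type matches.
    if expr.eval_type == eval_type:
        return [expr]
    return [u for child in expr.children for u in collect_udfs(child, eval_type)]

# A plain UDF nested under a pandas UDF: pass 1 finds 'PANDAS', pass 2 collects
# only the matching UDFs; the rule is then re-applied until nothing is left.
tree = Node('PANDAS', [Node('BATCHED', [Node()])])
assert first_eval_type(tree) == 'PANDAS'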
private def collectEvaluatableUDF(expr: Expression): Seq[PythonUDF] = expr match {
  case udf: PythonUDF if PythonUDF.isScalarPythonUDF(udf) && canEvaluateInPython(udf) => Seq(udf)
  case e => e.children.flatMap(collectEvaluatableUDF)
private def collectEvaluableUDF(expr: Expression, evalType: Int): Seq[PythonUDF] = expr match {
It's a little confusing to have this function named so similarly to the one below; maybe you can combine them if just doing a single loop (see other comment).
@@ -167,7 +191,8 @@ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper {
    case (vectorizedUdfs, plainUdfs) if vectorizedUdfs.isEmpty =>
      BatchEvalPythonExec(plainUdfs, child.output ++ resultAttrs, child)
    case _ =>
      throw new IllegalArgumentException("Can not mix vectorized and non-vectorized UDFs")
      throw new AnalysisException(
        "Mixed Python and Scalar Pandas UDFs are not expected here")
Change this to "Expected either Scalar Pandas UDFs or Batched UDFs but got both"
@@ -97,6 +103,64 @@ class BatchEvalPythonExecSuite extends SparkPlanTest with SharedSQLContext {
    }
    assert(qualifiedPlanNodes.size == 1)
  }

  private def collectPythonExec(plan: SparkPlan): Seq[BatchEvalPythonExec] = plan.collect {
rename to collectBatchExec
    case b: BatchEvalPythonExec => b
  }

  private def collectPandasExec(plan: SparkPlan): Seq[ArrowEvalPythonExec] = plan.collect {
rename to collectArrowExec
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.BooleanType

class BatchEvalPythonExecSuite extends SparkPlanTest with SharedSQLContext {
  import testImplicits.newProductEncoder
  import testImplicits.localSeqToDatasetHolder

  val pythonUDF = new MyDummyPythonUDF
pythonUDF -> pythonBatchedUDF
@@ -23,21 +23,27 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.api.python.{PythonEvalType, PythonFunction}
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, GreaterThan, In}
import org.apache.spark.sql.execution.{FilterExec, InputAdapter, SparkPlanTest, WholeStageCodegenExec}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.BooleanType

class BatchEvalPythonExecSuite extends SparkPlanTest with SharedSQLContext {
I don't think your tests should be in this suite since it is just for BatchEvalPythonExec. How about ExtractPythonUDFsSuite?
python/pyspark/sql/tests.py
Outdated

df = self.spark.range(0, 10).toDF('v1')
df = df.withColumn('v2', udf(lambda x: x + 1, 'int')(df['v1']))
df = df.withColumn('v3', pandas_udf(lambda x: x + 2, 'int')(df['v1']))
could you just chain the withColumn calls here? I think it's clearer than reassigning the df each time
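For example, the chained style being suggested, as a hedged sketch (udf_a and udf_b are placeholders for the test's UDFs):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, pandas_udf, udf

spark = SparkSession.builder.getOrCreate()

udf_a = udf(lambda x: x + 1, 'int')          # placeholder plain Python UDF
udf_b = pandas_udf(lambda s: s + 2, 'int')   # placeholder Scalar Pandas UDF

# Chained withColumn calls instead of reassigning df each time.
df = (spark.range(0, 10).toDF('v1')
      .withColumn('v2', udf_a(col('v1')))
      .withColumn('v3', udf_b(col('v1'))))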
python/pyspark/sql/tests.py
Outdated
@@ -5471,6 +5598,22 @@ def foo(_):
    self.assertEqual(r.a, 'hi')
    self.assertEqual(r.b, 1)

def test_mixed_udf(self):
test_mixed_udf -> test_mixed_scalar_udfs_followed_by_grouby_apply
python/pyspark/sql/tests.py
Outdated
df2 = df2.withColumn('f3_f1_f2', df['v'] + 111)
df2 = df2.withColumn('f3_f2_f1', df['v'] + 111)

self.assertEquals(df2.collect(), df1.collect())
I think it would be better to combine this test with the one above and construct it as a list of cases that you could loop over, instead of so many blocks of withColumns. Something like:

class TestCase():
    def __init__(self, col_name, col_expected, col_projection, col_udf_expression, col_sql_expression):
        ...

cases = [
    TestCase('f4_f3_f2_f1', df['v'] + 1111, f4(df1['f3_f2_f1']), f4(f3(f2(f1(df['v'])))), f4(f3(f1(df['v']) + 10))),
    ...
]

expected_df = df
for case in cases:
    expected_df = expected_df.withColumn(case.col_name, case.col_expected)
    ...

self.assertEquals(expected_df.collect(), projection_df.collect())
Sorry, could you please elaborate a bit? e.g. in
TestCase('f4_f3_f2_f1', df['v'] + 1111, f4(df1['f3_f2_f1']), f4(f3(f2(f1(df['v'])))), f4(f3(f1(df['v']) + 10)))
how is df1['f3_f2_f1'] defined in this test case?
I chained withColumn together instead of reassigning DataFrames. How does it look now?
Test build #93451 has finished for PR 21650 at commit
Test build #93450 has finished for PR 21650 at commit
@BryanCutler I've addressed most of your comments and explained the ones that I didn't change. Do you mind taking another look? Thanks!
.withColumn('f3', f3(col('v'))) \
.withColumn('f4', f4(col('v'))) \
.withColumn('f2_f1', f2(col('f1'))) \
.withColumn('f3_f1', f3(col('f1'))) \
This looks like it's testing udf + udf.
Yeah, the way the test is written is that I am trying to test many combinations, so some combinations might not be mixed UDFs. Do you prefer that I remove these cases?
.withColumn('f1_f3', f1(f3(df['v']))) \
.withColumn('f2_f1', f2(f1(df['v']))) \
.withColumn('f2_f3', f2(f3(df['v']))) \
.withColumn('f3_f1', f3(f1(df['v']))) \
Looks like the combinations between f1 and f3 duplicate a few tests in test_mixed_udf, for instance f4_f3.
Yeah, the way the test is written is that I am trying to test many combinations, so there are some duplicate cases. Do you prefer that I remove these?
Yea.. I know it's still minor since the elapsed time will be virtually the same, but recently the build / test time has been an issue, and I wonder if there's a better way than avoiding duplicated tests for now..
It was discussed here #21845
I see. I don't think it's necessary (we would only remove a few cases and, like you said, the test time is virtually the same), and keeping them helps the readability of the tests (so it doesn't look like some test cases are missed).
But if that's the preferred practice I can remove the duplicate cases in the next commit.
I am okay with leaving it here too, since it's clear they are virtually the same, but let's remove duplicated tests and keep tests orthogonal next time.
Gotcha. I will keep that in mind next time.
python/pyspark/sql/tests.py
Outdated
def test_mixed_scalar_udfs_followed_by_grouby_apply(self):
    # Test Pandas UDF and scalar Python UDF followed by groupby apply
    from pyspark.sql.functions import udf, pandas_udf, PandasUDFType
    import pandas as pd
not a big deal at all really .. but I would swap the import order (third-party, pyspark)
    assert type(x) == int
    return x + 1

def f2(x):
Ah, I see why it looks confusing. Can we add an assert here too (check if it's a column)?
Added
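Presumably the added assert looks something like this (a hedged sketch, not the exact diff; the return value is illustrative):

from pyspark.sql import Column

# f2 is deliberately neither @udf nor @pandas_udf: applied to a Column it just
# builds a SQL expression, which the assert makes explicit.
def f2(x):
    assert isinstance(x, Column)
    return x + 10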
    // Python UDF can't be evaluated directly in JVM
    case children => !children.exists(hasPythonUDF)
private def canEvaluateInPython(e: PythonUDF, evalType: Int): Boolean = {
  if (e.evalType != evalType) {
Can we rename this function or write a comment, since both a Scalar Vectorized UDF and a normal UDF can each be evaluated in Python, but it returns false in this case?
I'm okay with the #21650 (comment) way too, but it should be really simplified. Either way LGTM.
 * type will be set to the eval type of the expression.
 *
 */
private def canEvaluateInPython(e: PythonUDF, lazyEvalType: LazyEvalType): Boolean = {
@BryanCutler I rewrote this function using mutable state based on your suggestion. It's not quite the same as your code, so please take a look and let me know if this looks better now. Thanks!
The one method seems overly complicated, so I prefer the code from my suggestion.
In your code:

private def canEvaluateInPython(e: PythonUDF, firstEvalType: FirstEvalType): Boolean = {
  if (firstEvalType.isEvalTypeSet() && e.evalType != firstEvalType.evalType) {
    false
  } else {
    firstEvalType.evalType = e.evalType
    e.children match {
      // single PythonUDF child could be chained and evaluated in Python
      case Seq(u: PythonUDF) => canEvaluateInPython(u, firstEvalType)
      // Python UDF can't be evaluated directly in JVM
      case children => !children.exists(hasScalarPythonUDF)
    }
  }
}
I think the confusing part here is that the value of firstEvalType.evalType keeps changing while we are traversing the tree, and we could be carrying the value across independent subtrees (i.e., after finishing one subtree, firstEvalType can be set to Scalar Pandas even though we didn't find an evaluable UDF, and we never reset it, so when we visit another subtree we could get wrong results). The fact that firstEvalType keeps changing as we traverse the tree seems very error prone to me.
I'm not sure I follow how this could get wrong results. firstEvalType.evalType = e.evalType is called only if the eval type is not set, or if it is set and it equals the current eval type. In the latter case, it does assign the same value again, but that's fine. If there is some case that this fails, can you add that as a test?
Bryan, I tried to apply your implementation and the simple test fails:
@udf('int')
def f1(x):
    assert type(x) == int
    return x + 1

@pandas_udf('int')
def f2(x):
    assert type(x) == pd.Series
    return x + 10

df = self.spark.range(0, 1).toDF('v')
df_chained_1 = df.withColumn('f2_f1', f2(f1(df['v'])))
expected_chained_1 = df.withColumn('f2_f1', df['v'] + 11)
self.assertEquals(expected_chained_1.collect(), df_chained_1.collect())
Do you mind trying this too? Hopefully I didn't do something silly here..
Is the above test part of sql/tests.py?
Yes it's in the most recent commit.
Ok, I think I see the problem. Since there was a map over plan.expressions, a new FirstEvalType object was being created for each expression. Changing this to the following corrected the failure:

val setEvalType = new FirstEvalType
val udfs = plan.expressions.flatMap(collectEvaluableUDFs(_, setEvalType))

I updated my above code to this, does that look correct now?
I applied your new code but the test I mentioned above still fails.
I think the issue could be that when visiting f2(f1(col('v'))), firstEvalType is set to Scalar Pandas first and isn't set to Batched SQL later, so f1 is not extracted. It's possible that my code is still different from yours somehow.
But similar to #21650 (comment), I think the state machine of the firstEvalType here is fairly complicated with your suggested implementation (i.e., what is the expected state of the eval type holder before and after canEvaluateInPython, and what are the invariants of the algorithm), and I found myself thinking pretty hard to prove the state machine is correct in all cases. If we want to go with this implementation, we need to carefully think about it and explain it in code...
The lazyEvalType implementation is better IMHO because the state machine is simpler - lazyEvalType is empty until we find the first evaluable UDF, and the value doesn't change once it's set.
The first implementation (two passes, immutable state) is probably the simplest in terms of the mental complexity of the algorithm, but is less efficient.
I think I am ok with either the immutable state or the lazy state. I think @HyukjinKwon prefers the immutable state one. @BryanCutler WDYT?
@@ -94,36 +95,94 @@ object ExtractPythonUDFFromAggregate extends Rule[LogicalPlan] {
 */
object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper {

  private def hasPythonUDF(e: Expression): Boolean = {
  private case class LazyEvalType(var evalType: Int = -1) {
hmmmmm looks messier than I thought .. the previous one looks a bit better to me .. wdyt @BryanCutler ?
I'm not too fond of the name LazyEvalType, it makes it sound like something else. Maybe CurrentEvalType?
Yeah, the idea of the LazyEvalType is a container object that can be set once. Maybe the name LazyEvalType is confusing. I don't think CurrentEvalType is accurate either, because the original idea is that we don't change the value once it's set. Maybe call it EvalTypeHolder and add docs to explain?
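A hedged Python model of the set-once contract being proposed (the actual class under discussion is Scala; names are illustrative):

class EvalTypeHolder:
    def __init__(self):
        self._eval_type = None  # unset until the first evaluable UDF is found

    def is_set(self):
        return self._eval_type is not None

    def set(self, eval_type):
        # The value may be assigned once; later writes must agree with it.
        if self._eval_type is not None and self._eval_type != eval_type:
            raise ValueError('eval type is already set to a different value')
        self._eval_type = eval_type

    def get(self):
        assert self._eval_type is not None, 'eval type was never set'
        return self._eval_type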
      false
    } else {
      e.children match {
        case Seq(u: PythonUDF) => canEvaluateInPython(u, lazyEvalType)
There are 2 paths for recursion here, which is probably not a good idea. This method is much more complicated now and a little difficult to follow.
Test build #93546 has finished for PR 21650 at commit
ehh .. @BryanCutler, WDYT about just doing the previous one for now? The approach you suggested sounds efficient of course, but this isn't a hot path, so I think the previous way is fine too .. since that's a bit cleaner (but a bit less efficient), and partly because the code freeze is close.
I didn't make the suggestion for performance; it was because looking at the previous code took me a while before I realized the intent was to find the first evaluable udf and then all others matching that eval type. I think the previous code kind of masked that and made it more complicated to follow. I wasn't really sure how the expression tree was evaluated, so my suggestion didn't handle chained expressions. The problem was that the eval type was being set when checking the children nodes; instead it should only be set after all children are determined to be the same type. I'll update the above code again, which passes all tests, as far as I can tell. I still prefer this approach, but I'm not a sql expert ;)
Hm, then how about giving it a try in a followup, @BryanCutler, if you see some value in it?
@HyukjinKwon I think Bryan's implementation looks promising. Please let me take a look.
@BryanCutler @HyukjinKwon I updated the PR based on Bryan's suggestion. Please take a look and let me know if you have further comments. Thanks!
@@ -94,36 +95,61 @@ object ExtractPythonUDFFromAggregate extends Rule[LogicalPlan] {
 */
object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper {

  private def hasPythonUDF(e: Expression): Boolean = {
  private case class EvalTypeHolder(private var evalType: Int = -1) {
How about this:

private type EvalType = Int
private type EvalTypeChecker = EvalType => Boolean

private def collectEvaluableUDFsFromExpressions(expressions: Seq[Expression]): Seq[PythonUDF] = {
  // Eval type checker is set in the middle of checking because once it's found,
  // the same eval type should be checked .. blah blah
  var evalChecker: Option[EvalTypeChecker] = None

  def collectEvaluableUDFs(expr: Expression): Seq[PythonUDF] = expr match {
    case udf: PythonUDF if PythonUDF.isScalarPythonUDF(udf) && canEvaluateInPython(udf)
        && evalChecker.isEmpty =>
      evalChecker = Some((otherEvalType: EvalType) => otherEvalType == udf.evalType)
      collectEvaluableUDFs(expr)
    case udf: PythonUDF if PythonUDF.isScalarPythonUDF(udf) && canEvaluateInPython(udf)
        && evalChecker.get(udf.evalType) =>
      Seq(udf)
    case e => e.children.flatMap(collectEvaluableUDFs)
  }

  expressions.flatMap(collectEvaluableUDFs)
}

def apply(plan: SparkPlan): SparkPlan = plan transformUp {
  case plan: SparkPlan => extract(plan)
}

/**
 * Extract all the PythonUDFs from the current operator and evaluate them before the operator.
 */
private def extract(plan: SparkPlan): SparkPlan = {
  val udfs = collectEvaluableUDFsFromExpressions(plan.expressions)
I see... You use a nested function definition and a var to remove the need for a holder object.
IMHO I usually find nested function definitions, and functions that refer to variables outside their definition scope, hard to read, but that could be my personal preference.
Another thing I like about the current impl is that the EvalTypeHolder class ensures its value is never changed once it's set, so I think that's more robust.
That being said, I am ok with your suggestion too if you insist or @BryanCutler also prefers it.
yup. I do avoid nested functions, but I found here is where it's needed. If it's clear when it's set and unset within a function, I think the shorter one is fine.
Ok, I will update the code then.
Test build #93668 has finished for PR 21650 at commit
Test build #93667 has finished for PR 21650 at commit
case udf: PythonUDF if PythonUDF.isScalarPythonUDF(udf) && canEvaluateInPython(udf)
    && evalTypeChecker.isEmpty =>
  evalTypeChecker = Some((otherEvalType: EvalType) => otherEvalType == udf.evalType)
  Seq(udf)
@HyukjinKwon In your code this line is collectEvaluableUDFs(expr). I think we should just return Seq(udf) to avoid checking the expression twice.
Test build #93688 has finished for PR 21650 at commit
retest please
Test build #93686 has finished for PR 21650 at commit
LGTM. Merged to master.
Thanks @HyukjinKwon @BryanCutler for the review!
What changes were proposed in this pull request?
This PR adds support for using mixed Python UDFs and Scalar Pandas UDFs, in the following two cases:
(1)
QueryPlan:
(2)
QueryPlan:
How was this patch tested?
New tests are added to BatchEvalPythonExecSuite and ScalarPandasUDFTests.
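The example code and query plans from the original description were not captured above; here is a hedged sketch of the two kinds of mixing this change enables (names illustrative):

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, pandas_udf

spark = SparkSession.builder.getOrCreate()
df = spark.range(0, 10).toDF('v')

f_plain = udf(lambda x: x + 1, 'long')            # plain Python UDF
f_pandas = pandas_udf(lambda s: s + 10, 'long')   # Scalar Pandas UDF

# (1) Both kinds of UDF in the same projection.
df.select(f_plain(df['v']), f_pandas(df['v'])).show()

# (2) One kind of UDF chained on the result of the other.
df.select(f_pandas(f_plain(df['v']))).show()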