From 3c2fe9a3886e15b80246438784f8d0b52570a011 Mon Sep 17 00:00:00 2001 From: Li Jin Date: Fri, 22 Jun 2018 14:35:34 -0400 Subject: [PATCH 01/13] wip --- python/pyspark/sql/tests.py | 13 ++++++++++ .../execution/python/ExtractPythonUDFs.scala | 24 +++++++++++++++---- 2 files changed, 33 insertions(+), 4 deletions(-) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 2d6b9f01e6525..0fe3535096130 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -5060,6 +5060,19 @@ def test_type_annotation(self): df = self.spark.range(1).select(pandas_udf(f=_locals['noop'], returnType='bigint')('id')) self.assertEqual(df.first()[0], 0) + def test_mixed_udf(self): + from pyspark.sql.functions import udf, pandas_udf + + df = self.spark.range(0, 10).toDF('a') + + df = df.withColumn('b', udf(lambda x: x + 1, 'double')(df['a'])) + df = df.withColumn('c', df['b'] + 2) + df = df.withColumn('d', udf(lambda x: x + 1, 'double')(df['c'])) + + df.explain(True) + + #df.show() + @unittest.skipIf( not _have_pandas or not _have_pyarrow, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala index 1e096100f7f43..79494982980fa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala @@ -94,7 +94,7 @@ object ExtractPythonUDFFromAggregate extends Rule[LogicalPlan] { */ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper { - private def hasPythonUDF(e: Expression): Boolean = { + private def hasScalarPythonUDF(e: Expression): Boolean = { e.find(PythonUDF.isScalarPythonUDF).isDefined } @@ -103,7 +103,7 @@ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper { // single PythonUDF child could be chained and evaluated in Python case Seq(u: PythonUDF) => canEvaluateInPython(u) // Python UDF can't be evaluated directly in JVM - case children => !children.exists(hasPythonUDF) + case children => !children.exists(hasScalarPythonUDF) } } @@ -123,6 +123,10 @@ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper { * Extract all the PythonUDFs from the current operator and evaluate them before the operator. */ private def extract(plan: SparkPlan): SparkPlan = { + println("************* input ****************") + println(plan) + println("**************************************") + val udfs = plan.expressions.flatMap(collectEvaluatableUDF) // ignore the PythonUDF that come from second/third aggregate, which is not used .filter(udf => udf.references.subsetOf(plan.inputSet)) @@ -130,6 +134,8 @@ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper { // If there aren't any, we are done. 
plan } else { + println("udfs:" + udfs.toList) + val inputsForPlan = plan.references ++ plan.outputSet val prunedChildren = plan.children.map { child => val allNeededOutput = inputsForPlan.intersect(child.outputSet).toSeq @@ -166,7 +172,7 @@ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper { ArrowEvalPythonExec(vectorizedUdfs, child.output ++ resultAttrs, child) case (vectorizedUdfs, plainUdfs) if vectorizedUdfs.isEmpty => BatchEvalPythonExec(plainUdfs, child.output ++ resultAttrs, child) - case _ => + case (vectorizedUdfs, plainUdfs) => throw new IllegalArgumentException("Can not mix vectorized and non-vectorized UDFs") } @@ -187,6 +193,10 @@ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper { attributeMap(p) } + println("*********** rewritten **************") + println(rewritten) + println("**************************************") + // extract remaining python UDFs recursively val newPlan = extract(rewritten) if (newPlan.output != plan.output) { @@ -195,6 +205,12 @@ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper { } else { newPlan } + + println("************ output ****************") + println(newPlan) + println("**************************************") + + newPlan } } @@ -205,7 +221,7 @@ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper { case filter: FilterExec => val (candidates, nonDeterministic) = splitConjunctivePredicates(filter.condition).partition(_.deterministic) - val (pushDown, rest) = candidates.partition(!hasPythonUDF(_)) + val (pushDown, rest) = candidates.partition(!hasScalarPythonUDF(_)) if (pushDown.nonEmpty) { val newChild = FilterExec(pushDown.reduceLeft(And), filter.child) FilterExec((rest ++ nonDeterministic).reduceLeft(And), newChild) From b3435b69a4d72ccfccad8f2c46b7d296833bf1f5 Mon Sep 17 00:00:00 2001 From: Li Jin Date: Wed, 27 Jun 2018 18:31:25 -0400 Subject: [PATCH 02/13] Test passes --- python/pyspark/sql/tests.py | 165 +++++++++++++++--- .../execution/python/ExtractPythonUDFs.scala | 71 ++++---- .../python/BatchEvalPythonExecSuite.scala | 76 +++++++- 3 files changed, 259 insertions(+), 53 deletions(-) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 0fe3535096130..2e422005f1fe8 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -570,7 +570,8 @@ def test_udf_with_filter_function(self): my_filter = udf(lambda a: a < 2, BooleanType()) sel = df.select(col("key"), col("value")).filter((my_filter(col("key"))) & (df.value < "2")) - self.assertEqual(sel.collect(), [Row(key=1, value='1')]) + sel.explain(True) + # self.assertEqual(sel.collect(), [Row(key=1, value='1')]) def test_udf_with_aggregate_function(self): df = self.spark.createDataFrame([(1, "1"), (2, "2"), (1, "2"), (1, "2")], ["key", "value"]) @@ -4763,17 +4764,6 @@ def test_vectorized_udf_invalid_length(self): 'Result vector from pandas_udf was not the required length'): df.select(raise_exception(col('id'))).collect() - def test_vectorized_udf_mix_udf(self): - from pyspark.sql.functions import pandas_udf, udf, col - df = self.spark.range(10) - row_by_row_udf = udf(lambda x: x, LongType()) - pd_udf = pandas_udf(lambda x: x, LongType()) - with QuietTest(self.sc): - with self.assertRaisesRegexp( - Exception, - 'Can not mix vectorized and non-vectorized UDFs'): - df.select(row_by_row_udf(col('id')), pd_udf(col('id'))).collect() - def test_vectorized_udf_chained(self): from pyspark.sql.functions import pandas_udf, col df = self.spark.range(10) @@ -5061,17 +5051,136 @@ def 
test_type_annotation(self): self.assertEqual(df.first()[0], 0) def test_mixed_udf(self): + import pandas as pd from pyspark.sql.functions import udf, pandas_udf - df = self.spark.range(0, 10).toDF('a') - - df = df.withColumn('b', udf(lambda x: x + 1, 'double')(df['a'])) - df = df.withColumn('c', df['b'] + 2) - df = df.withColumn('d', udf(lambda x: x + 1, 'double')(df['c'])) - - df.explain(True) + df = self.spark.range(0, 1).toDF('v') + + @udf('int') + def f1(x): + assert type(x) == int + return x + 1 + + @pandas_udf('int') + def f2(x): + assert type(x) == pd.Series + return x + 10 + + @udf('int') + def f3(x): + assert type(x) == int + return x + 100 + + @pandas_udf('int') + def f4(x): + assert type(x) == pd.Series + return x + 1000 + + # Test mixed udfs in a single projection + df1 = df.withColumn('f1', f1(df['v'])) + df1 = df1.withColumn('f2', f2(df1['v'])) + df1 = df1.withColumn('f3', f3(df1['v'])) + df1 = df1.withColumn('f4', f4(df1['v'])) + df1 = df1.withColumn('f2_f1', f2(df1['f1'])) + df1 = df1.withColumn('f3_f1', f3(df1['f1'])) + df1 = df1.withColumn('f4_f1', f4(df1['f1'])) + df1 = df1.withColumn('f3_f2', f3(df1['f2'])) + df1 = df1.withColumn('f4_f2', f4(df1['f2'])) + df1 = df1.withColumn('f4_f3', f4(df1['f3'])) + df1 = df1.withColumn('f3_f2_f1', f3(df1['f2_f1'])) + df1 = df1.withColumn('f4_f2_f1', f4(df1['f2_f1'])) + df1 = df1.withColumn('f4_f3_f1', f4(df1['f3_f1'])) + df1 = df1.withColumn('f4_f3_f2', f4(df1['f3_f2'])) + df1 = df1.withColumn('f4_f3_f2_f1', f4(df1['f3_f2_f1'])) + + # Test mixed udfs in a single expression + df2 = df.withColumn('f1', f1(df['v'])) + df2 = df2.withColumn('f2', f2(df['v'])) + df2 = df2.withColumn('f3', f3(df['v'])) + df2 = df2.withColumn('f4', f4(df['v'])) + df2 = df2.withColumn('f2_f1', f2(f1(df['v']))) + df2 = df2.withColumn('f3_f1', f3(f1(df['v']))) + df2 = df2.withColumn('f4_f1', f4(f1(df['v']))) + df2 = df2.withColumn('f3_f2', f3(f2(df['v']))) + df2 = df2.withColumn('f4_f2', f4(f2(df['v']))) + df2 = df2.withColumn('f4_f3', f4(f3(df['v']))) + df2 = df2.withColumn('f3_f2_f1', f3(f2(f1(df['v'])))) + df2 = df2.withColumn('f4_f2_f1', f4(f2(f1(df['v'])))) + df2 = df2.withColumn('f4_f3_f1', f4(f3(f1(df['v'])))) + df2 = df2.withColumn('f4_f3_f2', f4(f3(f2(df['v'])))) + df2 = df2.withColumn('f4_f3_f2_f1', f4(f3(f2(f1(df['v']))))) + + df3 = df.withColumn('f1', df['v'] + 1) + df3 = df3.withColumn('f2', df['v'] + 10) + df3 = df3.withColumn('f3', df['v'] + 100) + df3 = df3.withColumn('f4', df['v'] + 1000) + df3 = df3.withColumn('f2_f1', df['v'] + 11) + df3 = df3.withColumn('f3_f1', df['v'] + 101) + df3 = df3.withColumn('f4_f1', df['v'] + 1001) + df3 = df3.withColumn('f3_f2', df['v'] + 110) + df3 = df3.withColumn('f4_f2', df['v'] + 1010) + df3 = df3.withColumn('f4_f3', df['v'] + 1100) + df3 = df3.withColumn('f3_f2_f1', df['v'] + 111) + df3 = df3.withColumn('f4_f2_f1', df['v'] + 1011) + df3 = df3.withColumn('f4_f3_f1', df['v'] + 1101) + df3 = df3.withColumn('f4_f3_f2', df['v'] + 1110) + df3 = df3.withColumn('f4_f3_f2_f1', df['v'] + 1111) + + self.assertTrue(df3.collect() == df1.collect()) + self.assertTrue(df3.collect() == df2.collect()) + + def test_mixed_udf_and_sql(self): + import pandas as pd + from pyspark.sql.functions import udf, pandas_udf - #df.show() + df = self.spark.range(0, 1).toDF('v') + + @udf('int') + def f1(x): + assert type(x) == int + return x + 1 + + def f2(x): + return x + 10 + + @pandas_udf('int') + def f3(x): + assert type(x) == pd.Series + return x + 100 + + df1 = df.withColumn('f1', f1(df['v'])) + df1 = df1.withColumn('f2', f2(df['v'])) + 
df1 = df1.withColumn('f3', f3(df['v'])) + df1 = df1.withColumn('f1_f2', f1(f2(df['v']))) + df1 = df1.withColumn('f1_f3', f1(f3(df['v']))) + df1 = df1.withColumn('f2_f1', f2(f1(df['v']))) + df1 = df1.withColumn('f2_f3', f2(f3(df['v']))) + df1 = df1.withColumn('f3_f1', f3(f1(df['v']))) + df1 = df1.withColumn('f3_f2', f3(f2(df['v']))) + df1 = df1.withColumn('f1_f2_f3', f1(f2(f3(df['v'])))) + df1 = df1.withColumn('f1_f3_f2', f1(f3(f2(df['v'])))) + df1 = df1.withColumn('f2_f1_f3', f2(f1(f3(df['v'])))) + df1 = df1.withColumn('f2_f3_f1', f2(f3(f1(df['v'])))) + df1 = df1.withColumn('f3_f1_f2', f3(f1(f2(df['v'])))) + df1 = df1.withColumn('f3_f2_f1', f3(f2(f1(df['v'])))) + + df2 = df.withColumn('f1', df['v'] + 1) + df2 = df2.withColumn('f2', df['v'] + 10) + df2 = df2.withColumn('f3', df['v'] + 100) + df2 = df2.withColumn('f1_f2', df['v'] + 11) + df2 = df2.withColumn('f1_f3', df['v'] + 101) + df2 = df2.withColumn('f2_f1', df['v'] + 11) + df2 = df2.withColumn('f2_f3', df['v'] + 110) + df2 = df2.withColumn('f3_f1', df['v'] + 101) + df2 = df2.withColumn('f3_f2', df['v'] + 110) + df2 = df2.withColumn('f1_f2_f3', df['v'] + 111) + df2 = df2.withColumn('f1_f3_f2', df['v'] + 111) + df2 = df2.withColumn('f2_f1_f3', df['v'] + 111) + df2 = df2.withColumn('f2_f3_f1', df['v'] + 111) + df2 = df2.withColumn('f3_f1_f2', df['v'] + 111) + df2 = df2.withColumn('f3_f2_f1', df['v'] + 110) + + self.assertTrue(df2.collect(), df1.collect()) @unittest.skipIf( @@ -5500,6 +5609,22 @@ def dummy_pandas_udf(df): F.col('temp0.key') == F.col('temp1.key')) self.assertEquals(res.count(), 5) + def test_mixed_udf(self): + # Test Pandas UDF and scalar Python UDF followed by groupby apply + from pyspark.sql.functions import udf, pandas_udf, PandasUDFType + import pandas as pd + + df = self.spark.range(0, 10).toDF('v1') + df = df.withColumn('v2', udf(lambda x: x + 1, 'int')(df['v1'])) + df = df.withColumn('v3', pandas_udf(lambda x: x + 2, 'int')(df['v1'])) + + result = df.groupby() \ + .apply(pandas_udf(lambda x: pd.DataFrame([x.sum().sum()]), + 'sum int', + PandasUDFType.GROUPED_MAP)) + + self.assertEquals(result.collect()[0]['sum'], 165) + @unittest.skipIf( not _have_pandas or not _have_pyarrow, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala index 79494982980fa..8cef206f14df9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala @@ -21,6 +21,7 @@ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import org.apache.spark.api.python.PythonEvalType +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, Project} @@ -98,24 +99,47 @@ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper { e.find(PythonUDF.isScalarPythonUDF).isDefined } - private def canEvaluateInPython(e: PythonUDF): Boolean = { - e.children match { - // single PythonUDF child could be chained and evaluated in Python - case Seq(u: PythonUDF) => canEvaluateInPython(u) - // Python UDF can't be evaluated directly in JVM - case children => !children.exists(hasScalarPythonUDF) + private def canEvaluateInPython(e: PythonUDF, evalType: Int): Boolean = { + if (e.evalType != 
evalType) { + false + } else { + e.children match { + // single PythonUDF child could be chained and evaluated in Python + case Seq(u: PythonUDF) => canEvaluateInPython(u, evalType) + // Python UDF can't be evaluated directly in JVM + case children => !children.exists(hasScalarPythonUDF) + } } } - private def collectEvaluatableUDF(expr: Expression): Seq[PythonUDF] = expr match { - case udf: PythonUDF if PythonUDF.isScalarPythonUDF(udf) && canEvaluateInPython(udf) => Seq(udf) - case e => e.children.flatMap(collectEvaluatableUDF) + private def collectEvaluableUDF(expr: Expression, evalType: Int): Seq[PythonUDF] = expr match { + case udf: PythonUDF if PythonUDF.isScalarPythonUDF(udf) && canEvaluateInPython(udf, evalType) => + Seq(udf) + case e => e.children.flatMap(collectEvaluableUDF(_, evalType)) + } + + /** + * Collect evaluable UDFs from the current node. + * + * This function collects Python UDFs or Scalar Python UDFs from expressions of the input node, + * and returns a list of UDFs of the same eval type. + * + * If expressions contain both UDFs eval types, this function will only return Python UDFs. + * + * The caller should call this function multiple times until all evaluable UDFs are collected. + */ + private def collectEvaluableUDFs(plan: SparkPlan): Seq[PythonUDF] = { + val pythonUDFs = + plan.expressions.flatMap(collectEvaluableUDF(_, PythonEvalType.SQL_BATCHED_UDF)) + + if (pythonUDFs.isEmpty) { + plan.expressions.flatMap(collectEvaluableUDF(_, PythonEvalType.SQL_SCALAR_PANDAS_UDF)) + } else { + pythonUDFs + } } def apply(plan: SparkPlan): SparkPlan = plan transformUp { - // AggregateInPandasExec and FlatMapGroupsInPandas can be evaluated directly in python worker - // Therefore we don't need to extract the UDFs - case plan: FlatMapGroupsInPandasExec => plan case plan: SparkPlan => extract(plan) } @@ -123,19 +147,14 @@ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper { * Extract all the PythonUDFs from the current operator and evaluate them before the operator. */ private def extract(plan: SparkPlan): SparkPlan = { - println("************* input ****************") - println(plan) - println("**************************************") - - val udfs = plan.expressions.flatMap(collectEvaluatableUDF) + val udfs = collectEvaluableUDFs(plan) // ignore the PythonUDF that come from second/third aggregate, which is not used .filter(udf => udf.references.subsetOf(plan.inputSet)) + if (udfs.isEmpty) { // If there aren't any, we are done. 
plan } else { - println("udfs:" + udfs.toList) - val inputsForPlan = plan.references ++ plan.outputSet val prunedChildren = plan.children.map { child => val allNeededOutput = inputsForPlan.intersect(child.outputSet).toSeq @@ -173,7 +192,8 @@ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper { case (vectorizedUdfs, plainUdfs) if vectorizedUdfs.isEmpty => BatchEvalPythonExec(plainUdfs, child.output ++ resultAttrs, child) case (vectorizedUdfs, plainUdfs) => - throw new IllegalArgumentException("Can not mix vectorized and non-vectorized UDFs") + throw new AnalysisException( + "Mixed Python and Scalar Pandas UDFs are not expected here") } attributeMap ++= validUdfs.zip(resultAttrs) @@ -193,24 +213,15 @@ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper { attributeMap(p) } - println("*********** rewritten **************") - println(rewritten) - println("**************************************") - // extract remaining python UDFs recursively val newPlan = extract(rewritten) + if (newPlan.output != plan.output) { // Trim away the new UDF value if it was only used for filtering or something. ProjectExec(plan.output, newPlan) } else { newPlan } - - println("************ output ****************") - println(newPlan) - println("**************************************") - - newPlan } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala index d456c931f5275..b3befb505db06 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala @@ -19,11 +19,11 @@ package org.apache.spark.sql.execution.python import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer - import org.apache.spark.api.python.{PythonEvalType, PythonFunction} import org.apache.spark.sql.catalyst.FunctionIdentifier import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, GreaterThan, In} -import org.apache.spark.sql.execution.{FilterExec, InputAdapter, SparkPlanTest, WholeStageCodegenExec} +import org.apache.spark.sql.execution._ +import org.apache.spark.sql.functions.col import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.BooleanType @@ -31,13 +31,18 @@ class BatchEvalPythonExecSuite extends SparkPlanTest with SharedSQLContext { import testImplicits.newProductEncoder import testImplicits.localSeqToDatasetHolder + val pythonUDF = new MyDummyPythonUDF + val pandasUDF = new MyDummyScalarPandasUDF + override def beforeAll(): Unit = { super.beforeAll() - spark.udf.registerPython("dummyPythonUDF", new MyDummyPythonUDF) + spark.udf.registerPython("dummyPythonUDF", pythonUDF) + spark.udf.registerPython("dummyScalarPandasUDF", pandasUDF) } override def afterAll(): Unit = { spark.sessionState.functionRegistry.dropFunction(FunctionIdentifier("dummyPythonUDF")) + spark.sessionState.functionRegistry.dropFunction(FunctionIdentifier("dummyScalarPandasUDF")) super.afterAll() } @@ -97,6 +102,64 @@ class BatchEvalPythonExecSuite extends SparkPlanTest with SharedSQLContext { } assert(qualifiedPlanNodes.size == 1) } + + private def collectPythonExec(spark: SparkPlan): Seq[BatchEvalPythonExec] = spark.collect { + case b: BatchEvalPythonExec => b + } + + private def collectPandasExec(spark: SparkPlan): Seq[ArrowEvalPythonExec] = spark.collect { + case b: 
ArrowEvalPythonExec => b + } + + test("Chained Python UDFs should be combined to a single physical node") { + val df = Seq(("Hello", 4)).toDF("a", "b") + val df2 = df.withColumn("c", pythonUDF(col("a"))).withColumn("d", pythonUDF(col("c"))) + val pythonEvalNodes = collectPythonExec(df2.queryExecution.executedPlan) + assert(pythonEvalNodes.size == 1) + } + + test("Chained Pandas UDFs should be combined to a single physical node") { + val df = Seq(("Hello", 4)).toDF("a", "b") + val df2 = df.withColumn("c", pandasUDF(col("a"))).withColumn("d", pandasUDF(col("c"))) + val arrowEvalNodes = collectPandasExec(df2.queryExecution.executedPlan) + assert(arrowEvalNodes.size == 1) + } + + test("Mixed Python UDFs and Pandas UDF should be separate physical node") { + val df = Seq(("Hello", 4)).toDF("a", "b") + val df2 = df.withColumn("c", pythonUDF(col("a"))).withColumn("d", pandasUDF(col("b"))) + + val pythonEvalNodes = collectPythonExec(df2.queryExecution.executedPlan) + val arrowEvalNodes = collectPandasExec(df2.queryExecution.executedPlan) + assert(pythonEvalNodes.size == 1) + assert(arrowEvalNodes.size == 1) + } + + test("Independent Python UDFs and Pandas UDFs should be combined separately") { + val df = Seq(("Hello", 4)).toDF("a", "b") + val df2 = df.withColumn("c1", pythonUDF(col("a"))) + .withColumn("c2", pythonUDF(col("c1"))) + .withColumn("d1", pandasUDF(col("a"))) + .withColumn("d2", pandasUDF(col("d1"))) + + val pythonEvalNodes = collectPythonExec(df2.queryExecution.executedPlan) + val arrowEvalNodes = collectPandasExec(df2.queryExecution.executedPlan) + assert(pythonEvalNodes.size == 1) + assert(arrowEvalNodes.size == 1) + } + + test("Dependent Python UDFs and Pandas UDFs should not be combined") { + val df = Seq(("Hello", 4)).toDF("a", "b") + val df2 = df.withColumn("c1", pythonUDF(col("a"))) + .withColumn("d1", pandasUDF(col("c1"))) + .withColumn("c2", pythonUDF(col("d1"))) + .withColumn("d2", pandasUDF(col("c2"))) + + val pythonEvalNodes = collectPythonExec(df2.queryExecution.executedPlan) + val arrowEvalNodes = collectPandasExec(df2.queryExecution.executedPlan) + assert(pythonEvalNodes.size == 2) + assert(arrowEvalNodes.size == 2) + } } // This Python UDF is dummy and just for testing. Unable to execute. 
@@ -115,3 +178,10 @@ class MyDummyPythonUDF extends UserDefinedPythonFunction( dataType = BooleanType, pythonEvalType = PythonEvalType.SQL_BATCHED_UDF, udfDeterministic = true) + +class MyDummyScalarPandasUDF extends UserDefinedPythonFunction( + name = "dummyPandasUDF", + func = new DummyUDF, + dataType = BooleanType, + pythonEvalType = PythonEvalType.SQL_SCALAR_PANDAS_UDF, + udfDeterministic = true) From 490dc0973da164c80a47061763f62956410be3a4 Mon Sep 17 00:00:00 2001 From: Li Jin Date: Wed, 27 Jun 2018 18:34:28 -0400 Subject: [PATCH 03/13] Remove white spaces --- python/pyspark/sql/tests.py | 3 +-- .../apache/spark/sql/execution/python/ExtractPythonUDFs.scala | 2 -- .../spark/sql/execution/python/BatchEvalPythonExecSuite.scala | 1 + 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 2e422005f1fe8..fc31b9b4aef84 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -570,8 +570,7 @@ def test_udf_with_filter_function(self): my_filter = udf(lambda a: a < 2, BooleanType()) sel = df.select(col("key"), col("value")).filter((my_filter(col("key"))) & (df.value < "2")) - sel.explain(True) - # self.assertEqual(sel.collect(), [Row(key=1, value='1')]) + self.assertEqual(sel.collect(), [Row(key=1, value='1')]) def test_udf_with_aggregate_function(self): df = self.spark.createDataFrame([(1, "1"), (2, "2"), (1, "2"), (1, "2")], ["key", "value"]) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala index 8cef206f14df9..5870414755237 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala @@ -150,7 +150,6 @@ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper { val udfs = collectEvaluableUDFs(plan) // ignore the PythonUDF that come from second/third aggregate, which is not used .filter(udf => udf.references.subsetOf(plan.inputSet)) - if (udfs.isEmpty) { // If there aren't any, we are done. plan @@ -215,7 +214,6 @@ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper { // extract remaining python UDFs recursively val newPlan = extract(rewritten) - if (newPlan.output != plan.output) { // Trim away the new UDF value if it was only used for filtering or something. 
ProjectExec(plan.output, newPlan) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala index b3befb505db06..52e156a6803ea 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.python import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer + import org.apache.spark.api.python.{PythonEvalType, PythonFunction} import org.apache.spark.sql.catalyst.FunctionIdentifier import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, GreaterThan, In} From 3015257c8bf28e3afdd6a9d316ae068dd02f9794 Mon Sep 17 00:00:00 2001 From: Li Jin Date: Thu, 28 Jun 2018 20:31:39 -0400 Subject: [PATCH 04/13] Fix typo in test --- python/pyspark/sql/tests.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index fc31b9b4aef84..a1425d6b6f40a 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -5125,8 +5125,8 @@ def f4(x): df3 = df3.withColumn('f4_f3_f2', df['v'] + 1110) df3 = df3.withColumn('f4_f3_f2_f1', df['v'] + 1111) - self.assertTrue(df3.collect() == df1.collect()) - self.assertTrue(df3.collect() == df2.collect()) + self.assertEquals(df3.collect(), df1.collect()) + self.assertEquals(df3.collect(), df2.collect()) def test_mixed_udf_and_sql(self): import pandas as pd @@ -5177,9 +5177,9 @@ def f3(x): df2 = df2.withColumn('f2_f1_f3', df['v'] + 111) df2 = df2.withColumn('f2_f3_f1', df['v'] + 111) df2 = df2.withColumn('f3_f1_f2', df['v'] + 111) - df2 = df2.withColumn('f3_f2_f1', df['v'] + 110) + df2 = df2.withColumn('f3_f2_f1', df['v'] + 111) - self.assertTrue(df2.collect(), df1.collect()) + self.assertEquals(df2.collect(), df1.collect()) @unittest.skipIf( From cbf310e3f97082776f5cf832d79877a2997d3401 Mon Sep 17 00:00:00 2001 From: Li Jin Date: Fri, 29 Jun 2018 11:01:34 -0400 Subject: [PATCH 05/13] Address PR comments --- python/pyspark/sql/tests.py | 6 ++++++ .../spark/sql/execution/python/ExtractPythonUDFs.scala | 2 +- .../sql/execution/python/BatchEvalPythonExecSuite.scala | 4 ++-- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index a1425d6b6f40a..5b6b47eb595b9 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -5055,6 +5055,8 @@ def test_mixed_udf(self): df = self.spark.range(0, 1).toDF('v') + # Test mixture of multiple UDFs and Pandas UDFs + @udf('int') def f1(x): assert type(x) == int @@ -5109,6 +5111,7 @@ def f4(x): df2 = df2.withColumn('f4_f3_f2', f4(f3(f2(df['v'])))) df2 = df2.withColumn('f4_f3_f2_f1', f4(f3(f2(f1(df['v']))))) + # expected result df3 = df.withColumn('f1', df['v'] + 1) df3 = df3.withColumn('f2', df['v'] + 10) df3 = df3.withColumn('f3', df['v'] + 100) @@ -5134,6 +5137,8 @@ def test_mixed_udf_and_sql(self): df = self.spark.range(0, 1).toDF('v') + # Test mixture of UDFs, Pandas UDFs and SQL expression. 
+ @udf('int') def f1(x): assert type(x) == int @@ -5163,6 +5168,7 @@ def f3(x): df1 = df1.withColumn('f3_f1_f2', f3(f1(f2(df['v'])))) df1 = df1.withColumn('f3_f2_f1', f3(f2(f1(df['v'])))) + # expected result df2 = df.withColumn('f1', df['v'] + 1) df2 = df2.withColumn('f2', df['v'] + 10) df2 = df2.withColumn('f3', df['v'] + 100) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala index 5870414755237..7fb569dfff3a0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala @@ -190,7 +190,7 @@ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper { ArrowEvalPythonExec(vectorizedUdfs, child.output ++ resultAttrs, child) case (vectorizedUdfs, plainUdfs) if vectorizedUdfs.isEmpty => BatchEvalPythonExec(plainUdfs, child.output ++ resultAttrs, child) - case (vectorizedUdfs, plainUdfs) => + case _ => throw new AnalysisException( "Mixed Python and Scalar Pandas UDFs are not expected here") } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala index 52e156a6803ea..f976b8f747137 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala @@ -104,11 +104,11 @@ class BatchEvalPythonExecSuite extends SparkPlanTest with SharedSQLContext { assert(qualifiedPlanNodes.size == 1) } - private def collectPythonExec(spark: SparkPlan): Seq[BatchEvalPythonExec] = spark.collect { + private def collectPythonExec(plan: SparkPlan): Seq[BatchEvalPythonExec] = plan.collect { case b: BatchEvalPythonExec => b } - private def collectPandasExec(spark: SparkPlan): Seq[ArrowEvalPythonExec] = spark.collect { + private def collectPandasExec(plan: SparkPlan): Seq[ArrowEvalPythonExec] = plan.collect { case b: ArrowEvalPythonExec => b } From 78f2ebf3b11fe8849fe0d41300f74319ca174d42 Mon Sep 17 00:00:00 2001 From: Li Jin Date: Mon, 23 Jul 2018 12:02:22 -0400 Subject: [PATCH 06/13] Address PR comments --- python/pyspark/sql/tests.py | 161 +++++++++--------- .../execution/python/ExtractPythonUDFs.scala | 2 +- .../python/BatchEvalPythonExecSuite.scala | 64 +------ .../python/ExtractPythonUDFsSuite.scala | 93 ++++++++++ 4 files changed, 177 insertions(+), 143 deletions(-) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 5b6b47eb595b9..b994c06f668c9 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -5051,7 +5051,7 @@ def test_type_annotation(self): def test_mixed_udf(self): import pandas as pd - from pyspark.sql.functions import udf, pandas_udf + from pyspark.sql.functions import col, udf, pandas_udf df = self.spark.range(0, 1).toDF('v') @@ -5078,55 +5078,58 @@ def f4(x): return x + 1000 # Test mixed udfs in a single projection - df1 = df.withColumn('f1', f1(df['v'])) - df1 = df1.withColumn('f2', f2(df1['v'])) - df1 = df1.withColumn('f3', f3(df1['v'])) - df1 = df1.withColumn('f4', f4(df1['v'])) - df1 = df1.withColumn('f2_f1', f2(df1['f1'])) - df1 = df1.withColumn('f3_f1', f3(df1['f1'])) - df1 = 
df1.withColumn('f4_f1', f4(df1['f1'])) - df1 = df1.withColumn('f3_f2', f3(df1['f2'])) - df1 = df1.withColumn('f4_f2', f4(df1['f2'])) - df1 = df1.withColumn('f4_f3', f4(df1['f3'])) - df1 = df1.withColumn('f3_f2_f1', f3(df1['f2_f1'])) - df1 = df1.withColumn('f4_f2_f1', f4(df1['f2_f1'])) - df1 = df1.withColumn('f4_f3_f1', f4(df1['f3_f1'])) - df1 = df1.withColumn('f4_f3_f2', f4(df1['f3_f2'])) - df1 = df1.withColumn('f4_f3_f2_f1', f4(df1['f3_f2_f1'])) + df1 = df \ + .withColumn('f1', f1(col('v'))) \ + .withColumn('f2', f2(col('v'))) \ + .withColumn('f3', f3(col('v'))) \ + .withColumn('f4', f4(col('v'))) \ + .withColumn('f2_f1', f2(col('f1'))) \ + .withColumn('f3_f1', f3(col('f1'))) \ + .withColumn('f4_f1', f4(col('f1'))) \ + .withColumn('f3_f2', f3(col('f2'))) \ + .withColumn('f4_f2', f4(col('f2'))) \ + .withColumn('f4_f3', f4(col('f3'))) \ + .withColumn('f3_f2_f1', f3(col('f2_f1'))) \ + .withColumn('f4_f2_f1', f4(col('f2_f1'))) \ + .withColumn('f4_f3_f1', f4(col('f3_f1'))) \ + .withColumn('f4_f3_f2', f4(col('f3_f2'))) \ + .withColumn('f4_f3_f2_f1', f4(col('f3_f2_f1'))) # Test mixed udfs in a single expression - df2 = df.withColumn('f1', f1(df['v'])) - df2 = df2.withColumn('f2', f2(df['v'])) - df2 = df2.withColumn('f3', f3(df['v'])) - df2 = df2.withColumn('f4', f4(df['v'])) - df2 = df2.withColumn('f2_f1', f2(f1(df['v']))) - df2 = df2.withColumn('f3_f1', f3(f1(df['v']))) - df2 = df2.withColumn('f4_f1', f4(f1(df['v']))) - df2 = df2.withColumn('f3_f2', f3(f2(df['v']))) - df2 = df2.withColumn('f4_f2', f4(f2(df['v']))) - df2 = df2.withColumn('f4_f3', f4(f3(df['v']))) - df2 = df2.withColumn('f3_f2_f1', f3(f2(f1(df['v'])))) - df2 = df2.withColumn('f4_f2_f1', f4(f2(f1(df['v'])))) - df2 = df2.withColumn('f4_f3_f1', f4(f3(f1(df['v'])))) - df2 = df2.withColumn('f4_f3_f2', f4(f3(f2(df['v'])))) - df2 = df2.withColumn('f4_f3_f2_f1', f4(f3(f2(f1(df['v']))))) + df2 = df \ + .withColumn('f1', f1(col('v'))) \ + .withColumn('f2', f2(col('v'))) \ + .withColumn('f3', f3(col('v'))) \ + .withColumn('f4', f4(col('v'))) \ + .withColumn('f2_f1', f2(f1(col('v')))) \ + .withColumn('f3_f1', f3(f1(col('v')))) \ + .withColumn('f4_f1', f4(f1(col('v')))) \ + .withColumn('f3_f2', f3(f2(col('v')))) \ + .withColumn('f4_f2', f4(f2(col('v')))) \ + .withColumn('f4_f3', f4(f3(col('v')))) \ + .withColumn('f3_f2_f1', f3(f2(f1(col('v'))))) \ + .withColumn('f4_f2_f1', f4(f2(f1(col('v'))))) \ + .withColumn('f4_f3_f1', f4(f3(f1(col('v'))))) \ + .withColumn('f4_f3_f2', f4(f3(f2(col('v'))))) \ + .withColumn('f4_f3_f2_f1', f4(f3(f2(f1(col('v')))))) # expected result - df3 = df.withColumn('f1', df['v'] + 1) - df3 = df3.withColumn('f2', df['v'] + 10) - df3 = df3.withColumn('f3', df['v'] + 100) - df3 = df3.withColumn('f4', df['v'] + 1000) - df3 = df3.withColumn('f2_f1', df['v'] + 11) - df3 = df3.withColumn('f3_f1', df['v'] + 101) - df3 = df3.withColumn('f4_f1', df['v'] + 1001) - df3 = df3.withColumn('f3_f2', df['v'] + 110) - df3 = df3.withColumn('f4_f2', df['v'] + 1010) - df3 = df3.withColumn('f4_f3', df['v'] + 1100) - df3 = df3.withColumn('f3_f2_f1', df['v'] + 111) - df3 = df3.withColumn('f4_f2_f1', df['v'] + 1011) - df3 = df3.withColumn('f4_f3_f1', df['v'] + 1101) - df3 = df3.withColumn('f4_f3_f2', df['v'] + 1110) - df3 = df3.withColumn('f4_f3_f2_f1', df['v'] + 1111) + df3 = df \ + .withColumn('f1', df['v'] + 1) \ + .withColumn('f2', df['v'] + 10) \ + .withColumn('f3', df['v'] + 100) \ + .withColumn('f4', df['v'] + 1000) \ + .withColumn('f2_f1', df['v'] + 11) \ + .withColumn('f3_f1', df['v'] + 101) \ + .withColumn('f4_f1', df['v'] + 1001) \ + 
.withColumn('f3_f2', df['v'] + 110) \ + .withColumn('f4_f2', df['v'] + 1010) \ + .withColumn('f4_f3', df['v'] + 1100) \ + .withColumn('f3_f2_f1', df['v'] + 111) \ + .withColumn('f4_f2_f1', df['v'] + 1011) \ + .withColumn('f4_f3_f1', df['v'] + 1101) \ + .withColumn('f4_f3_f2', df['v'] + 1110) \ + .withColumn('f4_f3_f2_f1', df['v'] + 1111) self.assertEquals(df3.collect(), df1.collect()) self.assertEquals(df3.collect(), df2.collect()) @@ -5152,38 +5155,38 @@ def f3(x): assert type(x) == pd.Series return x + 100 - df1 = df.withColumn('f1', f1(df['v'])) - df1 = df1.withColumn('f2', f2(df['v'])) - df1 = df1.withColumn('f3', f3(df['v'])) - df1 = df1.withColumn('f1_f2', f1(f2(df['v']))) - df1 = df1.withColumn('f1_f3', f1(f3(df['v']))) - df1 = df1.withColumn('f2_f1', f2(f1(df['v']))) - df1 = df1.withColumn('f2_f3', f2(f3(df['v']))) - df1 = df1.withColumn('f3_f1', f3(f1(df['v']))) - df1 = df1.withColumn('f3_f2', f3(f2(df['v']))) - df1 = df1.withColumn('f1_f2_f3', f1(f2(f3(df['v'])))) - df1 = df1.withColumn('f1_f3_f2', f1(f3(f2(df['v'])))) - df1 = df1.withColumn('f2_f1_f3', f2(f1(f3(df['v'])))) - df1 = df1.withColumn('f2_f3_f1', f2(f3(f1(df['v'])))) - df1 = df1.withColumn('f3_f1_f2', f3(f1(f2(df['v'])))) - df1 = df1.withColumn('f3_f2_f1', f3(f2(f1(df['v'])))) + df1 = df.withColumn('f1', f1(df['v'])) \ + .withColumn('f2', f2(df['v'])) \ + .withColumn('f3', f3(df['v'])) \ + .withColumn('f1_f2', f1(f2(df['v']))) \ + .withColumn('f1_f3', f1(f3(df['v']))) \ + .withColumn('f2_f1', f2(f1(df['v']))) \ + .withColumn('f2_f3', f2(f3(df['v']))) \ + .withColumn('f3_f1', f3(f1(df['v']))) \ + .withColumn('f3_f2', f3(f2(df['v']))) \ + .withColumn('f1_f2_f3', f1(f2(f3(df['v'])))) \ + .withColumn('f1_f3_f2', f1(f3(f2(df['v'])))) \ + .withColumn('f2_f1_f3', f2(f1(f3(df['v'])))) \ + .withColumn('f2_f3_f1', f2(f3(f1(df['v'])))) \ + .withColumn('f3_f1_f2', f3(f1(f2(df['v'])))) \ + .withColumn('f3_f2_f1', f3(f2(f1(df['v'])))) # expected result - df2 = df.withColumn('f1', df['v'] + 1) - df2 = df2.withColumn('f2', df['v'] + 10) - df2 = df2.withColumn('f3', df['v'] + 100) - df2 = df2.withColumn('f1_f2', df['v'] + 11) - df2 = df2.withColumn('f1_f3', df['v'] + 101) - df2 = df2.withColumn('f2_f1', df['v'] + 11) - df2 = df2.withColumn('f2_f3', df['v'] + 110) - df2 = df2.withColumn('f3_f1', df['v'] + 101) - df2 = df2.withColumn('f3_f2', df['v'] + 110) - df2 = df2.withColumn('f1_f2_f3', df['v'] + 111) - df2 = df2.withColumn('f1_f3_f2', df['v'] + 111) - df2 = df2.withColumn('f2_f1_f3', df['v'] + 111) - df2 = df2.withColumn('f2_f3_f1', df['v'] + 111) - df2 = df2.withColumn('f3_f1_f2', df['v'] + 111) - df2 = df2.withColumn('f3_f2_f1', df['v'] + 111) + df2 = df.withColumn('f1', df['v'] + 1) \ + .withColumn('f2', df['v'] + 10) \ + .withColumn('f3', df['v'] + 100) \ + .withColumn('f1_f2', df['v'] + 11) \ + .withColumn('f1_f3', df['v'] + 101) \ + .withColumn('f2_f1', df['v'] + 11) \ + .withColumn('f2_f3', df['v'] + 110) \ + .withColumn('f3_f1', df['v'] + 101) \ + .withColumn('f3_f2', df['v'] + 110) \ + .withColumn('f1_f2_f3', df['v'] + 111) \ + .withColumn('f1_f3_f2', df['v'] + 111) \ + .withColumn('f2_f1_f3', df['v'] + 111) \ + .withColumn('f2_f3_f1', df['v'] + 111) \ + .withColumn('f3_f1_f2', df['v'] + 111) \ + .withColumn('f3_f2_f1', df['v'] + 111) self.assertEquals(df2.collect(), df1.collect()) @@ -5614,14 +5617,14 @@ def dummy_pandas_udf(df): F.col('temp0.key') == F.col('temp1.key')) self.assertEquals(res.count(), 5) - def test_mixed_udf(self): + def test_mixed_scalar_udfs_followed_by_grouby_apply(self): # Test Pandas UDF and scalar 
Python UDF followed by groupby apply from pyspark.sql.functions import udf, pandas_udf, PandasUDFType import pandas as pd df = self.spark.range(0, 10).toDF('v1') - df = df.withColumn('v2', udf(lambda x: x + 1, 'int')(df['v1'])) - df = df.withColumn('v3', pandas_udf(lambda x: x + 2, 'int')(df['v1'])) + df = df.withColumn('v2', udf(lambda x: x + 1, 'int')(df['v1'])) \ + .withColumn('v3', pandas_udf(lambda x: x + 2, 'int')(df['v1'])) result = df.groupby() \ .apply(pandas_udf(lambda x: pd.DataFrame([x.sum().sum()]), diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala index 7fb569dfff3a0..90818a95ac766 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala @@ -192,7 +192,7 @@ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper { BatchEvalPythonExec(plainUdfs, child.output ++ resultAttrs, child) case _ => throw new AnalysisException( - "Mixed Python and Scalar Pandas UDFs are not expected here") + "Expected either Scalar Pandas UDFs or Batched UDFs but got both") } attributeMap ++= validUdfs.zip(resultAttrs) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala index f976b8f747137..d6de88a1b70c1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala @@ -24,7 +24,6 @@ import org.apache.spark.api.python.{PythonEvalType, PythonFunction} import org.apache.spark.sql.catalyst.FunctionIdentifier import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, GreaterThan, In} import org.apache.spark.sql.execution._ -import org.apache.spark.sql.functions.col import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.BooleanType @@ -33,17 +32,14 @@ class BatchEvalPythonExecSuite extends SparkPlanTest with SharedSQLContext { import testImplicits.localSeqToDatasetHolder val pythonUDF = new MyDummyPythonUDF - val pandasUDF = new MyDummyScalarPandasUDF override def beforeAll(): Unit = { super.beforeAll() spark.udf.registerPython("dummyPythonUDF", pythonUDF) - spark.udf.registerPython("dummyScalarPandasUDF", pandasUDF) } override def afterAll(): Unit = { spark.sessionState.functionRegistry.dropFunction(FunctionIdentifier("dummyPythonUDF")) - spark.sessionState.functionRegistry.dropFunction(FunctionIdentifier("dummyScalarPandasUDF")) super.afterAll() } @@ -103,64 +99,6 @@ class BatchEvalPythonExecSuite extends SparkPlanTest with SharedSQLContext { } assert(qualifiedPlanNodes.size == 1) } - - private def collectPythonExec(plan: SparkPlan): Seq[BatchEvalPythonExec] = plan.collect { - case b: BatchEvalPythonExec => b - } - - private def collectPandasExec(plan: SparkPlan): Seq[ArrowEvalPythonExec] = plan.collect { - case b: ArrowEvalPythonExec => b - } - - test("Chained Python UDFs should be combined to a single physical node") { - val df = Seq(("Hello", 4)).toDF("a", "b") - val df2 = df.withColumn("c", pythonUDF(col("a"))).withColumn("d", pythonUDF(col("c"))) - val pythonEvalNodes = collectPythonExec(df2.queryExecution.executedPlan) - assert(pythonEvalNodes.size == 1) - } - - test("Chained Pandas UDFs 
should be combined to a single physical node") { - val df = Seq(("Hello", 4)).toDF("a", "b") - val df2 = df.withColumn("c", pandasUDF(col("a"))).withColumn("d", pandasUDF(col("c"))) - val arrowEvalNodes = collectPandasExec(df2.queryExecution.executedPlan) - assert(arrowEvalNodes.size == 1) - } - - test("Mixed Python UDFs and Pandas UDF should be separate physical node") { - val df = Seq(("Hello", 4)).toDF("a", "b") - val df2 = df.withColumn("c", pythonUDF(col("a"))).withColumn("d", pandasUDF(col("b"))) - - val pythonEvalNodes = collectPythonExec(df2.queryExecution.executedPlan) - val arrowEvalNodes = collectPandasExec(df2.queryExecution.executedPlan) - assert(pythonEvalNodes.size == 1) - assert(arrowEvalNodes.size == 1) - } - - test("Independent Python UDFs and Pandas UDFs should be combined separately") { - val df = Seq(("Hello", 4)).toDF("a", "b") - val df2 = df.withColumn("c1", pythonUDF(col("a"))) - .withColumn("c2", pythonUDF(col("c1"))) - .withColumn("d1", pandasUDF(col("a"))) - .withColumn("d2", pandasUDF(col("d1"))) - - val pythonEvalNodes = collectPythonExec(df2.queryExecution.executedPlan) - val arrowEvalNodes = collectPandasExec(df2.queryExecution.executedPlan) - assert(pythonEvalNodes.size == 1) - assert(arrowEvalNodes.size == 1) - } - - test("Dependent Python UDFs and Pandas UDFs should not be combined") { - val df = Seq(("Hello", 4)).toDF("a", "b") - val df2 = df.withColumn("c1", pythonUDF(col("a"))) - .withColumn("d1", pandasUDF(col("c1"))) - .withColumn("c2", pythonUDF(col("d1"))) - .withColumn("d2", pandasUDF(col("c2"))) - - val pythonEvalNodes = collectPythonExec(df2.queryExecution.executedPlan) - val arrowEvalNodes = collectPandasExec(df2.queryExecution.executedPlan) - assert(pythonEvalNodes.size == 2) - assert(arrowEvalNodes.size == 2) - } } // This Python UDF is dummy and just for testing. Unable to execute. @@ -181,7 +119,7 @@ class MyDummyPythonUDF extends UserDefinedPythonFunction( udfDeterministic = true) class MyDummyScalarPandasUDF extends UserDefinedPythonFunction( - name = "dummyPandasUDF", + name = "dummyScalarPandasUDF", func = new DummyUDF, dataType = BooleanType, pythonEvalType = PythonEvalType.SQL_SCALAR_PANDAS_UDF, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala new file mode 100644 index 0000000000000..2cb2e27c7deb0 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.python + +import org.apache.spark.sql.execution.{SparkPlan, SparkPlanTest} +import org.apache.spark.sql.functions.col +import org.apache.spark.sql.test.SharedSQLContext + +class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSQLContext { + import testImplicits.newProductEncoder + import testImplicits.localSeqToDatasetHolder + + val batchedPythonUDF = new MyDummyPythonUDF + val scalarPandasUDF = new MyDummyScalarPandasUDF + + private def collectBatchExec(plan: SparkPlan): Seq[BatchEvalPythonExec] = plan.collect { + case b: BatchEvalPythonExec => b + } + + private def collectArrowExec(plan: SparkPlan): Seq[ArrowEvalPythonExec] = plan.collect { + case b: ArrowEvalPythonExec => b + } + + test("Chained Batched Python UDFs should be combined to a single physical node") { + val df = Seq(("Hello", 4)).toDF("a", "b") + val df2 = df.withColumn("c", batchedPythonUDF(col("a"))) + .withColumn("d", batchedPythonUDF(col("c"))) + val pythonEvalNodes = collectBatchExec(df2.queryExecution.executedPlan) + assert(pythonEvalNodes.size == 1) + } + + test("Chained Scalar Pandas UDFs should be combined to a single physical node") { + val df = Seq(("Hello", 4)).toDF("a", "b") + val df2 = df.withColumn("c", scalarPandasUDF(col("a"))) + .withColumn("d", scalarPandasUDF(col("c"))) + val arrowEvalNodes = collectArrowExec(df2.queryExecution.executedPlan) + assert(arrowEvalNodes.size == 1) + } + + test("Mixed Batched Python UDFs and Pandas UDF should be separate physical node") { + val df = Seq(("Hello", 4)).toDF("a", "b") + val df2 = df.withColumn("c", batchedPythonUDF(col("a"))) + .withColumn("d", scalarPandasUDF(col("b"))) + + val pythonEvalNodes = collectBatchExec(df2.queryExecution.executedPlan) + val arrowEvalNodes = collectArrowExec(df2.queryExecution.executedPlan) + assert(pythonEvalNodes.size == 1) + assert(arrowEvalNodes.size == 1) + } + + test("Independent Batched Python UDFs and Scalar Pandas UDFs should be combined separately") { + val df = Seq(("Hello", 4)).toDF("a", "b") + val df2 = df.withColumn("c1", batchedPythonUDF(col("a"))) + .withColumn("c2", batchedPythonUDF(col("c1"))) + .withColumn("d1", scalarPandasUDF(col("a"))) + .withColumn("d2", scalarPandasUDF(col("d1"))) + + val pythonEvalNodes = collectBatchExec(df2.queryExecution.executedPlan) + val arrowEvalNodes = collectArrowExec(df2.queryExecution.executedPlan) + assert(pythonEvalNodes.size == 1) + assert(arrowEvalNodes.size == 1) + } + + test("Dependent Batched Python UDFs and Scalar Pandas UDFs should not be combined") { + val df = Seq(("Hello", 4)).toDF("a", "b") + val df2 = df.withColumn("c1", batchedPythonUDF(col("a"))) + .withColumn("d1", scalarPandasUDF(col("c1"))) + .withColumn("c2", batchedPythonUDF(col("d1"))) + .withColumn("d2", scalarPandasUDF(col("c2"))) + + val pythonEvalNodes = collectBatchExec(df2.queryExecution.executedPlan) + val arrowEvalNodes = collectArrowExec(df2.queryExecution.executedPlan) + assert(pythonEvalNodes.size == 2) + assert(arrowEvalNodes.size == 2) + } +} + + From 4c9c007858aef65c2c190b35673404dd61279369 Mon Sep 17 00:00:00 2001 From: Li Jin Date: Mon, 23 Jul 2018 13:02:13 -0400 Subject: [PATCH 07/13] Revert changes to BatchEvalPythonExecSuite --- .../sql/execution/python/BatchEvalPythonExecSuite.scala | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala 
index d6de88a1b70c1..2cc55ff88b983 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala @@ -23,7 +23,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.api.python.{PythonEvalType, PythonFunction} import org.apache.spark.sql.catalyst.FunctionIdentifier import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, GreaterThan, In} -import org.apache.spark.sql.execution._ +import org.apache.spark.sql.execution.{FilterExec, InputAdapter, SparkPlanTest, WholeStageCodegenExec} import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.BooleanType @@ -31,11 +31,9 @@ class BatchEvalPythonExecSuite extends SparkPlanTest with SharedSQLContext { import testImplicits.newProductEncoder import testImplicits.localSeqToDatasetHolder - val pythonUDF = new MyDummyPythonUDF - override def beforeAll(): Unit = { super.beforeAll() - spark.udf.registerPython("dummyPythonUDF", pythonUDF) + spark.udf.registerPython("dummyPythonUDF", new MyDummyPythonUDF) } override def afterAll(): Unit = { From 83635da0e0ff033e6c1d9aa750fba596c348c262 Mon Sep 17 00:00:00 2001 From: Li Jin Date: Wed, 25 Jul 2018 14:27:24 +0000 Subject: [PATCH 08/13] Address comments; Use mutable state in collectEvaluableUDFs --- python/pyspark/sql/tests.py | 43 ++++++--- .../execution/python/ExtractPythonUDFs.scala | 93 +++++++++++++------ .../python/ExtractPythonUDFsSuite.scala | 1 - 3 files changed, 94 insertions(+), 43 deletions(-) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index b994c06f668c9..9da3b4cfa0c5b 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -5055,7 +5055,7 @@ def test_mixed_udf(self): df = self.spark.range(0, 1).toDF('v') - # Test mixture of multiple UDFs and Pandas UDFs + # Test mixture of multiple UDFs and Pandas UDFs. 
@udf('int') def f1(x): @@ -5077,8 +5077,27 @@ def f4(x): assert type(x) == pd.Series return x + 1000 - # Test mixed udfs in a single projection - df1 = df \ + # Test single expression with chained UDFs + df_chained_1 = df.withColumn('f2_f1', f2(f1(df['v']))) + df_chained_2 = df.withColumn('f3_f2_f1', f3(f2(f1(df['v'])))) + df_chained_3 = df.withColumn('f4_f3_f2_f1', f4(f3(f2(f1(df['v']))))) + df_chained_4 = df.withColumn('f4_f2_f1', f4(f2(f1(df['v'])))) + df_chained_5 = df.withColumn('f4_f3_f1', f4(f3(f1(df['v'])))) + + expected_chained_1 = df.withColumn('f2_f1', df['v'] + 11) + expected_chained_2 = df.withColumn('f3_f2_f1', df['v'] + 111) + expected_chained_3 = df.withColumn('f4_f3_f2_f1', df['v'] + 1111) + expected_chained_4 = df.withColumn('f4_f2_f1', df['v'] + 1011) + expected_chained_5 = df.withColumn('f4_f3_f1', df['v'] + 1101) + + self.assertEquals(expected_chained_1.collect(), df_chained_1.collect()) + self.assertEquals(expected_chained_2.collect(), df_chained_2.collect()) + self.assertEquals(expected_chained_3.collect(), df_chained_3.collect()) + self.assertEquals(expected_chained_4.collect(), df_chained_4.collect()) + self.assertEquals(expected_chained_5.collect(), df_chained_5.collect()) + + # Test multiple mixed UDF expressions in a single projection + df_multi_1 = df \ .withColumn('f1', f1(col('v'))) \ .withColumn('f2', f2(col('v'))) \ .withColumn('f3', f3(col('v'))) \ @@ -5096,7 +5115,7 @@ def f4(x): .withColumn('f4_f3_f2_f1', f4(col('f3_f2_f1'))) # Test mixed udfs in a single expression - df2 = df \ + df_multi_2 = df \ .withColumn('f1', f1(col('v'))) \ .withColumn('f2', f2(col('v'))) \ .withColumn('f3', f3(col('v'))) \ @@ -5113,8 +5132,7 @@ def f4(x): .withColumn('f4_f3_f2', f4(f3(f2(col('v'))))) \ .withColumn('f4_f3_f2_f1', f4(f3(f2(f1(col('v')))))) - # expected result - df3 = df \ + expected = df \ .withColumn('f1', df['v'] + 1) \ .withColumn('f2', df['v'] + 10) \ .withColumn('f3', df['v'] + 100) \ @@ -5131,8 +5149,8 @@ def f4(x): .withColumn('f4_f3_f2', df['v'] + 1110) \ .withColumn('f4_f3_f2_f1', df['v'] + 1111) - self.assertEquals(df3.collect(), df1.collect()) - self.assertEquals(df3.collect(), df2.collect()) + self.assertEquals(expected.collect(), df_multi_1.collect()) + self.assertEquals(expected.collect(), df_multi_2.collect()) def test_mixed_udf_and_sql(self): import pandas as pd @@ -5148,6 +5166,7 @@ def f1(x): return x + 1 def f2(x): + assert type(x) == pyspark.sql.Column return x + 10 @pandas_udf('int') @@ -5171,8 +5190,7 @@ def f3(x): .withColumn('f3_f1_f2', f3(f1(f2(df['v'])))) \ .withColumn('f3_f2_f1', f3(f2(f1(df['v'])))) - # expected result - df2 = df.withColumn('f1', df['v'] + 1) \ + expected = df.withColumn('f1', df['v'] + 1) \ .withColumn('f2', df['v'] + 10) \ .withColumn('f3', df['v'] + 100) \ .withColumn('f1_f2', df['v'] + 11) \ @@ -5188,7 +5206,7 @@ def f3(x): .withColumn('f3_f1_f2', df['v'] + 111) \ .withColumn('f3_f2_f1', df['v'] + 111) - self.assertEquals(df2.collect(), df1.collect()) + self.assertEquals(expected.collect(), df1.collect()) @unittest.skipIf( @@ -5618,9 +5636,8 @@ def dummy_pandas_udf(df): self.assertEquals(res.count(), 5) def test_mixed_scalar_udfs_followed_by_grouby_apply(self): - # Test Pandas UDF and scalar Python UDF followed by groupby apply - from pyspark.sql.functions import udf, pandas_udf, PandasUDFType import pandas as pd + from pyspark.sql.functions import udf, pandas_udf, PandasUDFType df = self.spark.range(0, 10).toDF('v1') df = df.withColumn('v2', udf(lambda x: x + 1, 'int')(df['v1'])) \ diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala index 90818a95ac766..b4bb89578b8b7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala @@ -95,47 +95,81 @@ object ExtractPythonUDFFromAggregate extends Rule[LogicalPlan] { */ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper { - private def hasScalarPythonUDF(e: Expression): Boolean = { - e.find(PythonUDF.isScalarPythonUDF).isDefined - } + private case class LazyEvalType(var evalType: Int = -1) { - private def canEvaluateInPython(e: PythonUDF, evalType: Int): Boolean = { - if (e.evalType != evalType) { - false - } else { - e.children match { - // single PythonUDF child could be chained and evaluated in Python - case Seq(u: PythonUDF) => canEvaluateInPython(u, evalType) - // Python UDF can't be evaluated directly in JVM - case children => !children.exists(hasScalarPythonUDF) + def isSet: Boolean = evalType >= 0 + + def set(evalType: Int): Unit = { + if (isSet) { + throw new IllegalStateException("Eval type has already been set") + } else { + this.evalType = evalType + } + } + + def get(): Int = { + if (!isSet) { + throw new IllegalStateException("Eval type is not set") + } else { + evalType } } } - private def collectEvaluableUDF(expr: Expression, evalType: Int): Seq[PythonUDF] = expr match { - case udf: PythonUDF if PythonUDF.isScalarPythonUDF(udf) && canEvaluateInPython(udf, evalType) => - Seq(udf) - case e => e.children.flatMap(collectEvaluableUDF(_, evalType)) + private def hasScalarPythonUDF(e: Expression): Boolean = { + e.find(PythonUDF.isScalarPythonUDF).isDefined } /** - * Collect evaluable UDFs from the current node. + * Check whether a PythonUDF expression can be evaluated in Python. * - * This function collects Python UDFs or Scalar Python UDFs from expressions of the input node, - * and returns a list of UDFs of the same eval type. + * If the lazy eval type is not set, this method checks for either Batched Python UDF and Scalar + * Pandas UDF. If the lazy eval type is set, this method checks for the expression of the + * specified eval type. * - * If expressions contain both UDFs eval types, this function will only return Python UDFs. + * This method will also set the lazy eval type to be the type of the first evaluable expression, + * i.e., if lazy eval type is not set and we find a evaluable Python UDF expression, lazy eval + * type will be set to the eval type of the expression. * - * The caller should call this function multiple times until all evaluable UDFs are collected. 
*/ - private def collectEvaluableUDFs(plan: SparkPlan): Seq[PythonUDF] = { - val pythonUDFs = - plan.expressions.flatMap(collectEvaluableUDF(_, PythonEvalType.SQL_BATCHED_UDF)) - - if (pythonUDFs.isEmpty) { - plan.expressions.flatMap(collectEvaluableUDF(_, PythonEvalType.SQL_SCALAR_PANDAS_UDF)) + private def canEvaluateInPython(e: PythonUDF, lazyEvalType: LazyEvalType): Boolean = { + if (!lazyEvalType.isSet) { + e.children match { + // single PythonUDF child could be chained and evaluated in Python if eval type is the same + case Seq(u: PythonUDF) => + // Need to recheck the eval type because lazy eval type will be set if child Python UDF is + // evaluable + canEvaluateInPython(u, lazyEvalType) && lazyEvalType.get == e.evalType + // Python UDF can't be evaluated directly in JVM + case children => if (!children.exists(hasScalarPythonUDF)) { + // We found the first evaluable expression, set lazy eval type to its eval type. + lazyEvalType.set(e.evalType) + true + } else { + false + } + } } else { - pythonUDFs + if (e.evalType != lazyEvalType.get) { + false + } else { + e.children match { + case Seq(u: PythonUDF) => canEvaluateInPython(u, lazyEvalType) + case children => !children.exists(hasScalarPythonUDF) + } + } + } + } + + private def collectEvaluableUDFs( + expr: Expression, + evalType: LazyEvalType + ): Seq[PythonUDF] = { + expr match { + case udf: PythonUDF if + PythonUDF.isScalarPythonUDF(udf) && canEvaluateInPython(udf, evalType) => + Seq(udf) + case e => e.children.flatMap(collectEvaluableUDFs(_, evalType)) } } @@ -147,7 +181,8 @@ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper { * Extract all the PythonUDFs from the current operator and evaluate them before the operator. */ private def extract(plan: SparkPlan): SparkPlan = { - val udfs = collectEvaluableUDFs(plan) + val lazyEvalType = new LazyEvalType + val udfs = plan.expressions.flatMap(collectEvaluableUDFs(_, lazyEvalType)) // ignore the PythonUDF that come from second/third aggregate, which is not used .filter(udf => udf.references.subsetOf(plan.inputSet)) if (udfs.isEmpty) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala index 2cb2e27c7deb0..76b609d111acd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala @@ -90,4 +90,3 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSQLContext { } } - From 2bc906de5a12dcc452e6855aa30d27021c446e17 Mon Sep 17 00:00:00 2001 From: Li Jin Date: Wed, 25 Jul 2018 14:30:40 +0000 Subject: [PATCH 09/13] Fix import --- python/pyspark/sql/tests.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 9da3b4cfa0c5b..a294d70119d0b 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -5154,6 +5154,7 @@ def f4(x): def test_mixed_udf_and_sql(self): import pandas as pd + from pyspark.sql import Column from pyspark.sql.functions import udf, pandas_udf df = self.spark.range(0, 1).toDF('v') @@ -5166,7 +5167,7 @@ def f1(x): return x + 1 def f2(x): - assert type(x) == pyspark.sql.Column + assert type(x) == Column return x + 10 @pandas_udf('int') From 6b22fea5b42b40d2eb92d931e76d183518533717 Mon Sep 17 00:00:00 2001 From: Li Jin Date: Fri, 27 Jul 2018 13:49:47 +0000 Subject: [PATCH 
10/13] Address comments --- .../execution/python/ExtractPythonUDFs.scala | 65 +++++-------------- 1 file changed, 16 insertions(+), 49 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala index b4bb89578b8b7..38f2b31196e70 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala @@ -95,7 +95,7 @@ object ExtractPythonUDFFromAggregate extends Rule[LogicalPlan] { */ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper { - private case class LazyEvalType(var evalType: Int = -1) { + private case class EvalTypeHolder(var evalType: Int = -1) { def isSet: Boolean = evalType >= 0 @@ -120,57 +120,24 @@ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper { e.find(PythonUDF.isScalarPythonUDF).isDefined } - /** - * Check whether a PythonUDF expression can be evaluated in Python. - * - * If the lazy eval type is not set, this method checks for either Batched Python UDF and Scalar - * Pandas UDF. If the lazy eval type is set, this method checks for the expression of the - * specified eval type. - * - * This method will also set the lazy eval type to be the type of the first evaluable expression, - * i.e., if lazy eval type is not set and we find a evaluable Python UDF expression, lazy eval - * type will be set to the eval type of the expression. - * - */ - private def canEvaluateInPython(e: PythonUDF, lazyEvalType: LazyEvalType): Boolean = { - if (!lazyEvalType.isSet) { - e.children match { - // single PythonUDF child could be chained and evaluated in Python if eval type is the same - case Seq(u: PythonUDF) => - // Need to recheck the eval type because lazy eval type will be set if child Python UDF is - // evaluable - canEvaluateInPython(u, lazyEvalType) && lazyEvalType.get == e.evalType - // Python UDF can't be evaluated directly in JVM - case children => if (!children.exists(hasScalarPythonUDF)) { - // We found the first evaluable expression, set lazy eval type to its eval type. 
- lazyEvalType.set(e.evalType) - true - } else { - false - } - } - } else { - if (e.evalType != lazyEvalType.get) { - false - } else { - e.children match { - case Seq(u: PythonUDF) => canEvaluateInPython(u, lazyEvalType) - case children => !children.exists(hasScalarPythonUDF) - } - } + private def canEvaluateInPython(e: PythonUDF): Boolean = { + e.children match { + // single PythonUDF child could be chained and evaluated in Python + case Seq(u: PythonUDF) => e.evalType == u.evalType && canEvaluateInPython(u) + // Python UDF can't be evaluated directly in JVM + case children => !children.exists(hasScalarPythonUDF) } } private def collectEvaluableUDFs( expr: Expression, - evalType: LazyEvalType - ): Seq[PythonUDF] = { - expr match { - case udf: PythonUDF if - PythonUDF.isScalarPythonUDF(udf) && canEvaluateInPython(udf, evalType) => - Seq(udf) - case e => e.children.flatMap(collectEvaluableUDFs(_, evalType)) - } + firstEvalType: EvalTypeHolder): Seq[PythonUDF] = expr match { + case udf: PythonUDF if PythonUDF.isScalarPythonUDF(udf) + && (!firstEvalType.isSet || firstEvalType.get == udf.evalType) + && canEvaluateInPython(udf) => + firstEvalType.evalType = udf.evalType + Seq(udf) + case e => e.children.flatMap(collectEvaluableUDFs(_, firstEvalType)) } def apply(plan: SparkPlan): SparkPlan = plan transformUp { @@ -181,8 +148,8 @@ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper { * Extract all the PythonUDFs from the current operator and evaluate them before the operator. */ private def extract(plan: SparkPlan): SparkPlan = { - val lazyEvalType = new LazyEvalType - val udfs = plan.expressions.flatMap(collectEvaluableUDFs(_, lazyEvalType)) + val firstEvalType = new EvalTypeHolder + val udfs = plan.expressions.flatMap(collectEvaluableUDFs(_, firstEvalType)) // ignore the PythonUDF that come from second/third aggregate, which is not used .filter(udf => udf.references.subsetOf(plan.inputSet)) if (udfs.isEmpty) { From b25936d4c5216904f0ca3cf33df4b5c7130aa8f8 Mon Sep 17 00:00:00 2001 From: Li Jin Date: Fri, 27 Jul 2018 13:58:06 +0000 Subject: [PATCH 11/13] minor fix --- .../apache/spark/sql/execution/python/ExtractPythonUDFs.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala index 38f2b31196e70..30fe71825db76 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala @@ -95,7 +95,7 @@ object ExtractPythonUDFFromAggregate extends Rule[LogicalPlan] { */ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper { - private case class EvalTypeHolder(var evalType: Int = -1) { + private case class EvalTypeHolder(private var evalType: Int = -1) { def isSet: Boolean = evalType >= 0 @@ -135,7 +135,7 @@ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper { case udf: PythonUDF if PythonUDF.isScalarPythonUDF(udf) && (!firstEvalType.isSet || firstEvalType.get == udf.evalType) && canEvaluateInPython(udf) => - firstEvalType.evalType = udf.evalType + firstEvalType.set(udf.evalType) Seq(udf) case e => e.children.flatMap(collectEvaluableUDFs(_, firstEvalType)) } From 8e995e81542852ff4af43883db79cdfbe9aca1ad Mon Sep 17 00:00:00 2001 From: Li Jin Date: Fri, 27 Jul 2018 18:01:25 +0000 Subject: [PATCH 12/13] Fix bug --- 
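
The holder contract that patches 10 through 12 converge on can be summarised with a small standalone model: the eval type is fixed by the first evaluable UDF, setting the same value again is allowed (the patch-12 fix), and setting a different value is an error. The sketch below is Python rather than the Scala source, using the 100/200 codes that PythonEvalType assigns to batched and Scalar Pandas UDFs.

```python
# Toy model (not Spark code) of the EvalTypeHolder contract.
class EvalTypeHolder:
    def __init__(self):
        self._eval_type = None

    @property
    def is_set(self):
        return self._eval_type is not None

    def set(self, eval_type):
        # Re-setting to the same value is fine; a different value is rejected.
        if self.is_set and eval_type != self._eval_type:
            raise RuntimeError("Cannot reset eval type to a different value")
        self._eval_type = eval_type

    def get(self):
        if not self.is_set:
            raise RuntimeError("Eval type is not set")
        return self._eval_type


holder = EvalTypeHolder()
holder.set(100)      # first evaluable UDF (e.g. SQL_BATCHED_UDF) fixes the type
holder.set(100)      # same type again is allowed (the patch-12 fix)
try:
    holder.set(200)  # a Scalar Pandas UDF in the same pass is rejected
except RuntimeError as e:
    print(e)
```
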
.../spark/sql/execution/python/ExtractPythonUDFs.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala index 30fe71825db76..632fef3917335 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala @@ -96,12 +96,11 @@ object ExtractPythonUDFFromAggregate extends Rule[LogicalPlan] { object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper { private case class EvalTypeHolder(private var evalType: Int = -1) { - def isSet: Boolean = evalType >= 0 def set(evalType: Int): Unit = { - if (isSet) { - throw new IllegalStateException("Eval type has already been set") + if (isSet && evalType != this.evalType) { + throw new IllegalStateException("Cannot reset eval type to a different value") } else { this.evalType = evalType } From f3a45a576b6a186f3694e6bd0f22a8198a9d19a2 Mon Sep 17 00:00:00 2001 From: Li Jin Date: Fri, 27 Jul 2018 19:05:46 +0000 Subject: [PATCH 13/13] Use nested function implementation --- .../execution/python/ExtractPythonUDFs.scala | 52 ++++++++----------- 1 file changed, 22 insertions(+), 30 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala index 632fef3917335..cb75874be32ec 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala @@ -95,25 +95,8 @@ object ExtractPythonUDFFromAggregate extends Rule[LogicalPlan] { */ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper { - private case class EvalTypeHolder(private var evalType: Int = -1) { - def isSet: Boolean = evalType >= 0 - - def set(evalType: Int): Unit = { - if (isSet && evalType != this.evalType) { - throw new IllegalStateException("Cannot reset eval type to a different value") - } else { - this.evalType = evalType - } - } - - def get(): Int = { - if (!isSet) { - throw new IllegalStateException("Eval type is not set") - } else { - evalType - } - } - } + private type EvalType = Int + private type EvalTypeChecker = EvalType => Boolean private def hasScalarPythonUDF(e: Expression): Boolean = { e.find(PythonUDF.isScalarPythonUDF).isDefined @@ -128,15 +111,25 @@ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper { } } - private def collectEvaluableUDFs( - expr: Expression, - firstEvalType: EvalTypeHolder): Seq[PythonUDF] = expr match { - case udf: PythonUDF if PythonUDF.isScalarPythonUDF(udf) - && (!firstEvalType.isSet || firstEvalType.get == udf.evalType) - && canEvaluateInPython(udf) => - firstEvalType.set(udf.evalType) - Seq(udf) - case e => e.children.flatMap(collectEvaluableUDFs(_, firstEvalType)) + private def collectEvaluableUDFsFromExpressions(expressions: Seq[Expression]): Seq[PythonUDF] = { + // Eval type checker is set once when we find the first evaluable UDF and its value + // shouldn't change later. + // Used to check if subsequent UDFs are of the same type as the first UDF. 
(since we can only + // extract UDFs of the same eval type) + var evalTypeChecker: Option[EvalTypeChecker] = None + + def collectEvaluableUDFs(expr: Expression): Seq[PythonUDF] = expr match { + case udf: PythonUDF if PythonUDF.isScalarPythonUDF(udf) && canEvaluateInPython(udf) + && evalTypeChecker.isEmpty => + evalTypeChecker = Some((otherEvalType: EvalType) => otherEvalType == udf.evalType) + Seq(udf) + case udf: PythonUDF if PythonUDF.isScalarPythonUDF(udf) && canEvaluateInPython(udf) + && evalTypeChecker.get(udf.evalType) => + Seq(udf) + case e => e.children.flatMap(collectEvaluableUDFs) + } + + expressions.flatMap(collectEvaluableUDFs) } def apply(plan: SparkPlan): SparkPlan = plan transformUp { @@ -147,8 +140,7 @@ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper { * Extract all the PythonUDFs from the current operator and evaluate them before the operator. */ private def extract(plan: SparkPlan): SparkPlan = { - val firstEvalType = new EvalTypeHolder - val udfs = plan.expressions.flatMap(collectEvaluableUDFs(_, firstEvalType)) + val udfs = collectEvaluableUDFsFromExpressions(plan.expressions) // ignore the PythonUDF that come from second/third aggregate, which is not used .filter(udf => udf.references.subsetOf(plan.inputSet)) if (udfs.isEmpty) {
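
The final design replaces the mutable holder with a checker closure that is fixed by the first evaluable UDF found during the tree walk, so a single pass only collects UDFs sharing that eval type. Below is a rough standalone model of that traversal; plain tuples stand in for Spark expressions and PythonUDF nodes, and it is an illustration of the pattern rather than the rule itself.

```python
# Rough Python model of the nested-function approach: the checker is created from the
# first evaluable UDF and every later match must have the same eval type. Tuples of
# ("udf" | "other", eval_type, children) stand in for Spark expressions.
def collect_evaluable_udfs(expressions):
    eval_type_checker = None  # plays the role of Option[EvalTypeChecker]
    collected = []

    def visit(expr):
        nonlocal eval_type_checker
        kind, eval_type, children = expr
        if kind == "udf" and eval_type_checker is None:
            # The first evaluable UDF fixes the eval type for the rest of the walk.
            eval_type_checker = lambda other, first=eval_type: other == first
            collected.append(expr)
        elif kind == "udf" and eval_type_checker(eval_type):
            collected.append(expr)
        else:
            # Non-UDF nodes, and UDFs of the other eval type, are not collected here;
            # only their children are searched.
            for child in children:
                visit(child)

    for e in expressions:
        visit(e)
    return collected


# A projection mixing a batched UDF (100) and a Scalar Pandas UDF (200):
exprs = [("udf", 100, []), ("udf", 200, []), ("other", None, [("udf", 100, [])])]
assert [e[1] for e in collect_evaluable_udfs(exprs)] == [100, 100]
```
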