diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonFunction.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonFunction.scala index f4c2d02ee9420..41521bfae1add 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonFunction.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonFunction.scala @@ -32,7 +32,7 @@ case class UserDefinedPythonFunction( pythonEvalType: Int, udfDeterministic: Boolean) { - def builder(e: Seq[Expression]): PythonUDF = { + def builder(e: Seq[Expression]): Expression = { PythonUDF(name, func, dataType, e, pythonEvalType, udfDeterministic) } diff --git a/sql/core/src/test/resources/sql-tests/inputs/udf/pgSQL/udf-aggregates_part1.sql b/sql/core/src/test/resources/sql-tests/inputs/udf/pgSQL/udf-aggregates_part1.sql index 3e877333c07f8..d829a5c1159fd 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/udf/pgSQL/udf-aggregates_part1.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/udf/pgSQL/udf-aggregates_part1.sql @@ -9,12 +9,10 @@ -- SET extra_float_digits = 0; -- This test file was converted from pgSQL/aggregates_part1.sql. --- Note that currently registered UDF returns a string. So there are some differences, for instance --- in string cast within UDF in Scala and Python. -SELECT CAST(avg(udf(four)) AS decimal(10,3)) AS avg_1 FROM onek; +SELECT avg(udf(four)) AS avg_1 FROM onek; -SELECT CAST(udf(avg(a)) AS decimal(10,3)) AS avg_32 FROM aggtest WHERE a < 100; +SELECT udf(avg(a)) AS avg_32 FROM aggtest WHERE a < 100; -- In 7.1, avg(float4) is computed using float8 arithmetic. -- Round the result to 3 digits to avoid platform-specific results. @@ -23,32 +21,32 @@ select CAST(avg(udf(b)) AS Decimal(10,3)) AS avg_107_943 FROM aggtest; -- `student` has a column with data type POINT, which is not supported by Spark [SPARK-27766] -- SELECT avg(gpa) AS avg_3_4 FROM ONLY student; -SELECT CAST(sum(udf(four)) AS int) AS sum_1500 FROM onek; +SELECT sum(udf(four)) AS sum_1500 FROM onek; SELECT udf(sum(a)) AS sum_198 FROM aggtest; -SELECT CAST(udf(udf(sum(b))) AS decimal(10,3)) AS avg_431_773 FROM aggtest; +SELECT udf(udf(sum(b))) AS avg_431_773 FROM aggtest; -- `student` has a column with data type POINT, which is not supported by Spark [SPARK-27766] -- SELECT sum(gpa) AS avg_6_8 FROM ONLY student; SELECT udf(max(four)) AS max_3 FROM onek; -SELECT max(CAST(udf(a) AS int)) AS max_100 FROM aggtest; -SELECT CAST(udf(udf(max(aggtest.b))) AS decimal(10,3)) AS max_324_78 FROM aggtest; +SELECT max(udf(a)) AS max_100 FROM aggtest; +SELECT udf(udf(max(aggtest.b))) AS max_324_78 FROM aggtest; -- `student` has a column with data type POINT, which is not supported by Spark [SPARK-27766] -- SELECT max(student.gpa) AS max_3_7 FROM student; -SELECT CAST(stddev_pop(udf(b)) AS decimal(10,3)) FROM aggtest; -SELECT CAST(udf(stddev_samp(b)) AS decimal(10,3)) FROM aggtest; -SELECT CAST(var_pop(udf(b)) AS decimal(10,3)) FROM aggtest; -SELECT CAST(udf(var_samp(b)) AS decimal(10,3)) FROM aggtest; +SELECT stddev_pop(udf(b)) FROM aggtest; +SELECT udf(stddev_samp(b)) FROM aggtest; +SELECT var_pop(udf(b)) FROM aggtest; +SELECT udf(var_samp(b)) FROM aggtest; -SELECT CAST(udf(stddev_pop(CAST(b AS Decimal(38,0)))) AS decimal(10,3)) FROM aggtest; -SELECT CAST(stddev_samp(CAST(udf(b) AS Decimal(38,0))) AS decimal(10,3)) FROM aggtest; -SELECT CAST(udf(var_pop(CAST(b AS Decimal(38,0)))) AS decimal(10,3)) FROM aggtest; -SELECT CAST(var_samp(udf(CAST(b AS 
Decimal(38,0)))) AS decimal(10,3)) FROM aggtest; +SELECT udf(stddev_pop(CAST(b AS Decimal(38,0)))) FROM aggtest; +SELECT stddev_samp(CAST(udf(b) AS Decimal(38,0))) FROM aggtest; +SELECT udf(var_pop(CAST(b AS Decimal(38,0)))) FROM aggtest; +SELECT var_samp(udf(CAST(b AS Decimal(38,0)))) FROM aggtest; -- population variance is defined for a single tuple, sample variance -- is not -SELECT CAST(udf(var_pop(1.0)) AS int), var_samp(udf(2.0)); -SELECT CAST(stddev_pop(udf(CAST(3.0 AS Decimal(38,0)))) AS int), stddev_samp(CAST(udf(4.0) AS Decimal(38,0))); +SELECT udf(var_pop(1.0)), var_samp(udf(2.0)); +SELECT stddev_pop(udf(CAST(3.0 AS Decimal(38,0)))), stddev_samp(CAST(udf(4.0) AS Decimal(38,0))); -- verify correct results for null and NaN inputs @@ -76,9 +74,9 @@ FROM (VALUES ('-Infinity'), ('Infinity')) v(x); -- test accuracy with a large input offset -SELECT CAST(avg(udf(CAST(x AS DOUBLE))) AS int), CAST(udf(var_pop(CAST(x AS DOUBLE))) AS decimal(10,3)) +SELECT avg(udf(CAST(x AS DOUBLE))), udf(var_pop(CAST(x AS DOUBLE))) FROM (VALUES (100000003), (100000004), (100000006), (100000007)) v(x); -SELECT CAST(avg(udf(x)) AS long), CAST(udf(var_pop(CAST(x AS DOUBLE))) AS decimal(10,3)) +SELECT avg(udf(CAST(x AS DOUBLE))), udf(var_pop(CAST(x AS DOUBLE))) FROM (VALUES (7000000000005), (7000000000007)) v(x); -- SQL2003 binary aggregates [SPARK-23907] @@ -89,8 +87,8 @@ FROM (VALUES (7000000000005), (7000000000007)) v(x); -- SELECT regr_avgx(b, a), regr_avgy(b, a) FROM aggtest; -- SELECT regr_r2(b, a) FROM aggtest; -- SELECT regr_slope(b, a), regr_intercept(b, a) FROM aggtest; -SELECT CAST(udf(covar_pop(b, udf(a))) AS decimal(10,3)), CAST(covar_samp(udf(b), a) as decimal(10,3)) FROM aggtest; -SELECT CAST(corr(b, udf(a)) AS decimal(10,3)) FROM aggtest; +SELECT udf(covar_pop(b, udf(a))), covar_samp(udf(b), a) FROM aggtest; +SELECT corr(b, udf(a)) FROM aggtest; -- test accum and combine functions directly [SPARK-23907] @@ -122,7 +120,7 @@ SELECT CAST(corr(b, udf(a)) AS decimal(10,3)) FROM aggtest; SELECT count(udf(four)) AS cnt_1000 FROM onek; SELECT udf(count(DISTINCT four)) AS cnt_4 FROM onek; -select ten, udf(count(*)), CAST(sum(udf(four)) AS int) from onek +select ten, udf(count(*)), sum(udf(four)) from onek group by ten order by ten; select ten, count(udf(four)), udf(sum(DISTINCT four)) from onek diff --git a/sql/core/src/test/resources/sql-tests/inputs/udf/pgSQL/udf-aggregates_part2.sql b/sql/core/src/test/resources/sql-tests/inputs/udf/pgSQL/udf-aggregates_part2.sql index 57491a32c48fb..5636537398a86 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/udf/pgSQL/udf-aggregates_part2.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/udf/pgSQL/udf-aggregates_part2.sql @@ -6,8 +6,6 @@ -- https://github.com/postgres/postgres/blob/REL_12_BETA2/src/test/regress/sql/aggregates.sql#L145-L350 -- -- This test file was converted from pgSQL/aggregates_part2.sql. --- Note that currently registered UDF returns a string. So there are some differences, for instance --- in string cast within UDF in Scala and Python. create temporary view int4_tbl as select * from values (0), diff --git a/sql/core/src/test/resources/sql-tests/inputs/udf/pgSQL/udf-case.sql b/sql/core/src/test/resources/sql-tests/inputs/udf/pgSQL/udf-case.sql index a2aab79844d40..1865ee94ec1f9 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/udf/pgSQL/udf-case.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/udf/pgSQL/udf-case.sql @@ -7,8 +7,6 @@ -- Test the CASE statement -- -- This test file was converted from pgSQL/case.sql. 
--- Note that currently registered UDF returns a string. So there are some differences, for instance --- in string cast within UDF in Scala and Python. CREATE TABLE CASE_TBL ( i integer, @@ -38,7 +36,7 @@ INSERT INTO CASE2_TBL VALUES (NULL, -6); SELECT '3' AS `One`, CASE - WHEN CAST(udf(1 < 2) AS boolean) THEN 3 + WHEN udf(1 < 2) THEN 3 END AS `Simple WHEN`; SELECT '' AS `One`, @@ -60,7 +58,7 @@ SELECT udf('4') AS `One`, SELECT udf('6') AS `One`, CASE - WHEN CAST(udf(1 > 2) AS boolean) THEN 3 + WHEN udf(1 > 2) THEN 3 WHEN udf(4) < 5 THEN 6 ELSE 7 END AS `Two WHEN with default`; @@ -70,7 +68,7 @@ SELECT '7' AS `None`, END AS `NULL on no matches`; -- Constant-expression folding shouldn't evaluate unreachable subexpressions -SELECT CASE WHEN CAST(udf(1=0) AS boolean) THEN 1/0 WHEN 1=1 THEN 1 ELSE 2/0 END; +SELECT CASE WHEN udf(1=0) THEN 1/0 WHEN 1=1 THEN 1 ELSE 2/0 END; SELECT CASE 1 WHEN 0 THEN 1/udf(0) WHEN 1 THEN 1 ELSE 2/0 END; -- [SPARK-27923] PostgreSQL throws an exception but Spark SQL is NULL @@ -142,7 +140,7 @@ SELECT udf('') AS Five, NULLIF(a.i,b.i) AS `NULLIF(a.i,b.i)`, SELECT '' AS `Two`, * FROM CASE_TBL a, CASE2_TBL b - WHERE CAST(udf(COALESCE(f,b.i) = 2) AS boolean); + WHERE udf(COALESCE(f,b.i) = 2); -- We don't support update now. -- diff --git a/sql/core/src/test/resources/sql-tests/inputs/udf/udf-having.sql b/sql/core/src/test/resources/sql-tests/inputs/udf/udf-having.sql index 6ae34ae589fac..ff8573ad7e562 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/udf/udf-having.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/udf/udf-having.sql @@ -1,6 +1,4 @@ -- This test file was converted from having.sql. --- Note that currently registered UDF returns a string. So there are some differences, for instance --- in string cast within UDF in Scala and Python. create temporary view hav as select * from values ("one", 1), diff --git a/sql/core/src/test/resources/sql-tests/inputs/udf/udf-natural-join.sql b/sql/core/src/test/resources/sql-tests/inputs/udf/udf-natural-join.sql index 6862683178002..e5eb812d69a1c 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/udf/udf-natural-join.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/udf/udf-natural-join.sql @@ -4,8 +4,6 @@ --SET spark.sql.autoBroadcastJoinThreshold=-1,spark.sql.join.preferSortMergeJoin=false -- This test file was converted from natural-join.sql. --- Note that currently registered UDF returns a string. So there are some differences, for instance --- in string cast within UDF in Scala and Python. create temporary view nt1 as select * from values ("one", 1), diff --git a/sql/core/src/test/resources/sql-tests/inputs/udf/udf-special-values.sql b/sql/core/src/test/resources/sql-tests/inputs/udf/udf-special-values.sql new file mode 100644 index 0000000000000..9cd15369bb164 --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/inputs/udf/udf-special-values.sql @@ -0,0 +1,8 @@ +-- This file tests special values such as NaN, Infinity and NULL. 
+ +SELECT udf(x) FROM (VALUES (1), (2), (NULL)) v(x); +SELECT udf(x) FROM (VALUES ('A'), ('B'), (NULL)) v(x); +SELECT udf(x) FROM (VALUES ('NaN'), ('1'), ('2')) v(x); +SELECT udf(x) FROM (VALUES ('Infinity'), ('1'), ('2')) v(x); +SELECT udf(x) FROM (VALUES ('-Infinity'), ('1'), ('2')) v(x); +SELECT udf(x) FROM (VALUES 0.00000001, 0.00000002, 0.00000003) v(x); diff --git a/sql/core/src/test/resources/sql-tests/results/udf/pgSQL/udf-aggregates_part1.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/pgSQL/udf-aggregates_part1.sql.out index 5c08245fd320d..a2f64717d73a1 100644 --- a/sql/core/src/test/resources/sql-tests/results/udf/pgSQL/udf-aggregates_part1.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/udf/pgSQL/udf-aggregates_part1.sql.out @@ -3,19 +3,19 @@ -- !query 0 -SELECT CAST(avg(udf(four)) AS decimal(10,3)) AS avg_1 FROM onek +SELECT avg(udf(four)) AS avg_1 FROM onek -- !query 0 schema -struct +struct -- !query 0 output 1.5 -- !query 1 -SELECT CAST(udf(avg(a)) AS decimal(10,3)) AS avg_32 FROM aggtest WHERE a < 100 +SELECT udf(avg(a)) AS avg_32 FROM aggtest WHERE a < 100 -- !query 1 schema -struct +struct -- !query 1 output -32.667 +32.666666666666664 -- !query 2 @@ -27,9 +27,9 @@ struct -- !query 3 -SELECT CAST(sum(udf(four)) AS int) AS sum_1500 FROM onek +SELECT sum(udf(four)) AS sum_1500 FROM onek -- !query 3 schema -struct +struct -- !query 3 output 1500 @@ -37,29 +37,29 @@ struct -- !query 4 SELECT udf(sum(a)) AS sum_198 FROM aggtest -- !query 4 schema -struct +struct -- !query 4 output 198 -- !query 5 -SELECT CAST(udf(udf(sum(b))) AS decimal(10,3)) AS avg_431_773 FROM aggtest +SELECT udf(udf(sum(b))) AS avg_431_773 FROM aggtest -- !query 5 schema -struct +struct -- !query 5 output -431.773 +431.77260909229517 -- !query 6 SELECT udf(max(four)) AS max_3 FROM onek -- !query 6 schema -struct +struct -- !query 6 output 3 -- !query 7 -SELECT max(CAST(udf(a) AS int)) AS max_100 FROM aggtest +SELECT max(udf(a)) AS max_100 FROM aggtest -- !query 7 schema struct -- !query 7 output @@ -67,97 +67,97 @@ struct -- !query 8 -SELECT CAST(udf(udf(max(aggtest.b))) AS decimal(10,3)) AS max_324_78 FROM aggtest +SELECT udf(udf(max(aggtest.b))) AS max_324_78 FROM aggtest -- !query 8 schema -struct +struct -- !query 8 output 324.78 -- !query 9 -SELECT CAST(stddev_pop(udf(b)) AS decimal(10,3)) FROM aggtest +SELECT stddev_pop(udf(b)) FROM aggtest -- !query 9 schema -struct +struct -- !query 9 output -131.107 +131.10703231895047 -- !query 10 -SELECT CAST(udf(stddev_samp(b)) AS decimal(10,3)) FROM aggtest +SELECT udf(stddev_samp(b)) FROM aggtest -- !query 10 schema -struct +struct -- !query 10 output -151.389 +151.38936080399804 -- !query 11 -SELECT CAST(var_pop(udf(b)) AS decimal(10,3)) FROM aggtest +SELECT var_pop(udf(b)) FROM aggtest -- !query 11 schema -struct +struct -- !query 11 output -17189.054 +17189.053923482323 -- !query 12 -SELECT CAST(udf(var_samp(b)) AS decimal(10,3)) FROM aggtest +SELECT udf(var_samp(b)) FROM aggtest -- !query 12 schema -struct +struct -- !query 12 output -22918.739 +22918.738564643096 -- !query 13 -SELECT CAST(udf(stddev_pop(CAST(b AS Decimal(38,0)))) AS decimal(10,3)) FROM aggtest +SELECT udf(stddev_pop(CAST(b AS Decimal(38,0)))) FROM aggtest -- !query 13 schema -struct +struct -- !query 13 output -131.181 +131.18117242958306 -- !query 14 -SELECT CAST(stddev_samp(CAST(udf(b) AS Decimal(38,0))) AS decimal(10,3)) FROM aggtest +SELECT stddev_samp(CAST(udf(b) AS Decimal(38,0))) FROM aggtest -- !query 14 schema -struct +struct -- !query 14 output -151.475 
+151.47497042966097 -- !query 15 -SELECT CAST(udf(var_pop(CAST(b AS Decimal(38,0)))) AS decimal(10,3)) FROM aggtest +SELECT udf(var_pop(CAST(b AS Decimal(38,0)))) FROM aggtest -- !query 15 schema -struct +struct -- !query 15 output 17208.5 -- !query 16 -SELECT CAST(var_samp(udf(CAST(b AS Decimal(38,0)))) AS decimal(10,3)) FROM aggtest +SELECT var_samp(udf(CAST(b AS Decimal(38,0)))) FROM aggtest -- !query 16 schema -struct +struct -- !query 16 output -22944.667 +22944.666666666668 -- !query 17 -SELECT CAST(udf(var_pop(1.0)) AS int), var_samp(udf(2.0)) +SELECT udf(var_pop(1.0)), var_samp(udf(2.0)) -- !query 17 schema -struct +struct -- !query 17 output -0 NaN +0.0 NaN -- !query 18 -SELECT CAST(stddev_pop(udf(CAST(3.0 AS Decimal(38,0)))) AS int), stddev_samp(CAST(udf(4.0) AS Decimal(38,0))) +SELECT stddev_pop(udf(CAST(3.0 AS Decimal(38,0)))), stddev_samp(CAST(udf(4.0) AS Decimal(38,0))) -- !query 18 schema -struct +struct -- !query 18 output -0 NaN +0.0 NaN -- !query 19 select sum(udf(CAST(null AS int))) from range(1,4) -- !query 19 schema -struct +struct -- !query 19 output NULL @@ -165,7 +165,7 @@ NULL -- !query 20 select sum(udf(CAST(null AS long))) from range(1,4) -- !query 20 schema -struct +struct -- !query 20 output NULL @@ -173,7 +173,7 @@ NULL -- !query 21 select sum(udf(CAST(null AS Decimal(38,0)))) from range(1,4) -- !query 21 schema -struct +struct -- !query 21 output NULL @@ -181,7 +181,7 @@ NULL -- !query 22 select sum(udf(CAST(null AS DOUBLE))) from range(1,4) -- !query 22 schema -struct +struct -- !query 22 output NULL @@ -189,7 +189,7 @@ NULL -- !query 23 select avg(udf(CAST(null AS int))) from range(1,4) -- !query 23 schema -struct +struct -- !query 23 output NULL @@ -197,7 +197,7 @@ NULL -- !query 24 select avg(udf(CAST(null AS long))) from range(1,4) -- !query 24 schema -struct +struct -- !query 24 output NULL @@ -205,7 +205,7 @@ NULL -- !query 25 select avg(udf(CAST(null AS Decimal(38,0)))) from range(1,4) -- !query 25 schema -struct +struct -- !query 25 output NULL @@ -213,7 +213,7 @@ NULL -- !query 26 select avg(udf(CAST(null AS DOUBLE))) from range(1,4) -- !query 26 schema -struct +struct -- !query 26 output NULL @@ -221,7 +221,7 @@ NULL -- !query 27 select sum(CAST(udf('NaN') AS DOUBLE)) from range(1,4) -- !query 27 schema -struct +struct -- !query 27 output NaN @@ -229,7 +229,7 @@ NaN -- !query 28 select avg(CAST(udf('NaN') AS DOUBLE)) from range(1,4) -- !query 28 schema -struct +struct -- !query 28 output NaN @@ -238,7 +238,7 @@ NaN SELECT avg(CAST(udf(x) AS DOUBLE)), var_pop(CAST(udf(x) AS DOUBLE)) FROM (VALUES ('Infinity'), ('1')) v(x) -- !query 29 schema -struct +struct -- !query 29 output Infinity NaN @@ -247,7 +247,7 @@ Infinity NaN SELECT avg(CAST(udf(x) AS DOUBLE)), var_pop(CAST(udf(x) AS DOUBLE)) FROM (VALUES ('Infinity'), ('Infinity')) v(x) -- !query 30 schema -struct +struct -- !query 30 output Infinity NaN @@ -256,43 +256,43 @@ Infinity NaN SELECT avg(CAST(udf(x) AS DOUBLE)), var_pop(CAST(udf(x) AS DOUBLE)) FROM (VALUES ('-Infinity'), ('Infinity')) v(x) -- !query 31 schema -struct +struct -- !query 31 output NaN NaN -- !query 32 -SELECT CAST(avg(udf(CAST(x AS DOUBLE))) AS int), CAST(udf(var_pop(CAST(x AS DOUBLE))) AS decimal(10,3)) +SELECT avg(udf(CAST(x AS DOUBLE))), udf(var_pop(CAST(x AS DOUBLE))) FROM (VALUES (100000003), (100000004), (100000006), (100000007)) v(x) -- !query 32 schema -struct +struct -- !query 32 output -100000005 2.5 +1.00000005E8 2.5 -- !query 33 -SELECT CAST(avg(udf(x)) AS long), CAST(udf(var_pop(CAST(x AS DOUBLE))) AS 
decimal(10,3)) +SELECT avg(udf(CAST(x AS DOUBLE))), udf(var_pop(CAST(x AS DOUBLE))) FROM (VALUES (7000000000005), (7000000000007)) v(x) -- !query 33 schema -struct +struct -- !query 33 output -7000000000006 1 +7.000000000006E12 1.0 -- !query 34 -SELECT CAST(udf(covar_pop(b, udf(a))) AS decimal(10,3)), CAST(covar_samp(udf(b), a) as decimal(10,3)) FROM aggtest +SELECT udf(covar_pop(b, udf(a))), covar_samp(udf(b), a) FROM aggtest -- !query 34 schema -struct +struct -- !query 34 output -653.629 871.505 +653.6289553875104 871.5052738500139 -- !query 35 -SELECT CAST(corr(b, udf(a)) AS decimal(10,3)) FROM aggtest +SELECT corr(b, udf(a)) FROM aggtest -- !query 35 schema -struct +struct -- !query 35 output -0.14 +0.1396345165178734 -- !query 36 @@ -306,16 +306,16 @@ struct -- !query 37 SELECT udf(count(DISTINCT four)) AS cnt_4 FROM onek -- !query 37 schema -struct +struct -- !query 37 output 4 -- !query 38 -select ten, udf(count(*)), CAST(sum(udf(four)) AS int) from onek +select ten, udf(count(*)), sum(udf(four)) from onek group by ten order by ten -- !query 38 schema -struct +struct -- !query 38 output 0 100 100 1 100 200 @@ -333,7 +333,7 @@ struct +struct -- !query 39 output 0 100 2 1 100 4 @@ -352,7 +352,7 @@ select ten, udf(sum(distinct four)) from onek a group by ten having exists (select 1 from onek b where udf(sum(distinct a.four)) = b.four) -- !query 40 schema -struct +struct -- !query 40 output 0 2 2 2 @@ -372,7 +372,7 @@ struct<> org.apache.spark.sql.AnalysisException Aggregate/Window/Generate expressions are not valid in where clause of the query. -Expression in where clause: [(sum(DISTINCT CAST((outer() + b.`four`) AS BIGINT)) = CAST(udf(four) AS BIGINT))] +Expression in where clause: [(sum(DISTINCT CAST((outer() + b.`four`) AS BIGINT)) = CAST(CAST(udf(cast(four as string)) AS INT) AS BIGINT))] Invalid expressions: [sum(DISTINCT CAST((outer() + b.`four`) AS BIGINT))]; diff --git a/sql/core/src/test/resources/sql-tests/results/udf/pgSQL/udf-aggregates_part2.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/pgSQL/udf-aggregates_part2.sql.out index d90aa11fc6eff..9fe943874c3e5 100644 --- a/sql/core/src/test/resources/sql-tests/results/udf/pgSQL/udf-aggregates_part2.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/udf/pgSQL/udf-aggregates_part2.sql.out @@ -59,7 +59,7 @@ true false true false true true true true true -- !query 3 select min(udf(unique1)) from tenk1 -- !query 3 schema -struct +struct -- !query 3 output 0 @@ -67,7 +67,7 @@ struct -- !query 4 select udf(max(unique1)) from tenk1 -- !query 4 schema -struct +struct -- !query 4 output 9999 @@ -115,7 +115,7 @@ struct -- !query 10 select distinct max(udf(unique2)) from tenk1 -- !query 10 schema -struct +struct -- !query 10 output 9999 @@ -139,7 +139,7 @@ struct -- !query 13 select udf(max(udf(unique2))) from tenk1 order by udf(max(unique2))+1 -- !query 13 schema -struct +struct -- !query 13 output 9999 @@ -147,7 +147,7 @@ struct -- !query 14 select t1.max_unique2, udf(g) from (select max(udf(unique2)) as max_unique2 FROM tenk1) t1 LATERAL VIEW explode(array(1,2,3)) t2 AS g order by g desc -- !query 14 schema -struct +struct -- !query 14 output 9999 3 9999 2 @@ -157,6 +157,6 @@ struct -- !query 15 select udf(max(100)) from tenk1 -- !query 15 schema -struct +struct -- !query 15 output 100 diff --git a/sql/core/src/test/resources/sql-tests/results/udf/pgSQL/udf-case.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/pgSQL/udf-case.sql.out index 91650e115393f..d9a8ca86361fc 100644 --- 
a/sql/core/src/test/resources/sql-tests/results/udf/pgSQL/udf-case.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/udf/pgSQL/udf-case.sql.out @@ -107,7 +107,7 @@ struct<> -- !query 12 SELECT '3' AS `One`, CASE - WHEN CAST(udf(1 < 2) AS boolean) THEN 3 + WHEN udf(1 < 2) THEN 3 END AS `Simple WHEN` -- !query 12 schema struct @@ -121,7 +121,7 @@ SELECT '' AS `One`, WHEN 1 > 2 THEN udf(3) END AS `Simple default` -- !query 13 schema -struct +struct -- !query 13 output NULL @@ -133,7 +133,7 @@ SELECT '3' AS `One`, ELSE udf(4) END AS `Simple ELSE` -- !query 14 schema -struct +struct -- !query 14 output 3 3 @@ -153,7 +153,7 @@ struct -- !query 16 SELECT udf('6') AS `One`, CASE - WHEN CAST(udf(1 > 2) AS boolean) THEN 3 + WHEN udf(1 > 2) THEN 3 WHEN udf(4) < 5 THEN 6 ELSE 7 END AS `Two WHEN with default` @@ -174,9 +174,9 @@ struct -- !query 18 -SELECT CASE WHEN CAST(udf(1=0) AS boolean) THEN 1/0 WHEN 1=1 THEN 1 ELSE 2/0 END +SELECT CASE WHEN udf(1=0) THEN 1/0 WHEN 1=1 THEN 1 ELSE 2/0 END -- !query 18 schema -struct +struct -- !query 18 output 1 @@ -184,15 +184,15 @@ struct +struct -- !query 19 output -1.0 +1 -- !query 20 SELECT CASE WHEN i > 100 THEN udf(1/0) ELSE udf(0) END FROM case_tbl -- !query 20 schema -struct 100) THEN udf((1 div 0)) ELSE udf(0) END:string> +struct 100) THEN CAST(udf(cast((1 div 0) as string)) AS INT) ELSE CAST(udf(cast(0 as string)) AS INT) END:int> -- !query 20 output 0 0 @@ -203,7 +203,7 @@ struct 100) THEN udf((1 div 0)) ELSE udf(0) END:string> -- !query 21 SELECT CASE 'a' WHEN 'a' THEN udf(1) ELSE udf(2) END -- !query 21 schema -struct +struct -- !query 21 output 1 @@ -294,7 +294,7 @@ struct SELECT udf(COALESCE(a.f, b.i, b.j)) FROM CASE_TBL a, CASE2_TBL b -- !query 28 schema -struct +struct -- !query 28 output -30.3 -30.3 @@ -369,7 +369,7 @@ struct -- !query 31 SELECT '' AS `Two`, * FROM CASE_TBL a, CASE2_TBL b - WHERE CAST(udf(COALESCE(f,b.i) = 2) AS boolean) + WHERE udf(COALESCE(f,b.i) = 2) -- !query 31 schema struct -- !query 31 output diff --git a/sql/core/src/test/resources/sql-tests/results/udf/udf-count.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/udf-count.sql.out index 9476937abd9e1..3d7c64054a6ac 100644 --- a/sql/core/src/test/resources/sql-tests/results/udf/udf-count.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/udf/udf-count.sql.out @@ -17,7 +17,7 @@ SELECT udf(count(*)), udf(count(1)), udf(count(null)), udf(count(a)), udf(count(b)), udf(count(a + b)), udf(count((a, b))) FROM testData -- !query 1 schema -struct +struct -- !query 1 output 7 7 0 5 5 4 7 @@ -32,7 +32,7 @@ SELECT udf(count(DISTINCT (a, b))) FROM testData -- !query 2 schema -struct +struct -- !query 2 output 1 0 2 2 2 6 @@ -40,7 +40,7 @@ struct +struct -- !query 3 output 4 4 4 @@ -50,6 +50,6 @@ SELECT udf(count(DISTINCT a, b)), udf(count(DISTINCT b, a)), udf(count(DISTINCT *)), udf(count(DISTINCT testData.*)) FROM testData -- !query 4 schema -struct +struct -- !query 4 output 3 3 3 3 diff --git a/sql/core/src/test/resources/sql-tests/results/udf/udf-having.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/udf-having.sql.out index 7cea2e5128f8b..1effcc8470e19 100644 --- a/sql/core/src/test/resources/sql-tests/results/udf/udf-having.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/udf/udf-having.sql.out @@ -18,7 +18,7 @@ struct<> -- !query 1 SELECT udf(k) AS k, udf(sum(v)) FROM hav GROUP BY k HAVING udf(sum(v)) > 2 -- !query 1 schema -struct +struct -- !query 1 output one 6 three 3 @@ -27,7 +27,7 @@ three 3 -- !query 2 SELECT 
udf(count(udf(k))) FROM hav GROUP BY v + 1 HAVING v + 1 = udf(2) -- !query 2 schema -struct +struct -- !query 2 output 1 @@ -35,7 +35,7 @@ struct -- !query 3 SELECT udf(MIN(t.v)) FROM (SELECT * FROM hav WHERE v > 0) t HAVING(udf(COUNT(udf(1))) > 0) -- !query 3 schema -struct +struct -- !query 3 output 1 @@ -43,7 +43,7 @@ struct -- !query 4 SELECT udf(a + b) FROM VALUES (1L, 2), (3L, 4) AS T(a, b) GROUP BY a + b HAVING a + b > udf(1) -- !query 4 schema -struct +struct -- !query 4 output 3 7 diff --git a/sql/core/src/test/resources/sql-tests/results/udf/udf-inner-join.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/udf-inner-join.sql.out index 10952cb21e4f3..120f2d39f73dc 100644 --- a/sql/core/src/test/resources/sql-tests/results/udf/udf-inner-join.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/udf/udf-inner-join.sql.out @@ -59,7 +59,7 @@ struct<> -- !query 6 SELECT tb.* FROM ta INNER JOIN tb ON ta.a = tb.a AND ta.tag = tb.tag -- !query 6 schema -struct +struct -- !query 6 output 1 a 1 a diff --git a/sql/core/src/test/resources/sql-tests/results/udf/udf-natural-join.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/udf-natural-join.sql.out index 53ef177db0bb7..950809ddcaf25 100644 --- a/sql/core/src/test/resources/sql-tests/results/udf/udf-natural-join.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/udf/udf-natural-join.sql.out @@ -59,6 +59,6 @@ two 2 22 -- !query 5 SELECT udf(count(*)) FROM nt1 natural full outer join nt2 -- !query 5 schema -struct +struct -- !query 5 output 4 diff --git a/sql/core/src/test/resources/sql-tests/results/udf/udf-special-values.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/udf-special-values.sql.out new file mode 100644 index 0000000000000..7b2b5dbe578cc --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/results/udf/udf-special-values.sql.out @@ -0,0 +1,62 @@ +-- Automatically generated by SQLQueryTestSuite +-- Number of queries: 6 + + +-- !query 0 +SELECT udf(x) FROM (VALUES (1), (2), (NULL)) v(x) +-- !query 0 schema +struct +-- !query 0 output +1 +2 +NULL + + +-- !query 1 +SELECT udf(x) FROM (VALUES ('A'), ('B'), (NULL)) v(x) +-- !query 1 schema +struct +-- !query 1 output +A +B +NULL + + +-- !query 2 +SELECT udf(x) FROM (VALUES ('NaN'), ('1'), ('2')) v(x) +-- !query 2 schema +struct +-- !query 2 output +1 +2 +NaN + + +-- !query 3 +SELECT udf(x) FROM (VALUES ('Infinity'), ('1'), ('2')) v(x) +-- !query 3 schema +struct +-- !query 3 output +1 +2 +Infinity + + +-- !query 4 +SELECT udf(x) FROM (VALUES ('-Infinity'), ('1'), ('2')) v(x) +-- !query 4 schema +struct +-- !query 4 output +-Infinity +1 +2 + + +-- !query 5 +SELECT udf(x) FROM (VALUES 0.00000001, 0.00000002, 0.00000003) v(x) +-- !query 5 schema +struct +-- !query 5 output +0.00000001 +0.00000002 +0.00000003 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala index e379d6df867c1..d62fe961117a9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala @@ -26,6 +26,7 @@ import org.apache.spark.TestUtils import org.apache.spark.api.python.{PythonBroadcast, PythonEvalType, PythonFunction, PythonUtils} import org.apache.spark.broadcast.Broadcast import org.apache.spark.internal.config.Tests +import org.apache.spark.sql.catalyst.expressions.{Cast, Expression} import org.apache.spark.sql.catalyst.plans.SQLHelper import 
org.apache.spark.sql.execution.python.UserDefinedPythonFunction import org.apache.spark.sql.expressions.SparkUserDefinedFunction @@ -35,8 +36,12 @@ import org.apache.spark.sql.types.StringType * This object targets to integrate various UDF test cases so that Scalar UDF, Python UDF and * Scalar Pandas UDFs can be tested in SBT & Maven tests. * - * The available UDFs cast input to strings, which take one column as input and return a string - * type column as output. + * The available UDFs are special: each one is a UDF wrapped in casts. The input column is + * cast to string, the UDF returns the string as-is, and the output is then cast back to + * the type of the input column. In this way, the UDF is virtually a no-op. + * + * Note that, due to this implementation limitation, complex types such as map, array and struct + * types do not work with these UDFs because they cannot survive the cast round trip unchanged. * * To register Scala UDF in SQL: * {{{ @@ -59,8 +64,9 @@ import org.apache.spark.sql.types.StringType * To use it in Scala API and SQL: * {{{ * sql("SELECT udf_name(1)") - * spark.range(10).select(expr("udf_name(id)") - * spark.range(10).select(pandasTestUDF($"id")) + * val df = spark.range(10) + * df.select(expr("udf_name(id)")) + * df.select(pandasTestUDF(df("id"))) * }}} */ object IntegratedUDFTestUtils extends SQLHelper { @@ -137,7 +143,8 @@ object IntegratedUDFTestUtils extends SQLHelper { "from pyspark.sql.types import StringType; " + "from pyspark.serializers import CloudPickleSerializer; " + s"f = open('$path', 'wb');" + - s"f.write(CloudPickleSerializer().dumps((lambda x: str(x), StringType())))"), + "f.write(CloudPickleSerializer().dumps((" + + "lambda x: None if x is None else str(x), StringType())))"), None, "PYTHONPATH" -> s"$pysparkPythonPath:$pythonPath").!! binaryPythonFunc = Files.readAllBytes(path.toPath) @@ -158,7 +165,9 @@ object IntegratedUDFTestUtils extends SQLHelper { "from pyspark.sql.types import StringType; " + "from pyspark.serializers import CloudPickleSerializer; " + s"f = open('$path', 'wb');" + - s"f.write(CloudPickleSerializer().dumps((lambda x: x.apply(str), StringType())))"), + "f.write(CloudPickleSerializer().dumps((" + + "lambda x: x.apply(" + + "lambda v: None if v is None else str(v)), StringType())))"), None, "PYTHONPATH" -> s"$pysparkPythonPath:$pythonPath").!! binaryPandasFunc = Files.readAllBytes(path.toPath) @@ -198,11 +207,22 @@ object IntegratedUDFTestUtils extends SQLHelper { } /** - * A Python UDF that takes one column and returns a string column. - * Equivalent to `udf(lambda x: str(x), "string")` + * A Python UDF that takes one column, casts into string, executes the Python native function, + * and casts back to the type of input column.
+ * + * Virtually equivalent to: + * + * {{{ + * from pyspark.sql.functions import udf + * + * df = spark.range(3).toDF("col") + * python_udf = udf(lambda x: str(x), "string") + * casted_col = python_udf(df.col.cast("string")) + * casted_col.cast(df.schema["col"].dataType) + * }}} */ case class TestPythonUDF(name: String) extends TestUDF { - private[IntegratedUDFTestUtils] lazy val udf = UserDefinedPythonFunction( + private[IntegratedUDFTestUtils] lazy val udf = new UserDefinedPythonFunction( name = name, func = PythonFunction( command = pythonFunc, @@ -214,7 +234,16 @@ object IntegratedUDFTestUtils extends SQLHelper { accumulator = null), dataType = StringType, pythonEvalType = PythonEvalType.SQL_BATCHED_UDF, - udfDeterministic = true) + udfDeterministic = true) { + + override def builder(e: Seq[Expression]): Expression = { + assert(e.length == 1, "Defined UDF only has one column") + val expr = e.head + assert(expr.resolved, "column should be resolved to use the same type " + + "as input. Try df(name) or df.col(name)") + Cast(super.builder(Cast(expr, StringType) :: Nil), expr.dataType) + } + } def apply(exprs: Column*): Column = udf(exprs: _*) @@ -222,11 +251,22 @@ object IntegratedUDFTestUtils extends SQLHelper { } /** - * A Scalar Pandas UDF that takes one column and returns a string column. - * Equivalent to `pandas_udf(lambda x: x.apply(str), "string", PandasUDFType.SCALAR)`. + * A Scalar Pandas UDF that takes one column, casts into string, executes the + * Python native function, and casts back to the type of input column. + * + * Virtually equivalent to: + * + * {{{ + * from pyspark.sql.functions import pandas_udf + * + * df = spark.range(3).toDF("col") + * scalar_udf = pandas_udf(lambda x: x.apply(lambda v: str(v)), "string") + * casted_col = scalar_udf(df.col.cast("string")) + * casted_col.cast(df.schema["col"].dataType) + * }}} */ case class TestScalarPandasUDF(name: String) extends TestUDF { - private[IntegratedUDFTestUtils] lazy val udf = UserDefinedPythonFunction( + private[IntegratedUDFTestUtils] lazy val udf = new UserDefinedPythonFunction( name = name, func = PythonFunction( command = pandasFunc, @@ -238,7 +278,16 @@ object IntegratedUDFTestUtils extends SQLHelper { accumulator = null), dataType = StringType, pythonEvalType = PythonEvalType.SQL_SCALAR_PANDAS_UDF, - udfDeterministic = true) + udfDeterministic = true) { + + override def builder(e: Seq[Expression]): Expression = { + assert(e.length == 1, "Defined UDF only has one column") + val expr = e.head + assert(expr.resolved, "column should be resolved to use the same type " + + "as input. Try df(name) or df.col(name)") + Cast(super.builder(Cast(expr, StringType) :: Nil), expr.dataType) + } + } def apply(exprs: Column*): Column = udf(exprs: _*) @@ -246,15 +295,39 @@ object IntegratedUDFTestUtils extends SQLHelper { } /** - * A Scala UDF that takes one column and returns a string column. - * Equivalent to `udf((input: Any) => String.valueOf(input)`. + * A Scala UDF that takes one column, casts into string, executes the + * Scala native function, and casts back to the type of input column. 
+ * + * Virtually equivalent to: + * + * {{{ + * import org.apache.spark.sql.functions.udf + * + * val df = spark.range(3).toDF("col") + * val scala_udf = udf((input: Any) => input.toString) + * val casted_col = scala_udf(df.col("col").cast("string")) + * casted_col.cast(df.schema("col").dataType) + * }}} */ case class TestScalaUDF(name: String) extends TestUDF { - private[IntegratedUDFTestUtils] lazy val udf = SparkUserDefinedFunction( - (input: Any) => String.valueOf(input), + private[IntegratedUDFTestUtils] lazy val udf = new SparkUserDefinedFunction( + (input: Any) => if (input == null) { + null + } else { + input.toString + }, StringType, inputSchemas = Seq.fill(1)(None), - name = Some(name)) + name = Some(name)) { + + override def apply(exprs: Column*): Column = { + assert(exprs.length == 1, "Defined UDF only has one column") + val expr = exprs.head.expr + assert(expr.resolved, "column should be resolved to use the same type " + + "as input. Try df(name) or df.col(name)") + Column(Cast(createScalaUDF(Cast(expr, StringType) :: Nil), expr.dataType)) + } + } def apply(exprs: Column*): Column = udf(exprs: _*) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala index 941ff966cd83b..059dbf892c653 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -1001,7 +1001,7 @@ class JoinSuite extends QueryTest with SharedSQLContext { val left = Seq((1, 2), (2, 3)).toDF("a", "b") val right = Seq((1, 2), (3, 4)).toDF("c", "d") - val df = left.join(right, pythonTestUDF($"a") === pythonTestUDF($"c")) + val df = left.join(right, pythonTestUDF(left("a")) === pythonTestUDF(right.col("c"))) val joinNode = df.queryExecution.executedPlan.find(_.isInstanceOf[BroadcastHashJoinExec]) assert(joinNode.isDefined) @@ -1025,7 +1025,7 @@ class JoinSuite extends QueryTest with SharedSQLContext { val left = Seq((1, 2), (2, 3)).toDF("a", "b") val right = Seq((1, 2), (3, 4)).toDF("c", "d") - val df = left.crossJoin(right).where(pythonTestUDF($"a") === pythonTestUDF($"c")) + val df = left.crossJoin(right).where(pythonTestUDF(left("a")) === right.col("c")) // Before optimization, there is a logical Filter operator. val filterInAnalysis = df.queryExecution.analyzed.find(_.isInstanceOf[Filter])
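
The pattern used throughout the test helpers above can be summarized in a small, self-contained sketch. This is an editorial illustration rather than part of the patch: `wrapCastRoundTrip` is a hypothetical name, but its body mirrors what the overridden `builder`/`apply` methods in IntegratedUDFTestUtils do, namely cast the single input column to string, apply the string-typed test UDF, then cast the result back to the input column's original type, so the UDF is effectively a type-preserving no-op.

import org.apache.spark.sql.catalyst.expressions.{Cast, Expression}
import org.apache.spark.sql.types.StringType

// Hypothetical helper (assumed name) showing the cast round trip used by the test UDFs.
// `buildUdf` stands in for the underlying builder that produces a string-typed UDF expression.
def wrapCastRoundTrip(buildUdf: Seq[Expression] => Expression)(e: Seq[Expression]): Expression = {
  assert(e.length == 1, "the test UDFs take exactly one column")
  val expr = e.head
  // The input must already be resolved so that its data type is known for the cast back.
  assert(expr.resolved, "column should be resolved; use df(name) or df.col(name)")
  Cast(buildUdf(Cast(expr, StringType) :: Nil), expr.dataType)
}

Because the wrapped UDF now reports the input column's type instead of string, the converted udf/pgSQL tests above can drop the explicit CAST(... AS int/decimal/boolean) wrappers around udf(...) and still produce the same schemas and results as the original non-UDF tests.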