diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala index dfe33ba8b0502..d56680cbde02a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala @@ -22,7 +22,7 @@ import java.util.{ArrayList, Arrays, Properties} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hive.ql.udf.UDAFPercentile -import org.apache.hadoop.hive.ql.udf.generic.{GenericUDAFAverage, GenericUDF, GenericUDFOPAnd, GenericUDTFExplode} +import org.apache.hadoop.hive.ql.udf.generic._ import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredObject import org.apache.hadoop.hive.serde2.{AbstractSerDe, SerDeStats} import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectInspectorFactory} @@ -151,210 +151,220 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { } test("UDFIntegerToString") { - val testData = hiveContext.sparkContext.parallelize( - IntegerCaseClass(1) :: IntegerCaseClass(2) :: Nil).toDF() - testData.registerTempTable("integerTable") - - val udfName = classOf[UDFIntegerToString].getName - sql(s"CREATE TEMPORARY FUNCTION testUDFIntegerToString AS '$udfName'") - checkAnswer( - sql("SELECT testUDFIntegerToString(i) FROM integerTable"), - Seq(Row("1"), Row("2"))) - sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFIntegerToString") - - hiveContext.reset() + withTempTable("integerTable") { + val testData = hiveContext.sparkContext.parallelize( + IntegerCaseClass(1) :: IntegerCaseClass(2) :: Nil).toDF() + testData.registerTempTable("integerTable") + + val udfName = classOf[UDFIntegerToString].getName + sql(s"CREATE TEMPORARY FUNCTION testUDFIntegerToString AS '$udfName'") + checkAnswer( + sql("SELECT testUDFIntegerToString(i) FROM integerTable"), + Seq(Row("1"), Row("2"))) + sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFIntegerToString") + } } test("UDFToListString") { - val testData = hiveContext.sparkContext.parallelize(StringCaseClass("") :: Nil).toDF() - testData.registerTempTable("inputTable") - - sql(s"CREATE TEMPORARY FUNCTION testUDFToListString AS '${classOf[UDFToListString].getName}'") - val errMsg = intercept[AnalysisException] { - sql("SELECT testUDFToListString(s) FROM inputTable") + withTempTable("inputTable") { + val testData = hiveContext.sparkContext.parallelize(StringCaseClass("") :: Nil).toDF() + testData.registerTempTable("inputTable") + + sql(s"CREATE TEMPORARY FUNCTION testUDFToListString AS '${classOf[UDFToListString].getName}'") + val errMsg = intercept[AnalysisException] { + sql("SELECT testUDFToListString(s) FROM inputTable") + } + assert(errMsg.getMessage contains "List type in java is unsupported because " + + "JVM type erasure makes spark fail to catch a component type in List<>;") + + sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFToListString") } - assert(errMsg.getMessage contains "List type in java is unsupported because " + - "JVM type erasure makes spark fail to catch a component type in List<>;") - - sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFToListString") - hiveContext.reset() } test("UDFToListInt") { - val testData = hiveContext.sparkContext.parallelize(StringCaseClass("") :: Nil).toDF() - testData.registerTempTable("inputTable") - - sql(s"CREATE TEMPORARY FUNCTION testUDFToListInt AS '${classOf[UDFToListInt].getName}'") - val errMsg = intercept[AnalysisException] { - sql("SELECT testUDFToListInt(s) FROM inputTable") + withTempTable("inputTable") { + val testData = hiveContext.sparkContext.parallelize(StringCaseClass("") :: Nil).toDF() + testData.registerTempTable("inputTable") + + sql(s"CREATE TEMPORARY FUNCTION testUDFToListInt AS '${classOf[UDFToListInt].getName}'") + val errMsg = intercept[AnalysisException] { + sql("SELECT testUDFToListInt(s) FROM inputTable") + } + assert(errMsg.getMessage contains "List type in java is unsupported because " + + "JVM type erasure makes spark fail to catch a component type in List<>;") + + sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFToListInt") } - assert(errMsg.getMessage contains "List type in java is unsupported because " + - "JVM type erasure makes spark fail to catch a component type in List<>;") - - sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFToListInt") - hiveContext.reset() } test("UDFToStringIntMap") { - val testData = hiveContext.sparkContext.parallelize(StringCaseClass("") :: Nil).toDF() - testData.registerTempTable("inputTable") - - sql(s"CREATE TEMPORARY FUNCTION testUDFToStringIntMap " + - s"AS '${classOf[UDFToStringIntMap].getName}'") - val errMsg = intercept[AnalysisException] { - sql("SELECT testUDFToStringIntMap(s) FROM inputTable") + withTempTable("inputTable") { + val testData = hiveContext.sparkContext.parallelize(StringCaseClass("") :: Nil).toDF() + testData.registerTempTable("inputTable") + + sql(s"CREATE TEMPORARY FUNCTION testUDFToStringIntMap " + + s"AS '${classOf[UDFToStringIntMap].getName}'") + val errMsg = intercept[AnalysisException] { + sql("SELECT testUDFToStringIntMap(s) FROM inputTable") + } + assert(errMsg.getMessage contains "Map type in java is unsupported because " + + "JVM type erasure makes spark fail to catch key and value types in Map<>;") + + sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFToStringIntMap") } - assert(errMsg.getMessage contains "Map type in java is unsupported because " + - "JVM type erasure makes spark fail to catch key and value types in Map<>;") - - sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFToStringIntMap") - hiveContext.reset() } test("UDFToIntIntMap") { - val testData = hiveContext.sparkContext.parallelize(StringCaseClass("") :: Nil).toDF() - testData.registerTempTable("inputTable") - - sql(s"CREATE TEMPORARY FUNCTION testUDFToIntIntMap " + - s"AS '${classOf[UDFToIntIntMap].getName}'") - val errMsg = intercept[AnalysisException] { - sql("SELECT testUDFToIntIntMap(s) FROM inputTable") + withTempTable("inputTable") { + val testData = hiveContext.sparkContext.parallelize(StringCaseClass("") :: Nil).toDF() + testData.registerTempTable("inputTable") + + sql(s"CREATE TEMPORARY FUNCTION testUDFToIntIntMap " + + s"AS '${classOf[UDFToIntIntMap].getName}'") + val errMsg = intercept[AnalysisException] { + sql("SELECT testUDFToIntIntMap(s) FROM inputTable") + } + assert(errMsg.getMessage contains "Map type in java is unsupported because " + + "JVM type erasure makes spark fail to catch key and value types in Map<>;") + + sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFToIntIntMap") } - assert(errMsg.getMessage contains "Map type in java is unsupported because " + - "JVM type erasure makes spark fail to catch key and value types in Map<>;") - - sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFToIntIntMap") - hiveContext.reset() } test("UDFListListInt") { - val testData = hiveContext.sparkContext.parallelize( - ListListIntCaseClass(Nil) :: - ListListIntCaseClass(Seq((1, 2, 3))) :: - ListListIntCaseClass(Seq((4, 5, 6), (7, 8, 9))) :: Nil).toDF() - testData.registerTempTable("listListIntTable") - - sql(s"CREATE TEMPORARY FUNCTION testUDFListListInt AS '${classOf[UDFListListInt].getName}'") - checkAnswer( - sql("SELECT testUDFListListInt(lli) FROM listListIntTable"), - Seq(Row(0), Row(2), Row(13))) - sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFListListInt") - - hiveContext.reset() + withTempTable("listListIntTable") { + val testData = hiveContext.sparkContext.parallelize( + ListListIntCaseClass(Nil) :: + ListListIntCaseClass(Seq((1, 2, 3))) :: + ListListIntCaseClass(Seq((4, 5, 6), (7, 8, 9))) :: Nil).toDF() + testData.registerTempTable("listListIntTable") + + sql(s"CREATE TEMPORARY FUNCTION testUDFListListInt AS '${classOf[UDFListListInt].getName}'") + checkAnswer( + sql("SELECT testUDFListListInt(lli) FROM listListIntTable"), + Seq(Row(0), Row(2), Row(13))) + sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFListListInt") + } } test("UDFListString") { - val testData = hiveContext.sparkContext.parallelize( - ListStringCaseClass(Seq("a", "b", "c")) :: - ListStringCaseClass(Seq("d", "e")) :: Nil).toDF() - testData.registerTempTable("listStringTable") - - sql(s"CREATE TEMPORARY FUNCTION testUDFListString AS '${classOf[UDFListString].getName}'") - checkAnswer( - sql("SELECT testUDFListString(l) FROM listStringTable"), - Seq(Row("a,b,c"), Row("d,e"))) - sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFListString") - - hiveContext.reset() + withTempTable("listStringTable") { + val testData = hiveContext.sparkContext.parallelize( + ListStringCaseClass(Seq("a", "b", "c")) :: + ListStringCaseClass(Seq("d", "e")) :: Nil).toDF() + testData.registerTempTable("listStringTable") + + sql(s"CREATE TEMPORARY FUNCTION testUDFListString AS '${classOf[UDFListString].getName}'") + checkAnswer( + sql("SELECT testUDFListString(l) FROM listStringTable"), + Seq(Row("a,b,c"), Row("d,e"))) + sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFListString") + } } test("UDFStringString") { - val testData = hiveContext.sparkContext.parallelize( - StringCaseClass("world") :: StringCaseClass("goodbye") :: Nil).toDF() - testData.registerTempTable("stringTable") + withTempTable("stringTable") { + val testData = hiveContext.sparkContext.parallelize( + StringCaseClass("world") :: StringCaseClass("goodbye") :: Nil).toDF() + testData.registerTempTable("stringTable") - sql(s"CREATE TEMPORARY FUNCTION testStringStringUDF AS '${classOf[UDFStringString].getName}'") - checkAnswer( - sql("SELECT testStringStringUDF(\"hello\", s) FROM stringTable"), - Seq(Row("hello world"), Row("hello goodbye"))) - - checkAnswer( - sql("SELECT testStringStringUDF(\"\", testStringStringUDF(\"hello\", s)) FROM stringTable"), - Seq(Row(" hello world"), Row(" hello goodbye"))) + sql(s"CREATE TEMPORARY FUNCTION testStringStringUDF AS '${classOf[UDFStringString].getName}'") + checkAnswer( + sql("SELECT testStringStringUDF(\"hello\", s) FROM stringTable"), + Seq(Row("hello world"), Row("hello goodbye"))) - sql("DROP TEMPORARY FUNCTION IF EXISTS testStringStringUDF") + checkAnswer( + sql("SELECT testStringStringUDF(\"\", testStringStringUDF(\"hello\", s)) FROM stringTable"), + Seq(Row(" hello world"), Row(" hello goodbye"))) - hiveContext.reset() + sql("DROP TEMPORARY FUNCTION IF EXISTS testStringStringUDF") + } } test("UDFTwoListList") { - val testData = hiveContext.sparkContext.parallelize( - ListListIntCaseClass(Nil) :: - ListListIntCaseClass(Seq((1, 2, 3))) :: - ListListIntCaseClass(Seq((4, 5, 6), (7, 8, 9))) :: - Nil).toDF() - testData.registerTempTable("TwoListTable") - - sql(s"CREATE TEMPORARY FUNCTION testUDFTwoListList AS '${classOf[UDFTwoListList].getName}'") - checkAnswer( - sql("SELECT testUDFTwoListList(lli, lli) FROM TwoListTable"), - Seq(Row("0, 0"), Row("2, 2"), Row("13, 13"))) - sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFTwoListList") - - hiveContext.reset() - } + withTempTable("TwoListTable") { + val testData = hiveContext.sparkContext.parallelize( + ListListIntCaseClass(Nil) :: + ListListIntCaseClass(Seq((1, 2, 3))) :: + ListListIntCaseClass(Seq((4, 5, 6), (7, 8, 9))) :: + Nil).toDF() + testData.registerTempTable("TwoListTable") - test("Hive UDFs with insufficient number of input arguments should trigger an analysis error") { - Seq((1, 2)).toDF("a", "b").registerTempTable("testUDF") - - { - // HiveSimpleUDF sql(s"CREATE TEMPORARY FUNCTION testUDFTwoListList AS '${classOf[UDFTwoListList].getName}'") - val message = intercept[AnalysisException] { - sql("SELECT testUDFTwoListList() FROM testUDF") - }.getMessage - assert(message.contains("No handler for Hive udf")) + checkAnswer( + sql("SELECT testUDFTwoListList(lli, lli) FROM TwoListTable"), + Seq(Row("0, 0"), Row("2, 2"), Row("13, 13"))) sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFTwoListList") } + } - { - // HiveGenericUDF - sql(s"CREATE TEMPORARY FUNCTION testUDFAnd AS '${classOf[GenericUDFOPAnd].getName}'") - val message = intercept[AnalysisException] { - sql("SELECT testUDFAnd() FROM testUDF") - }.getMessage - assert(message.contains("No handler for Hive udf")) - sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFAnd") - } - - { - // Hive UDAF - sql(s"CREATE TEMPORARY FUNCTION testUDAFPercentile AS '${classOf[UDAFPercentile].getName}'") - val message = intercept[AnalysisException] { - sql("SELECT testUDAFPercentile(a) FROM testUDF GROUP BY b") - }.getMessage - assert(message.contains("No handler for Hive udf")) - sql("DROP TEMPORARY FUNCTION IF EXISTS testUDAFPercentile") - } - - { - // AbstractGenericUDAFResolver - sql(s"CREATE TEMPORARY FUNCTION testUDAFAverage AS '${classOf[GenericUDAFAverage].getName}'") - val message = intercept[AnalysisException] { - sql("SELECT testUDAFAverage() FROM testUDF GROUP BY b") - }.getMessage - assert(message.contains("No handler for Hive udf")) - sql("DROP TEMPORARY FUNCTION IF EXISTS testUDAFAverage") - } - - { - // Hive UDTF - sql(s"CREATE TEMPORARY FUNCTION testUDTFExplode AS '${classOf[GenericUDTFExplode].getName}'") - val message = intercept[AnalysisException] { - sql("SELECT testUDTFExplode() FROM testUDF") - }.getMessage - assert(message.contains("No handler for Hive udf")) - sql("DROP TEMPORARY FUNCTION IF EXISTS testUDTFExplode") + test("Hive UDFs with insufficient number of input arguments should trigger an analysis error") { + withTempTable("testUDF") { + Seq((1, 2)).toDF("a", "b").registerTempTable("testUDF") + + { + // HiveSimpleUDF + sql(s"CREATE TEMPORARY FUNCTION testUDFTwoListList AS '${classOf[UDFTwoListList].getName}'") + val message = intercept[AnalysisException] { + sql("SELECT testUDFTwoListList() FROM testUDF") + }.getMessage + assert(message.contains("No handler for Hive udf")) + sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFTwoListList") + } + + { + // HiveGenericUDF + sql(s"CREATE TEMPORARY FUNCTION testUDFAnd AS '${classOf[GenericUDFOPAnd].getName}'") + val message = intercept[AnalysisException] { + sql("SELECT testUDFAnd() FROM testUDF") + }.getMessage + assert(message.contains("No handler for Hive udf")) + sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFAnd") + } + + { + // Hive UDAF + sql(s"CREATE TEMPORARY FUNCTION testUDAFPercentile AS '${classOf[UDAFPercentile].getName}'") + val message = intercept[AnalysisException] { + sql("SELECT testUDAFPercentile(a) FROM testUDF GROUP BY b") + }.getMessage + assert(message.contains("No handler for Hive udf")) + sql("DROP TEMPORARY FUNCTION IF EXISTS testUDAFPercentile") + } + + { + // AbstractGenericUDAFResolver + sql(s"CREATE TEMPORARY" + + s" FUNCTION testUDAFAverage AS '${classOf[GenericUDAFAverage].getName}'") + val message = intercept[AnalysisException] { + sql("SELECT testUDAFAverage() FROM testUDF GROUP BY b") + }.getMessage + assert(message.contains("No handler for Hive udf")) + sql("DROP TEMPORARY FUNCTION IF EXISTS testUDAFAverage") + } + + { + // Hive UDTF + sql(s"CREATE TEMPORARY" + + s" FUNCTION testUDTFExplode AS '${classOf[GenericUDTFExplode].getName}'") + val message = intercept[AnalysisException] { + sql("SELECT testUDTFExplode() FROM testUDF") + }.getMessage + assert(message.contains("No handler for Hive udf")) + sql("DROP TEMPORARY FUNCTION IF EXISTS testUDTFExplode") + } } - - sqlContext.dropTempTable("testUDF") } test("Hive UDF in group by") { - Seq(Tuple1(1451400761)).toDF("test_date").registerTempTable("tab1") - val count = sql("select date(cast(test_date as timestamp))" + - " from tab1 group by date(cast(test_date as timestamp))").count() - assert(count == 1) + withTempTable("tab1") { + Seq(Tuple1(1451400761)).toDF("test_date").registerTempTable("tab1") + sql(s"CREATE TEMPORARY FUNCTION testUDFToDate AS '${classOf[GenericUDFToDate].getName}'") + val count = sql("select testUDFToDate(cast(test_date as timestamp))" + + " from tab1 group by testUDFToDate(cast(test_date as timestamp))").count() + sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFToDate") + assert(count == 1) + } } test("SPARK-11522 select input_file_name from non-parquet table"){