Skip to content

Commit

Permalink
SPARK-12558][FOLLOW-UP] AnalysisException when multiple functions app…
Browse files Browse the repository at this point in the history
…lied in GROUP BY clause
  • Loading branch information
dilipbiswal committed Jan 14, 2016
1 parent 902667f commit e21b5aa
Showing 1 changed file with 175 additions and 165 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import java.util.{ArrayList, Arrays, Properties}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.ql.udf.UDAFPercentile
import org.apache.hadoop.hive.ql.udf.generic.{GenericUDAFAverage, GenericUDF, GenericUDFOPAnd, GenericUDTFExplode}
import org.apache.hadoop.hive.ql.udf.generic._
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredObject
import org.apache.hadoop.hive.serde2.{AbstractSerDe, SerDeStats}
import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectInspectorFactory}
Expand Down Expand Up @@ -151,210 +151,220 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
}

test("UDFIntegerToString") {
val testData = hiveContext.sparkContext.parallelize(
IntegerCaseClass(1) :: IntegerCaseClass(2) :: Nil).toDF()
testData.registerTempTable("integerTable")

val udfName = classOf[UDFIntegerToString].getName
sql(s"CREATE TEMPORARY FUNCTION testUDFIntegerToString AS '$udfName'")
checkAnswer(
sql("SELECT testUDFIntegerToString(i) FROM integerTable"),
Seq(Row("1"), Row("2")))
sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFIntegerToString")

hiveContext.reset()
withTempTable("integerTable") {
val testData = hiveContext.sparkContext.parallelize(
IntegerCaseClass(1) :: IntegerCaseClass(2) :: Nil).toDF()
testData.registerTempTable("integerTable")

val udfName = classOf[UDFIntegerToString].getName
sql(s"CREATE TEMPORARY FUNCTION testUDFIntegerToString AS '$udfName'")
checkAnswer(
sql("SELECT testUDFIntegerToString(i) FROM integerTable"),
Seq(Row("1"), Row("2")))
sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFIntegerToString")
}
}

test("UDFToListString") {
val testData = hiveContext.sparkContext.parallelize(StringCaseClass("") :: Nil).toDF()
testData.registerTempTable("inputTable")

sql(s"CREATE TEMPORARY FUNCTION testUDFToListString AS '${classOf[UDFToListString].getName}'")
val errMsg = intercept[AnalysisException] {
sql("SELECT testUDFToListString(s) FROM inputTable")
withTempTable("inputTable") {
val testData = hiveContext.sparkContext.parallelize(StringCaseClass("") :: Nil).toDF()
testData.registerTempTable("inputTable")

sql(s"CREATE TEMPORARY FUNCTION testUDFToListString AS '${classOf[UDFToListString].getName}'")
val errMsg = intercept[AnalysisException] {
sql("SELECT testUDFToListString(s) FROM inputTable")
}
assert(errMsg.getMessage contains "List type in java is unsupported because " +
"JVM type erasure makes spark fail to catch a component type in List<>;")

sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFToListString")
}
assert(errMsg.getMessage contains "List type in java is unsupported because " +
"JVM type erasure makes spark fail to catch a component type in List<>;")

sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFToListString")
hiveContext.reset()
}

test("UDFToListInt") {
val testData = hiveContext.sparkContext.parallelize(StringCaseClass("") :: Nil).toDF()
testData.registerTempTable("inputTable")

sql(s"CREATE TEMPORARY FUNCTION testUDFToListInt AS '${classOf[UDFToListInt].getName}'")
val errMsg = intercept[AnalysisException] {
sql("SELECT testUDFToListInt(s) FROM inputTable")
withTempTable("inputTable") {
val testData = hiveContext.sparkContext.parallelize(StringCaseClass("") :: Nil).toDF()
testData.registerTempTable("inputTable")

sql(s"CREATE TEMPORARY FUNCTION testUDFToListInt AS '${classOf[UDFToListInt].getName}'")
val errMsg = intercept[AnalysisException] {
sql("SELECT testUDFToListInt(s) FROM inputTable")
}
assert(errMsg.getMessage contains "List type in java is unsupported because " +
"JVM type erasure makes spark fail to catch a component type in List<>;")

sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFToListInt")
}
assert(errMsg.getMessage contains "List type in java is unsupported because " +
"JVM type erasure makes spark fail to catch a component type in List<>;")

sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFToListInt")
hiveContext.reset()
}

test("UDFToStringIntMap") {
val testData = hiveContext.sparkContext.parallelize(StringCaseClass("") :: Nil).toDF()
testData.registerTempTable("inputTable")

sql(s"CREATE TEMPORARY FUNCTION testUDFToStringIntMap " +
s"AS '${classOf[UDFToStringIntMap].getName}'")
val errMsg = intercept[AnalysisException] {
sql("SELECT testUDFToStringIntMap(s) FROM inputTable")
withTempTable("inputTable") {
val testData = hiveContext.sparkContext.parallelize(StringCaseClass("") :: Nil).toDF()
testData.registerTempTable("inputTable")

sql(s"CREATE TEMPORARY FUNCTION testUDFToStringIntMap " +
s"AS '${classOf[UDFToStringIntMap].getName}'")
val errMsg = intercept[AnalysisException] {
sql("SELECT testUDFToStringIntMap(s) FROM inputTable")
}
assert(errMsg.getMessage contains "Map type in java is unsupported because " +
"JVM type erasure makes spark fail to catch key and value types in Map<>;")

sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFToStringIntMap")
}
assert(errMsg.getMessage contains "Map type in java is unsupported because " +
"JVM type erasure makes spark fail to catch key and value types in Map<>;")

sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFToStringIntMap")
hiveContext.reset()
}

test("UDFToIntIntMap") {
val testData = hiveContext.sparkContext.parallelize(StringCaseClass("") :: Nil).toDF()
testData.registerTempTable("inputTable")

sql(s"CREATE TEMPORARY FUNCTION testUDFToIntIntMap " +
s"AS '${classOf[UDFToIntIntMap].getName}'")
val errMsg = intercept[AnalysisException] {
sql("SELECT testUDFToIntIntMap(s) FROM inputTable")
withTempTable("inputTable") {
val testData = hiveContext.sparkContext.parallelize(StringCaseClass("") :: Nil).toDF()
testData.registerTempTable("inputTable")

sql(s"CREATE TEMPORARY FUNCTION testUDFToIntIntMap " +
s"AS '${classOf[UDFToIntIntMap].getName}'")
val errMsg = intercept[AnalysisException] {
sql("SELECT testUDFToIntIntMap(s) FROM inputTable")
}
assert(errMsg.getMessage contains "Map type in java is unsupported because " +
"JVM type erasure makes spark fail to catch key and value types in Map<>;")

sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFToIntIntMap")
}
assert(errMsg.getMessage contains "Map type in java is unsupported because " +
"JVM type erasure makes spark fail to catch key and value types in Map<>;")

sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFToIntIntMap")
hiveContext.reset()
}

test("UDFListListInt") {
val testData = hiveContext.sparkContext.parallelize(
ListListIntCaseClass(Nil) ::
ListListIntCaseClass(Seq((1, 2, 3))) ::
ListListIntCaseClass(Seq((4, 5, 6), (7, 8, 9))) :: Nil).toDF()
testData.registerTempTable("listListIntTable")

sql(s"CREATE TEMPORARY FUNCTION testUDFListListInt AS '${classOf[UDFListListInt].getName}'")
checkAnswer(
sql("SELECT testUDFListListInt(lli) FROM listListIntTable"),
Seq(Row(0), Row(2), Row(13)))
sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFListListInt")

hiveContext.reset()
withTempTable("listListIntTable") {
val testData = hiveContext.sparkContext.parallelize(
ListListIntCaseClass(Nil) ::
ListListIntCaseClass(Seq((1, 2, 3))) ::
ListListIntCaseClass(Seq((4, 5, 6), (7, 8, 9))) :: Nil).toDF()
testData.registerTempTable("listListIntTable")

sql(s"CREATE TEMPORARY FUNCTION testUDFListListInt AS '${classOf[UDFListListInt].getName}'")
checkAnswer(
sql("SELECT testUDFListListInt(lli) FROM listListIntTable"),
Seq(Row(0), Row(2), Row(13)))
sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFListListInt")
}
}

test("UDFListString") {
val testData = hiveContext.sparkContext.parallelize(
ListStringCaseClass(Seq("a", "b", "c")) ::
ListStringCaseClass(Seq("d", "e")) :: Nil).toDF()
testData.registerTempTable("listStringTable")

sql(s"CREATE TEMPORARY FUNCTION testUDFListString AS '${classOf[UDFListString].getName}'")
checkAnswer(
sql("SELECT testUDFListString(l) FROM listStringTable"),
Seq(Row("a,b,c"), Row("d,e")))
sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFListString")

hiveContext.reset()
withTempTable("listStringTable") {
val testData = hiveContext.sparkContext.parallelize(
ListStringCaseClass(Seq("a", "b", "c")) ::
ListStringCaseClass(Seq("d", "e")) :: Nil).toDF()
testData.registerTempTable("listStringTable")

sql(s"CREATE TEMPORARY FUNCTION testUDFListString AS '${classOf[UDFListString].getName}'")
checkAnswer(
sql("SELECT testUDFListString(l) FROM listStringTable"),
Seq(Row("a,b,c"), Row("d,e")))
sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFListString")
}
}

test("UDFStringString") {
val testData = hiveContext.sparkContext.parallelize(
StringCaseClass("world") :: StringCaseClass("goodbye") :: Nil).toDF()
testData.registerTempTable("stringTable")
withTempTable("stringTable") {
val testData = hiveContext.sparkContext.parallelize(
StringCaseClass("world") :: StringCaseClass("goodbye") :: Nil).toDF()
testData.registerTempTable("stringTable")

sql(s"CREATE TEMPORARY FUNCTION testStringStringUDF AS '${classOf[UDFStringString].getName}'")
checkAnswer(
sql("SELECT testStringStringUDF(\"hello\", s) FROM stringTable"),
Seq(Row("hello world"), Row("hello goodbye")))

checkAnswer(
sql("SELECT testStringStringUDF(\"\", testStringStringUDF(\"hello\", s)) FROM stringTable"),
Seq(Row(" hello world"), Row(" hello goodbye")))
sql(s"CREATE TEMPORARY FUNCTION testStringStringUDF AS '${classOf[UDFStringString].getName}'")
checkAnswer(
sql("SELECT testStringStringUDF(\"hello\", s) FROM stringTable"),
Seq(Row("hello world"), Row("hello goodbye")))

sql("DROP TEMPORARY FUNCTION IF EXISTS testStringStringUDF")
checkAnswer(
sql("SELECT testStringStringUDF(\"\", testStringStringUDF(\"hello\", s)) FROM stringTable"),
Seq(Row(" hello world"), Row(" hello goodbye")))

hiveContext.reset()
sql("DROP TEMPORARY FUNCTION IF EXISTS testStringStringUDF")
}
}

test("UDFTwoListList") {
val testData = hiveContext.sparkContext.parallelize(
ListListIntCaseClass(Nil) ::
ListListIntCaseClass(Seq((1, 2, 3))) ::
ListListIntCaseClass(Seq((4, 5, 6), (7, 8, 9))) ::
Nil).toDF()
testData.registerTempTable("TwoListTable")

sql(s"CREATE TEMPORARY FUNCTION testUDFTwoListList AS '${classOf[UDFTwoListList].getName}'")
checkAnswer(
sql("SELECT testUDFTwoListList(lli, lli) FROM TwoListTable"),
Seq(Row("0, 0"), Row("2, 2"), Row("13, 13")))
sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFTwoListList")

hiveContext.reset()
}
withTempTable("TwoListTable") {
val testData = hiveContext.sparkContext.parallelize(
ListListIntCaseClass(Nil) ::
ListListIntCaseClass(Seq((1, 2, 3))) ::
ListListIntCaseClass(Seq((4, 5, 6), (7, 8, 9))) ::
Nil).toDF()
testData.registerTempTable("TwoListTable")

test("Hive UDFs with insufficient number of input arguments should trigger an analysis error") {
Seq((1, 2)).toDF("a", "b").registerTempTable("testUDF")

{
// HiveSimpleUDF
sql(s"CREATE TEMPORARY FUNCTION testUDFTwoListList AS '${classOf[UDFTwoListList].getName}'")
val message = intercept[AnalysisException] {
sql("SELECT testUDFTwoListList() FROM testUDF")
}.getMessage
assert(message.contains("No handler for Hive udf"))
checkAnswer(
sql("SELECT testUDFTwoListList(lli, lli) FROM TwoListTable"),
Seq(Row("0, 0"), Row("2, 2"), Row("13, 13")))
sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFTwoListList")
}
}

{
// HiveGenericUDF
sql(s"CREATE TEMPORARY FUNCTION testUDFAnd AS '${classOf[GenericUDFOPAnd].getName}'")
val message = intercept[AnalysisException] {
sql("SELECT testUDFAnd() FROM testUDF")
}.getMessage
assert(message.contains("No handler for Hive udf"))
sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFAnd")
}

{
// Hive UDAF
sql(s"CREATE TEMPORARY FUNCTION testUDAFPercentile AS '${classOf[UDAFPercentile].getName}'")
val message = intercept[AnalysisException] {
sql("SELECT testUDAFPercentile(a) FROM testUDF GROUP BY b")
}.getMessage
assert(message.contains("No handler for Hive udf"))
sql("DROP TEMPORARY FUNCTION IF EXISTS testUDAFPercentile")
}

{
// AbstractGenericUDAFResolver
sql(s"CREATE TEMPORARY FUNCTION testUDAFAverage AS '${classOf[GenericUDAFAverage].getName}'")
val message = intercept[AnalysisException] {
sql("SELECT testUDAFAverage() FROM testUDF GROUP BY b")
}.getMessage
assert(message.contains("No handler for Hive udf"))
sql("DROP TEMPORARY FUNCTION IF EXISTS testUDAFAverage")
}

{
// Hive UDTF
sql(s"CREATE TEMPORARY FUNCTION testUDTFExplode AS '${classOf[GenericUDTFExplode].getName}'")
val message = intercept[AnalysisException] {
sql("SELECT testUDTFExplode() FROM testUDF")
}.getMessage
assert(message.contains("No handler for Hive udf"))
sql("DROP TEMPORARY FUNCTION IF EXISTS testUDTFExplode")
test("Hive UDFs with insufficient number of input arguments should trigger an analysis error") {
withTempTable("testUDF") {
Seq((1, 2)).toDF("a", "b").registerTempTable("testUDF")

{
// HiveSimpleUDF
sql(s"CREATE TEMPORARY FUNCTION testUDFTwoListList AS '${classOf[UDFTwoListList].getName}'")
val message = intercept[AnalysisException] {
sql("SELECT testUDFTwoListList() FROM testUDF")
}.getMessage
assert(message.contains("No handler for Hive udf"))
sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFTwoListList")
}

{
// HiveGenericUDF
sql(s"CREATE TEMPORARY FUNCTION testUDFAnd AS '${classOf[GenericUDFOPAnd].getName}'")
val message = intercept[AnalysisException] {
sql("SELECT testUDFAnd() FROM testUDF")
}.getMessage
assert(message.contains("No handler for Hive udf"))
sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFAnd")
}

{
// Hive UDAF
sql(s"CREATE TEMPORARY FUNCTION testUDAFPercentile AS '${classOf[UDAFPercentile].getName}'")
val message = intercept[AnalysisException] {
sql("SELECT testUDAFPercentile(a) FROM testUDF GROUP BY b")
}.getMessage
assert(message.contains("No handler for Hive udf"))
sql("DROP TEMPORARY FUNCTION IF EXISTS testUDAFPercentile")
}

{
// AbstractGenericUDAFResolver
sql(s"CREATE TEMPORARY" +
s" FUNCTION testUDAFAverage AS '${classOf[GenericUDAFAverage].getName}'")
val message = intercept[AnalysisException] {
sql("SELECT testUDAFAverage() FROM testUDF GROUP BY b")
}.getMessage
assert(message.contains("No handler for Hive udf"))
sql("DROP TEMPORARY FUNCTION IF EXISTS testUDAFAverage")
}

{
// Hive UDTF
sql(s"CREATE TEMPORARY" +
s" FUNCTION testUDTFExplode AS '${classOf[GenericUDTFExplode].getName}'")
val message = intercept[AnalysisException] {
sql("SELECT testUDTFExplode() FROM testUDF")
}.getMessage
assert(message.contains("No handler for Hive udf"))
sql("DROP TEMPORARY FUNCTION IF EXISTS testUDTFExplode")
}
}

sqlContext.dropTempTable("testUDF")
}

test("Hive UDF in group by") {
Seq(Tuple1(1451400761)).toDF("test_date").registerTempTable("tab1")
val count = sql("select date(cast(test_date as timestamp))" +
" from tab1 group by date(cast(test_date as timestamp))").count()
assert(count == 1)
withTempTable("tab1") {
Seq(Tuple1(1451400761)).toDF("test_date").registerTempTable("tab1")
sql(s"CREATE TEMPORARY FUNCTION testUDFToDate AS '${classOf[GenericUDFToDate].getName}'")
val count = sql("select testUDFToDate(cast(test_date as timestamp))" +
" from tab1 group by testUDFToDate(cast(test_date as timestamp))").count()
sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFToDate")
assert(count == 1)
}
}

test("SPARK-11522 select input_file_name from non-parquet table"){
Expand Down

0 comments on commit e21b5aa

Please sign in to comment.