[SPARK-12558][SQL][WIP] AnalysisException when multiple functions applied in GROUP BY clause #10520
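For context: before this change, applying the same chain of functions in both the select list and the GROUP BY clause could raise an AnalysisException, because semantically identical Hive UDF expressions did not compare equal during analysis. A hedged reproduction sketch (assumes a HiveContext-era spark-shell; tab1 and test_date match the test added below):

Seq(Tuple1(1451400761)).toDF("test_date").registerTempTable("tab1")
// Before this fix, analysis rejected the query below even though the
// grouping expression is identical to the select expression.
sql("select date(cast(test_date as timestamp))" +
  " from tab1 group by date(cast(test_date as timestamp))").show()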

Closed · wants to merge 6 commits
23 changes: 23 additions & 0 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
@@ -26,11 +26,13 @@ import scala.reflect.ClassTag

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}
import com.google.common.base.Objects
import org.apache.avro.Schema
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.ql.exec.{UDF, Utilities}
import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFMacro
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils
import org.apache.hadoop.hive.serde2.avro.{AvroGenericRecordWritable, AvroSerdeUtils}
import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector
@@ -45,6 +47,7 @@ private[hive] object HiveShim {
// scale Hive 0.13 infers for BigDecimals from sources that don't specify them (e.g. UDFs)
val UNLIMITED_DECIMAL_PRECISION = 38
val UNLIMITED_DECIMAL_SCALE = 18
val HIVE_GENERIC_UDF_MACRO_CLS = "org.apache.hadoop.hive.ql.udf.generic.GenericUDFMacro"

/*
* This function became private in hive-0.13, but we have to do this to work around a hive bug
@@ -123,6 +126,26 @@ private[hive] object HiveShim {
// for Serialization
def this() = this(null)

override def hashCode(): Int = {
if (functionClassName == HIVE_GENERIC_UDF_MACRO_CLS) {
Objects.hashCode(functionClassName, instance.asInstanceOf[GenericUDFMacro].getBody())
} else {
functionClassName.hashCode()
}
}

override def equals(other: Any): Boolean = other match {
case a: HiveFunctionWrapper if functionClassName == a.functionClassName =>
// In the case of a UDF macro, check that both wrappers point to the same underlying UDF body
if (functionClassName == HIVE_GENERIC_UDF_MACRO_CLS) {
a.instance.asInstanceOf[GenericUDFMacro].getBody() ==
instance.asInstanceOf[GenericUDFMacro].getBody()
} else {
true
}
case _ => false
}

@transient
def deserializeObjectByKryo[T: ClassTag](
kryo: Kryo,
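Why the equals/hashCode overrides above special-case macros: every Hive macro is an instance of the single class GenericUDFMacro, so the class name alone cannot distinguish two different macros; the macro body must be compared as well. A minimal, self-contained sketch of that contract (Wrapper and the macro bodies are illustrative stand-ins, not Spark's real HiveFunctionWrapper):

object MacroEqualityDemo extends App {
  val MacroCls = "org.apache.hadoop.hive.ql.udf.generic.GenericUDFMacro"

  // Stand-in for HiveFunctionWrapper; `body` models GenericUDFMacro.getBody().
  class Wrapper(val functionClassName: String, val body: String) {
    override def hashCode(): Int =
      if (functionClassName == MacroCls) (functionClassName, body).hashCode()
      else functionClassName.hashCode()

    override def equals(other: Any): Boolean = other match {
      case w: Wrapper if functionClassName == w.functionClassName =>
        // Same class name; macros must additionally agree on their body.
        functionClassName != MacroCls || body == w.body
      case _ => false
    }
  }

  val plusOneA = new Wrapper(MacroCls, "x + 1")
  val plusOneB = new Wrapper(MacroCls, "x + 1")
  val timesTwo = new Wrapper(MacroCls, "x * 2")
  assert(plusOneA == plusOneB) // same class name, same body  -> equal
  assert(plusOneA != timesTwo) // same class name, other body -> not equal
}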
@@ -350,6 +350,13 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
sqlContext.dropTempTable("testUDF")
}

test("Hive UDF in group by") {
Seq(Tuple1(1451400761)).toDF("test_date").registerTempTable("tab1")
val count = sql("select date(cast(test_date as timestamp))" +
" from tab1 group by date(cast(test_date as timestamp))").count()
Contributor:
Actually it would be good to use withTempTable, which automatically drops the temp table. It would also be more robust to create a temp function based on Hive's org.apache.hadoop.hive.ql.udf.generic.GenericUDFToDate, so that even if we later add a native function called date, we can still test the Hive UDF. We can make the test change in a follow-up PR (it can re-use the JIRA number).

Contributor:
I just realized this after I merged the PR. Let's have a PR to improve the test. Thanks!

assert(count == 1)
}
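Following the reviewer suggestion above, a hedged sketch of what the improved test could look like (assumes the withTempTable helper from SQLTestUtils of this era; the temp-function name testUDFToDate is illustrative, not necessarily what the follow-up PR used):

test("Hive UDF in group by, via a registered temp function (sketch)") {
  withTempTable("tab1") {
    Seq(Tuple1(1451400761)).toDF("test_date").registerTempTable("tab1")
    // Register Hive's GenericUDFToDate under a name that cannot collide
    // with a future native `date` function.
    sql("CREATE TEMPORARY FUNCTION testUDFToDate AS " +
      "'org.apache.hadoop.hive.ql.udf.generic.GenericUDFToDate'")
    val count = sql("select testUDFToDate(cast(test_date as timestamp))" +
      " from tab1 group by testUDFToDate(cast(test_date as timestamp))").count()
    sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFToDate")
    assert(count == 1)
  }
}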

test("SPARK-11522 select input_file_name from non-parquet table"){

withTempDir { tempDir =>