diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index 81cd65c3cc337..26b1994308f5d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -54,6 +54,14 @@ private[spark] object HiveUtils extends Logging {
/** The version of hive used internally by Spark SQL. */
val hiveExecutionVersion: String = "1.2.1"
+ /**
+ * The property key that is used to store the raw hive type string in the metadata of StructField.
+ * For example, in the case where the Hive type is varchar, the type gets mapped to a string type
+ * in Spark SQL, but we need to preserve the original type in order to invoke the correct object
+ * inspector in Hive.
+ */
+ val hiveTypeString: String = "HIVE_TYPE_STRING"
+
val HIVE_METASTORE_VERSION = SQLConfigBuilder("spark.sql.hive.metastore.version")
.doc("Version of the Hive metastore. Available options are " +
s"0.12.0
through $hiveExecutionVersion
.")
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
index da809cf991de2..3bbac05a79c23 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
@@ -61,7 +61,12 @@ private[hive] case class MetastoreRelation(
override protected def otherCopyArgs: Seq[AnyRef] = catalogTable :: sparkSession :: Nil
private def toHiveColumn(c: StructField): FieldSchema = {
- new FieldSchema(c.name, c.dataType.catalogString, c.getComment.orNull)
+ val typeString = if (c.metadata.contains(HiveUtils.hiveTypeString)) {
+ c.metadata.getString(HiveUtils.hiveTypeString)
+ } else {
+ c.dataType.catalogString
+ }
+ new FieldSchema(c.name, typeString, c.getComment.orNull)
}
// TODO: merge this with HiveClientImpl#toHiveTable
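This `toHiveColumn` and the one in `HiveClientImpl` below apply the same fallback. A standalone sketch of the logic, again using the literal key in place of the package-private `HiveUtils.hiveTypeString`:

```scala
import org.apache.spark.sql.types.StructField

// Prefer the preserved raw Hive type; otherwise fall back to the Spark SQL
// catalog string (e.g. "string" for a StringType column).
def hiveTypeOf(c: StructField): String =
  if (c.metadata.contains("HIVE_TYPE_STRING")) {
    c.metadata.getString("HIVE_TYPE_STRING")
  } else {
    c.dataType.catalogString
  }
```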
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 68dcfd86731bd..590029a517e09 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -46,7 +46,8 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
import org.apache.spark.sql.execution.QueryExecutionException
-import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.hive.HiveUtils
+import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType}
import org.apache.spark.util.{CircularBuffer, Utils}
/**
@@ -748,7 +749,12 @@ private[hive] class HiveClientImpl(
.asInstanceOf[Class[_ <: org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]]]
private def toHiveColumn(c: StructField): FieldSchema = {
- new FieldSchema(c.name, c.dataType.catalogString, c.getComment().orNull)
+ val typeString = if (c.metadata.contains(HiveUtils.hiveTypeString)) {
+ c.metadata.getString(HiveUtils.hiveTypeString)
+ } else {
+ c.dataType.catalogString
+ }
+ new FieldSchema(c.name, typeString, c.getComment().orNull)
}
private def fromHiveColumn(hc: FieldSchema): StructField = {
@@ -758,10 +764,13 @@ private[hive] class HiveClientImpl(
case e: ParseException =>
throw new SparkException("Cannot recognize hive type string: " + hc.getType, e)
}
+
+ val metadata = new MetadataBuilder().putString(HiveUtils.hiveTypeString, hc.getType).build()
val field = StructField(
name = hc.getName,
dataType = columnType,
- nullable = true)
+ nullable = true,
+ metadata = metadata)
Option(hc.getComment).map(field.withComment).getOrElse(field)
}
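On the read side, `fromHiveColumn` parses the Hive type string into a Catalyst type and now records the original string alongside it. A sketch of the varchar round trip under the same assumptions as above (illustrative field name and length):

```scala
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.types.{MetadataBuilder, StringType, StructField}

val hiveType = "varchar(10)"  // what hc.getType would return for such a column
val columnType = CatalystSqlParser.parseDataType(hiveType)
assert(columnType == StringType)  // varchar is widened to string in Spark SQL

// Recording the original type string compensates for the lossy mapping:
// toHiveColumn can hand "varchar(10)" back to Hive, which then selects the
// matching varchar object inspector when reading the ORC data.
val metadata = new MetadataBuilder().putString("HIVE_TYPE_STRING", hiveType).build()
val field = StructField("a", columnType, nullable = true, metadata = metadata)
```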
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala
index cca4480c44150..c5753cec80da7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala
@@ -205,7 +205,7 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest
test("make sure we can read table created by old version of Spark") {
for ((tbl, expectedSchema) <- rawTablesAndExpectations) {
val readBack = getTableMetadata(tbl.identifier.table)
- assert(readBack.schema == expectedSchema)
+ assert(readBack.schema.sameType(expectedSchema))
if (tbl.tableType == CatalogTableType.EXTERNAL) {
// trim the URI prefix
@@ -235,7 +235,7 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest
sql(s"ALTER TABLE ${tbl.identifier} RENAME TO $newName")
val readBack = getTableMetadata(newName)
- assert(readBack.schema == expectedSchema)
+ assert(readBack.schema.sameType(expectedSchema))
// trim the URI prefix
val actualTableLocation = new URI(readBack.storage.locationUri.get).getPath
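These assertions switch from `==` to `sameType` because `StructType` equality is metadata-sensitive: schemas read back through `fromHiveColumn` now carry the extra metadata entry. A minimal sketch of the distinction (note `sameType` is `private[spark]` in this version, so the snippet assumes Spark-internal code such as this suite):

```scala
import org.apache.spark.sql.types._

val expected = new StructType().add("a", StringType)
val readBack = new StructType().add(StructField("a", StringType, nullable = true,
  metadata = new MetadataBuilder().putString("HIVE_TYPE_STRING", "varchar(10)").build()))

assert(expected != readBack)         // == compares field metadata
assert(expected.sameType(readBack))  // sameType ignores metadata and nullability
```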
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
index 12f948041a8ab..2b404690510cd 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
@@ -22,6 +22,7 @@ import java.io.File
import org.scalatest.BeforeAndAfterAll
import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
@@ -150,6 +151,17 @@ abstract class OrcSuite extends QueryTest with TestHiveSingleton with BeforeAndA
test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") {
assert(new OrcOptions(Map("Orc.Compress" -> "NONE")).compressionCodec == "NONE")
}
+
+ test("SPARK-18220: read Hive orc table with varchar column") {
+ val hiveClient = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+ try {
+ hiveClient.runSqlHive("CREATE TABLE orc_varchar(a VARCHAR(10)) STORED AS orc")
+ hiveClient.runSqlHive("INSERT INTO TABLE orc_varchar SELECT 'a' FROM (SELECT 1) t")
+ checkAnswer(spark.table("orc_varchar"), Row("a"))
+ } finally {
+ hiveClient.runSqlHive("DROP TABLE IF EXISTS orc_varchar")
+ }
+ }
}
class OrcSourceSuite extends OrcSuite {