diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index 81cd65c3cc337..26b1994308f5d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -54,6 +54,14 @@ private[spark] object HiveUtils extends Logging {
   /** The version of hive used internally by Spark SQL. */
   val hiveExecutionVersion: String = "1.2.1"
 
+  /**
+   * The property key that is used to store the raw hive type string in the metadata of StructField.
+   * For example, in the case where the Hive type is varchar, the type gets mapped to a string type
+   * in Spark SQL, but we need to preserve the original type in order to invoke the correct object
+   * inspector in Hive.
+   */
+  val hiveTypeString: String = "HIVE_TYPE_STRING"
+
   val HIVE_METASTORE_VERSION = SQLConfigBuilder("spark.sql.hive.metastore.version")
     .doc("Version of the Hive metastore. Available options are " +
         s"0.12.0 through $hiveExecutionVersion.")
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
index da809cf991de2..3bbac05a79c23 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
@@ -61,7 +61,12 @@ private[hive] case class MetastoreRelation(
   override protected def otherCopyArgs: Seq[AnyRef] = catalogTable :: sparkSession :: Nil
 
   private def toHiveColumn(c: StructField): FieldSchema = {
-    new FieldSchema(c.name, c.dataType.catalogString, c.getComment.orNull)
+    val typeString = if (c.metadata.contains(HiveUtils.hiveTypeString)) {
+      c.metadata.getString(HiveUtils.hiveTypeString)
+    } else {
+      c.dataType.catalogString
+    }
+    new FieldSchema(c.name, typeString, c.getComment.orNull)
   }
 
   // TODO: merge this with HiveClientImpl#toHiveTable
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 68dcfd86731bd..590029a517e09 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -46,7 +46,8 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
 import org.apache.spark.sql.execution.QueryExecutionException
-import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.hive.HiveUtils
+import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType}
 import org.apache.spark.util.{CircularBuffer, Utils}
 
 /**
@@ -748,7 +749,12 @@ private[hive] class HiveClientImpl(
       .asInstanceOf[Class[_ <: org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]]]
 
   private def toHiveColumn(c: StructField): FieldSchema = {
-    new FieldSchema(c.name, c.dataType.catalogString, c.getComment().orNull)
+    val typeString = if (c.metadata.contains(HiveUtils.hiveTypeString)) {
+      c.metadata.getString(HiveUtils.hiveTypeString)
+    } else {
+      c.dataType.catalogString
+    }
+    new FieldSchema(c.name, typeString, c.getComment().orNull)
   }
 
   private def fromHiveColumn(hc: FieldSchema): StructField = {
@@ -758,10 +764,13 @@ private[hive] class HiveClientImpl(
       case e: ParseException =>
         throw new SparkException("Cannot recognize hive type string: " + hc.getType, e)
     }
+
+    val metadata = new MetadataBuilder().putString(HiveUtils.hiveTypeString, hc.getType).build()
     val field = StructField(
       name = hc.getName,
       dataType = columnType,
-      nullable = true)
+      nullable = true,
+      metadata = metadata)
     Option(hc.getComment).map(field.withComment).getOrElse(field)
   }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala
index cca4480c44150..c5753cec80da7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala
@@ -205,7 +205,7 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest
   test("make sure we can read table created by old version of Spark") {
     for ((tbl, expectedSchema) <- rawTablesAndExpectations) {
       val readBack = getTableMetadata(tbl.identifier.table)
-      assert(readBack.schema == expectedSchema)
+      assert(readBack.schema.sameType(expectedSchema))
 
       if (tbl.tableType == CatalogTableType.EXTERNAL) {
         // trim the URI prefix
@@ -235,7 +235,7 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest
       sql(s"ALTER TABLE ${tbl.identifier} RENAME TO $newName")
 
       val readBack = getTableMetadata(newName)
-      assert(readBack.schema == expectedSchema)
+      assert(readBack.schema.sameType(expectedSchema))
 
       // trim the URI prefix
       val actualTableLocation = new URI(readBack.storage.locationUri.get).getPath
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
index 12f948041a8ab..2b404690510cd 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
@@ -22,6 +22,7 @@ import java.io.File
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.hive.HiveExternalCatalog
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
@@ -150,6 +151,17 @@ abstract class OrcSuite extends QueryTest with TestHiveSingleton with BeforeAndA
   test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") {
     assert(new OrcOptions(Map("Orc.Compress" -> "NONE")).compressionCodec == "NONE")
   }
+
+  test("SPARK-18220: read Hive orc table with varchar column") {
+    val hiveClient = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+    try {
+      hiveClient.runSqlHive("CREATE TABLE orc_varchar(a VARCHAR(10)) STORED AS orc")
+      hiveClient.runSqlHive("INSERT INTO TABLE orc_varchar SELECT 'a' FROM (SELECT 1) t")
+      checkAnswer(spark.table("orc_varchar"), Row("a"))
+    } finally {
+      hiveClient.runSqlHive("DROP TABLE IF EXISTS orc_varchar")
+    }
+  }
 }
 
 class OrcSourceSuite extends OrcSuite {
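
Not part of the patch: a minimal, self-contained sketch of the metadata round-trip the change implements, using only public Spark SQL APIs that appear in the diff (MetadataBuilder, StructField, Metadata.contains/getString, DataType.catalogString). The object name HiveTypeStringRoundTrip and the literal "varchar(10)" are illustrative, not from the patch.

import org.apache.spark.sql.types.{MetadataBuilder, StringType, StructField}

object HiveTypeStringRoundTrip {
  // Same key as HiveUtils.hiveTypeString in the patch.
  val hiveTypeString: String = "HIVE_TYPE_STRING"

  def main(args: Array[String]): Unit = {
    // Read side (fromHiveColumn): a Hive varchar(10) column maps to StringType,
    // but the raw Hive type string is preserved in the field's metadata.
    val metadata = new MetadataBuilder().putString(hiveTypeString, "varchar(10)").build()
    val field = StructField("a", StringType, nullable = true, metadata = metadata)

    // Write side (toHiveColumn): prefer the preserved raw type over
    // dataType.catalogString, so Hive gets "varchar(10)" back instead of
    // "string" and selects the matching varchar object inspector.
    val typeString = if (field.metadata.contains(hiveTypeString)) {
      field.metadata.getString(hiveTypeString)
    } else {
      field.dataType.catalogString
    }
    assert(typeString == "varchar(10)")
  }
}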