[SPARK-18220][SQL] read Hive orc table with varchar column should not fail #16060

Closed · wants to merge 3 commits
@@ -54,6 +54,14 @@ private[spark] object HiveUtils extends Logging {
   /** The version of hive used internally by Spark SQL. */
   val hiveExecutionVersion: String = "1.2.1"
 
+  /**
+   * The property key that is used to store the raw hive type string in the metadata of StructField.
+   * For example, in the case where the Hive type is varchar, the type gets mapped to a string type
+   * in Spark SQL, but we need to preserve the original type in order to invoke the correct object
+   * inspector in Hive.
+   */
+  val hiveTypeString: String = "HIVE_TYPE_STRING"
+
   val HIVE_METASTORE_VERSION = SQLConfigBuilder("spark.sql.hive.metastore.version")
     .doc("Version of the Hive metastore. Available options are " +
       s"<code>0.12.0</code> through <code>$hiveExecutionVersion</code>.")
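Why this key is needed: Catalyst parses Hive's varchar(n) down to StringType, so the raw type string is lost unless it is stashed in the field's metadata. A minimal standalone sketch of that behavior (not code from this patch; assumes a Spark 2.x spark-catalyst jar on the classpath):

import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.types.{MetadataBuilder, StringType, StructField}

object HiveTypeStringSketch {
  def main(args: Array[String]): Unit = {
    // In this Spark version the parser drops the varchar length entirely.
    val dt = CatalystSqlParser.parseDataType("varchar(10)")
    println(dt == StringType)  // true
    println(dt.catalogString)  // "string", not "varchar(10)"

    // Preserving the raw Hive type under the new key restores the lost information.
    val metadata = new MetadataBuilder().putString("HIVE_TYPE_STRING", "varchar(10)").build()
    val field = StructField("a", dt, nullable = true, metadata)
    println(field.metadata.getString("HIVE_TYPE_STRING"))  // "varchar(10)"
  }
}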
@@ -61,7 +61,12 @@ private[hive] case class MetastoreRelation(
   override protected def otherCopyArgs: Seq[AnyRef] = catalogTable :: sparkSession :: Nil
 
   private def toHiveColumn(c: StructField): FieldSchema = {
-    new FieldSchema(c.name, c.dataType.catalogString, c.getComment.orNull)
+    val typeString = if (c.metadata.contains(HiveUtils.hiveTypeString)) {
+      c.metadata.getString(HiveUtils.hiveTypeString)
+    } else {
+      c.dataType.catalogString
+    }
+    new FieldSchema(c.name, typeString, c.getComment.orNull)
   }
 
   // TODO: merge this with HiveClientImpl#toHiveTable
@@ -46,7 +46,8 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
 import org.apache.spark.sql.execution.QueryExecutionException
-import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.hive.HiveUtils
+import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType}
 import org.apache.spark.util.{CircularBuffer, Utils}
 
 /**
@@ -748,7 +749,12 @@ private[hive] class HiveClientImpl(
       .asInstanceOf[Class[_ <: org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]]]
 
   private def toHiveColumn(c: StructField): FieldSchema = {
-    new FieldSchema(c.name, c.dataType.catalogString, c.getComment().orNull)
+    val typeString = if (c.metadata.contains(HiveUtils.hiveTypeString)) {
+      c.metadata.getString(HiveUtils.hiveTypeString)
+    } else {
+      c.dataType.catalogString
+    }
+    new FieldSchema(c.name, typeString, c.getComment().orNull)
   }
 
   private def fromHiveColumn(hc: FieldSchema): StructField = {
@@ -758,10 +764,13 @@ private[hive] class HiveClientImpl(
       case e: ParseException =>
         throw new SparkException("Cannot recognize hive type string: " + hc.getType, e)
     }
+
+    val metadata = new MetadataBuilder().putString(HiveUtils.hiveTypeString, hc.getType).build()
     val field = StructField(
       name = hc.getName,
       dataType = columnType,
-      nullable = true)
+      nullable = true,
+      metadata = metadata)
     Option(hc.getComment).map(field.withComment).getOrElse(field)
   }
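Taken together, fromHiveColumn now records the raw Hive type and both toHiveColumn implementations prefer it, so the conversion round-trips losslessly for Hive-originated columns. A minimal standalone sketch of that round-trip (not code from this patch; assumes spark-catalyst and hive-metastore jars on the classpath, and the object and method names are illustrative):

import org.apache.hadoop.hive.metastore.api.FieldSchema
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.types.{MetadataBuilder, StructField}

object HiveColumnRoundTripSketch {
  val HiveTypeString = "HIVE_TYPE_STRING"  // the value of HiveUtils.hiveTypeString

  def fromHive(hc: FieldSchema): StructField = {
    val columnType = CatalystSqlParser.parseDataType(hc.getType)  // "varchar(10)" -> StringType
    val metadata = new MetadataBuilder().putString(HiveTypeString, hc.getType).build()
    StructField(hc.getName, columnType, nullable = true, metadata)
  }

  def toHive(c: StructField): FieldSchema = {
    val typeString = if (c.metadata.contains(HiveTypeString)) {
      c.metadata.getString(HiveTypeString)  // the raw Hive type wins when present
    } else {
      c.dataType.catalogString  // fallback for columns that never came from Hive
    }
    new FieldSchema(c.name, typeString, null)
  }

  def main(args: Array[String]): Unit = {
    val roundTripped = toHive(fromHive(new FieldSchema("a", "varchar(10)", null)))
    // Prints varchar(10) rather than string, so Hive can pick the matching object inspector.
    println(roundTripped.getType)
  }
}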
@@ -205,7 +205,7 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest
   test("make sure we can read table created by old version of Spark") {
     for ((tbl, expectedSchema) <- rawTablesAndExpectations) {
       val readBack = getTableMetadata(tbl.identifier.table)
-      assert(readBack.schema == expectedSchema)
+      assert(readBack.schema.sameType(expectedSchema))
 
       if (tbl.tableType == CatalogTableType.EXTERNAL) {
         // trim the URI prefix
@@ -235,7 +235,7 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest
       sql(s"ALTER TABLE ${tbl.identifier} RENAME TO $newName")
 
       val readBack = getTableMetadata(newName)
-      assert(readBack.schema == expectedSchema)
+      assert(readBack.schema.sameType(expectedSchema))
 
       // trim the URI prefix
       val actualTableLocation = new URI(readBack.storage.locationUri.get).getPath
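The asserts above switch from == to sameType because the read-back schema now carries HIVE_TYPE_STRING metadata that the hard-coded expected schema lacks; sameType compares only field names and data types. A minimal sketch of the distinction (not code from this patch; sameType is private[spark] in this Spark version, hence the package declaration):

package org.apache.spark.sql.hive

import org.apache.spark.sql.types.{MetadataBuilder, StringType, StructField, StructType}

object SameTypeSketch {
  def main(args: Array[String]): Unit = {
    val expected = StructType(Seq(StructField("a", StringType)))
    val metadata = new MetadataBuilder().putString("HIVE_TYPE_STRING", "varchar(10)").build()
    val readBack = StructType(Seq(StructField("a", StringType, nullable = true, metadata)))

    println(readBack == expected)         // false: the metadata differs
    println(readBack.sameType(expected))  // true: names and types still match
  }
}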
@@ -22,6 +22,7 @@ import java.io.File
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.hive.HiveExternalCatalog
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
@@ -150,6 +151,17 @@ abstract class OrcSuite extends QueryTest with TestHiveSingleton with BeforeAndAfterAll
   test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") {
     assert(new OrcOptions(Map("Orc.Compress" -> "NONE")).compressionCodec == "NONE")
   }
+
+  test("SPARK-18220: read Hive orc table with varchar column") {
+    val hiveClient = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+    try {
+      hiveClient.runSqlHive("CREATE TABLE orc_varchar(a VARCHAR(10)) STORED AS orc")
+      hiveClient.runSqlHive("INSERT INTO TABLE orc_varchar SELECT 'a' FROM (SELECT 1) t")
+      checkAnswer(spark.table("orc_varchar"), Row("a"))
+    } finally {
+      hiveClient.runSqlHive("DROP TABLE IF EXISTS orc_varchar")
+    }
+  }
 }
 
 class OrcSourceSuite extends OrcSuite {