Commit 71c9dea

read Hive orc table with varchar column

cloud-fan committed Nov 29, 2016
1 parent d449988 commit 71c9dea

Showing 4 changed files with 34 additions and 5 deletions.
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -51,9 +51,12 @@ private[spark] object HiveUtils extends Logging {
     sc
   }
 
-  /** The version of hive used internally by Spark SQL. */
+  // The version of hive used internally by Spark SQL.
   val hiveExecutionVersion: String = "1.2.1"
 
+  // The property key that is used to store the raw hive type string in the metadata of StructField.
+  val hiveTypeString: String = "HIVE_TYPE_STRING"
+
   val HIVE_METASTORE_VERSION = SQLConfigBuilder("spark.sql.hive.metastore.version")
     .doc("Version of the Hive metastore. Available options are " +
       s"<code>0.12.0</code> through <code>$hiveExecutionVersion</code>.")
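For context, a minimal sketch (not part of the commit) of the round-trip that hiveTypeString enables. Hive's varchar(10) maps to Catalyst's StringType, so catalogString alone would report "string" and lose the length; the raw type string therefore travels in StructField metadata. The column name and type below are illustrative:

import org.apache.spark.sql.types.{MetadataBuilder, StringType, StructField}

// Keep the raw Hive type string next to the (lossy) Catalyst type.
val rawHiveType = "varchar(10)"  // illustrative; as reported by the metastore
val metadata = new MetadataBuilder()
  .putString("HIVE_TYPE_STRING", rawHiveType)  // the key held in HiveUtils.hiveTypeString
  .build()
val field = StructField("a", StringType, nullable = true, metadata = metadata)

// When converting back to a Hive FieldSchema, prefer the preserved raw type.
val typeString = if (field.metadata.contains("HIVE_TYPE_STRING")) {
  field.metadata.getString("HIVE_TYPE_STRING")  // "varchar(10)"
} else {
  field.dataType.catalogString                  // would be "string"
}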
sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
@@ -61,7 +61,12 @@ private[hive] case class MetastoreRelation(
   override protected def otherCopyArgs: Seq[AnyRef] = catalogTable :: sparkSession :: Nil
 
   private def toHiveColumn(c: StructField): FieldSchema = {
-    new FieldSchema(c.name, c.dataType.catalogString, c.getComment.orNull)
+    val typeString = if (c.metadata.contains(HiveUtils.hiveTypeString)) {
+      c.metadata.getString(HiveUtils.hiveTypeString)
+    } else {
+      c.dataType.catalogString
+    }
+    new FieldSchema(c.name, typeString, c.getComment.orNull)
   }
 
   // TODO: merge this with HiveClientImpl#toHiveTable
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -46,7 +46,8 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
 import org.apache.spark.sql.execution.QueryExecutionException
-import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.hive.HiveUtils
+import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType}
 import org.apache.spark.util.{CircularBuffer, Utils}
 
 /**
@@ -748,7 +749,12 @@
       .asInstanceOf[Class[_ <: org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]]]
 
   private def toHiveColumn(c: StructField): FieldSchema = {
-    new FieldSchema(c.name, c.dataType.catalogString, c.getComment().orNull)
+    val typeString = if (c.metadata.contains(HiveUtils.hiveTypeString)) {
+      c.metadata.getString(HiveUtils.hiveTypeString)
+    } else {
+      c.dataType.catalogString
+    }
+    new FieldSchema(c.name, typeString, c.getComment().orNull)
   }
 
   private def fromHiveColumn(hc: FieldSchema): StructField = {
@@ -758,10 +764,13 @@
       case e: ParseException =>
         throw new SparkException("Cannot recognize hive type string: " + hc.getType, e)
     }
+
+    val metadata = new MetadataBuilder().putString(HiveUtils.hiveTypeString, hc.getType).build()
     val field = StructField(
       name = hc.getName,
       dataType = columnType,
-      nullable = true)
+      nullable = true,
+      metadata = metadata)
     Option(hc.getComment).map(field.withComment).getOrElse(field)
   }

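To see why fromHiveColumn needs this metadata at all, here is a minimal sketch (assuming the parser behavior of this Spark version, where Hive char types have no first-class Catalyst equivalent): parsing "varchar(10)" yields plain StringType, so without the stored raw type the length is unrecoverable, and the schema handed back to Hive's ORC reader would declare string instead of varchar(10).

import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.types.StringType

// varchar(n) parses to plain StringType; the length is dropped.
val parsed = CatalystSqlParser.parseDataType("varchar(10)")
assert(parsed == StringType)

// catalogString reports "string", not "varchar(10)": exactly the type
// mismatch that made Hive ORC tables with varchar columns unreadable.
assert(parsed.catalogString == "string")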
sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
@@ -22,6 +22,7 @@ import java.io.File
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.hive.HiveExternalCatalog
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
@@ -150,6 +151,17 @@ abstract class OrcSuite extends QueryTest with TestHiveSingleton with BeforeAndAfterAll {
   test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") {
     assert(new OrcOptions(Map("Orc.Compress" -> "NONE")).compressionCodec == "NONE")
   }
+
+  test("SPARK-18220: read Hive orc table with varchar column") {
+    val hiveClient = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+    try {
+      hiveClient.runSqlHive("CREATE TABLE orc_varchar(a VARCHAR(10)) STORED AS orc")
+      hiveClient.runSqlHive("INSERT INTO TABLE orc_varchar SELECT 'a' FROM (SELECT 1) t")
+      checkAnswer(spark.table("orc_varchar"), Row("a"))
+    } finally {
+      hiveClient.runSqlHive("DROP TABLE IF EXISTS orc_varchar")
+    }
+  }
 }
 
 class OrcSourceSuite extends OrcSuite {
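As a hypothetical spark-shell check (assuming the orc_varchar table from the test above still exists and that the metastore schema flows through fromHiveColumn), the restored field is a StringType whose metadata still carries the raw Hive type:

// Sketch only; not part of the test suite.
val field = spark.table("orc_varchar").schema("a")
field.dataType.catalogString                  // "string"
field.metadata.getString("HIVE_TYPE_STRING")  // "varchar(10)"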
