diff --git a/docs/sql-data-sources-hive-tables.md b/docs/sql-data-sources-hive-tables.md
index 3b39a32d43240..14773ca4ec529 100644
--- a/docs/sql-data-sources-hive-tables.md
+++ b/docs/sql-data-sources-hive-tables.md
@@ -115,7 +115,7 @@ The following options can be used to configure the version of Hive that is used
     <td><code>spark.sql.hive.metastore.version</code></td>
     <td><code>1.2.1</code></td>
     <td>
       Version of the Hive metastore. Available
-      options are <code>0.12.0</code> through <code>2.3.4</code>.
+      options are <code>0.12.0</code> through <code>2.3.4</code> and <code>3.1.0</code> through <code>3.1.1</code>.
     </td>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
     .doc("Version of the Hive metastore. Available options are " +
-        "<code>0.12.0</code> through <code>2.3.4</code>.")
+        "<code>0.12.0</code> through <code>2.3.4</code> and " +
+        "<code>3.1.0</code> through <code>3.1.1</code>.")
     .stringConf
     .createWithDefault(builtinHiveVersion)
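
For context, a minimal usage sketch (not part of this patch) of how an application would point Spark at a Hive 3.1 metastore once this option is accepted. Only the two `spark.sql.hive.metastore.*` keys and the `3.1.1` value come from the change above; the application name, the `maven` jar-resolution mode, and the trailing query are illustrative choices.

```scala
// Hypothetical usage sketch: connecting to a Hive 3.1 metastore.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("hive-3-1-metastore-example")                // illustrative name
  .config("spark.sql.hive.metastore.version", "3.1.1")  // newly supported version
  .config("spark.sql.hive.metastore.jars", "maven")     // fetch matching Hive client jars
  .enableHiveSupport()
  .getOrCreate()

spark.sql("SHOW DATABASES").show()
```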
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 5e9b324a168e0..bfe19c26ef4e7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -105,6 +105,7 @@ private[hive] class HiveClientImpl(
case hive.v2_1 => new Shim_v2_1()
case hive.v2_2 => new Shim_v2_2()
case hive.v2_3 => new Shim_v2_3()
+ case hive.v3_1 => new Shim_v3_1()
}
// Create an internal session state for this HiveClientImpl.
@@ -852,11 +853,17 @@ private[hive] class HiveClientImpl(
client.getAllTables("default").asScala.foreach { t =>
logDebug(s"Deleting table $t")
val table = client.getTable("default", t)
- client.getIndexes("default", t, 255).asScala.foreach { index =>
- shim.dropIndex(client, "default", t, index.getIndexName)
- }
- if (!table.isIndexTable) {
- client.dropTable("default", t)
+ try {
+ client.getIndexes("default", t, 255).asScala.foreach { index =>
+ shim.dropIndex(client, "default", t, index.getIndexName)
+ }
+ if (!table.isIndexTable) {
+ client.dropTable("default", t)
+ }
+ } catch {
+ case _: NoSuchMethodError =>
+          // HIVE-18448: Hive 3.0 removed the index APIs
+ client.dropTable("default", t)
}
}
client.getAllDatabases.asScala.filterNot(_ == "default").foreach { db =>
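
The `try`/`catch` above treats `NoSuchMethodError` as a capability probe: the index-listing call only links against pre-3.0 Hive client jars, so when it is absent the reset falls back to a plain table drop. A small, self-contained sketch of that control-flow pattern; the `dropIndexes`/`dropTable` parameters are placeholders standing in for the reflective shim calls, not Spark APIs.

```scala
// Sketch of the "call the legacy API, fall back when it no longer exists" pattern.
object LegacyApiFallbackSketch {
  def resetTable(dropIndexes: () => Unit, dropTable: () => Unit): Unit = {
    try {
      dropIndexes()  // stands in for the call that resolves only against Hive < 3.0 jars
      dropTable()
    } catch {
      case _: NoSuchMethodError =>
        // HIVE-18448: Hive 3.0 removed the index APIs, so only the table is dropped.
        dropTable()
    }
  }
}
```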
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index 4d484904d2c27..a8ebb23f78e23 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -40,6 +40,7 @@ import org.apache.hadoop.hive.serde.serdeConstants
import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchPermanentFunctionException
import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, CatalogTablePartition, CatalogUtils, FunctionResource, FunctionResourceType}
@@ -1179,3 +1180,128 @@ private[client] class Shim_v2_1 extends Shim_v2_0 {
private[client] class Shim_v2_2 extends Shim_v2_1
private[client] class Shim_v2_3 extends Shim_v2_1
+
+private[client] class Shim_v3_1 extends Shim_v2_3 {
+ // Spark supports only non-ACID operations
+ protected lazy val isAcidIUDoperation = JBoolean.FALSE
+
+ // Writer ID can be 0 for non-ACID operations
+ protected lazy val writeIdInLoadTableOrPartition: JLong = 0L
+
+ // Statement ID
+ protected lazy val stmtIdInLoadTableOrPartition: JInteger = 0
+
+ protected lazy val listBucketingLevel: JInteger = 0
+
+ private lazy val clazzLoadFileType = getClass.getClassLoader.loadClass(
+ "org.apache.hadoop.hive.ql.plan.LoadTableDesc$LoadFileType")
+
+ private lazy val loadPartitionMethod =
+ findMethod(
+ classOf[Hive],
+ "loadPartition",
+ classOf[Path],
+ classOf[Table],
+ classOf[JMap[String, String]],
+ clazzLoadFileType,
+ JBoolean.TYPE,
+ JBoolean.TYPE,
+ JBoolean.TYPE,
+ JBoolean.TYPE,
+ JBoolean.TYPE,
+ classOf[JLong],
+ JInteger.TYPE,
+ JBoolean.TYPE)
+ private lazy val loadTableMethod =
+ findMethod(
+ classOf[Hive],
+ "loadTable",
+ classOf[Path],
+ classOf[String],
+ clazzLoadFileType,
+ JBoolean.TYPE,
+ JBoolean.TYPE,
+ JBoolean.TYPE,
+ JBoolean.TYPE,
+ classOf[JLong],
+ JInteger.TYPE,
+ JBoolean.TYPE)
+ private lazy val loadDynamicPartitionsMethod =
+ findMethod(
+ classOf[Hive],
+ "loadDynamicPartitions",
+ classOf[Path],
+ classOf[String],
+ classOf[JMap[String, String]],
+ clazzLoadFileType,
+ JInteger.TYPE,
+ JInteger.TYPE,
+ JBoolean.TYPE,
+ JLong.TYPE,
+ JInteger.TYPE,
+ JBoolean.TYPE,
+ classOf[AcidUtils.Operation],
+ JBoolean.TYPE)
+
+ override def loadPartition(
+ hive: Hive,
+ loadPath: Path,
+ tableName: String,
+ partSpec: JMap[String, String],
+ replace: Boolean,
+ inheritTableSpecs: Boolean,
+ isSkewedStoreAsSubdir: Boolean,
+ isSrcLocal: Boolean): Unit = {
+ val session = SparkSession.getActiveSession
+ assert(session.nonEmpty)
+ val database = session.get.sessionState.catalog.getCurrentDatabase
+ val table = hive.getTable(database, tableName)
+ val loadFileType = if (replace) {
+ clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("REPLACE_ALL"))
+ } else {
+ clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("KEEP_EXISTING"))
+ }
+ assert(loadFileType.isDefined)
+ loadPartitionMethod.invoke(hive, loadPath, table, partSpec, loadFileType.get,
+ inheritTableSpecs: JBoolean, isSkewedStoreAsSubdir: JBoolean,
+ isSrcLocal: JBoolean, isAcid, hasFollowingStatsTask,
+ writeIdInLoadTableOrPartition, stmtIdInLoadTableOrPartition, replace: JBoolean)
+ }
+
+ override def loadTable(
+ hive: Hive,
+ loadPath: Path,
+ tableName: String,
+ replace: Boolean,
+ isSrcLocal: Boolean): Unit = {
+ val loadFileType = if (replace) {
+ clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("REPLACE_ALL"))
+ } else {
+ clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("KEEP_EXISTING"))
+ }
+ assert(loadFileType.isDefined)
+ loadTableMethod.invoke(hive, loadPath, tableName, loadFileType.get, isSrcLocal: JBoolean,
+ isSkewedStoreAsSubdir, isAcidIUDoperation, hasFollowingStatsTask,
+ writeIdInLoadTableOrPartition, stmtIdInLoadTableOrPartition: JInteger, replace: JBoolean)
+ }
+
+ override def loadDynamicPartitions(
+ hive: Hive,
+ loadPath: Path,
+ tableName: String,
+ partSpec: JMap[String, String],
+ replace: Boolean,
+ numDP: Int,
+ listBucketingEnabled: Boolean): Unit = {
+ val loadFileType = if (replace) {
+ clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("REPLACE_ALL"))
+ } else {
+ clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("KEEP_EXISTING"))
+ }
+ assert(loadFileType.isDefined)
+ loadDynamicPartitionsMethod.invoke(hive, loadPath, tableName, partSpec, loadFileType.get,
+ numDP: JInteger, listBucketingLevel, isAcid, writeIdInLoadTableOrPartition,
+ stmtIdInLoadTableOrPartition, hasFollowingStatsTask, AcidUtils.Operation.NOT_ACID,
+ replace: JBoolean)
+ }
+}
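
Note that the shim never references Hive 3 classes directly: `LoadTableDesc$LoadFileType` is loaded by name and its constants are matched via `toString`, so the code still compiles against the built-in Hive 1.2 jars. Below is a minimal, self-contained sketch of that reflection pattern, using `java.util.concurrent.TimeUnit` as a stand-in for the Hive enum (the class name and constant are illustrative only).

```scala
// Sketch only: resolve an enum constant from a class not referenced at compile time.
object EnumReflectionSketch {
  def main(args: Array[String]): Unit = {
    // Stand-in for "org.apache.hadoop.hive.ql.plan.LoadTableDesc$LoadFileType"
    val clazz = getClass.getClassLoader.loadClass("java.util.concurrent.TimeUnit")
    // Same lookup the shim uses for REPLACE_ALL / KEEP_EXISTING
    val constant = clazz.getEnumConstants.find(_.toString.equalsIgnoreCase("SECONDS"))
    assert(constant.isDefined)
    println(constant.get)  // prints: SECONDS
  }
}
```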
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
index ca98c30add168..1f7ab9bb60346 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
@@ -100,6 +100,7 @@ private[hive] object IsolatedClientLoader extends Logging {
case "2.1" | "2.1.0" | "2.1.1" => hive.v2_1
case "2.2" | "2.2.0" => hive.v2_2
case "2.3" | "2.3.0" | "2.3.1" | "2.3.2" | "2.3.3" | "2.3.4" => hive.v2_3
+ case "3.1" | "3.1.0" | "3.1.1" => hive.v3_1
case version =>
throw new UnsupportedOperationException(s"Unsupported Hive Metastore version ($version). " +
s"Please set ${HiveUtils.HIVE_METASTORE_VERSION.key} with a valid version.")
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala
index e4cf7299d2af6..b6a49497e4809 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala
@@ -79,7 +79,14 @@ package object client {
exclusions = Seq("org.apache.curator:*",
"org.pentaho:pentaho-aggdesigner-algorithm"))
- val allSupportedHiveVersions = Set(v12, v13, v14, v1_0, v1_1, v1_2, v2_0, v2_1, v2_2, v2_3)
+ // Since Hive 3.0, HookUtils uses org.apache.logging.log4j.util.Strings
+ case object v3_1 extends HiveVersion("3.1.1",
+ extraDeps = Seq("org.apache.logging.log4j:log4j-api:2.10.0",
+ "org.apache.derby:derby:10.14.1.0"),
+ exclusions = Seq("org.apache.curator:*",
+ "org.pentaho:pentaho-aggdesigner-algorithm"))
+
+ val allSupportedHiveVersions = Set(v12, v13, v14, v1_0, v1_1, v1_2, v2_0, v2_1, v2_2, v2_3, v3_1)
}
// scalastyle:on
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
index 078968ed0145f..4ddba500cdf67 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
@@ -114,7 +114,7 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
// be removed by Hive when Hive is trying to empty the table directory.
val hiveVersionsUsingOldExternalTempPath: Set[HiveVersion] = Set(v12, v13, v14, v1_0)
val hiveVersionsUsingNewExternalTempPath: Set[HiveVersion] =
- Set(v1_1, v1_2, v2_0, v2_1, v2_2, v2_3)
+ Set(v1_1, v1_2, v2_0, v2_1, v2_2, v2_3, v3_1)
// Ensure all the supported versions are considered here.
assert(hiveVersionsUsingNewExternalTempPath ++ hiveVersionsUsingOldExternalTempPath ==
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
index 7a325bf26b4cf..f3d8c2ad440ff 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
@@ -19,12 +19,16 @@ package org.apache.spark.sql.hive.client
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
+import org.apache.hadoop.mapred.TextInputFormat
import org.scalatest.BeforeAndAfterAll
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.types.{BooleanType, IntegerType, LongType}
+import org.apache.spark.sql.types.{BooleanType, IntegerType, LongType, StructType}
// TODO: Refactor this to `HivePartitionFilteringSuite`
class HiveClientSuite(version: String)
@@ -46,7 +50,22 @@ class HiveClientSuite(version: String)
val hadoopConf = new Configuration()
hadoopConf.setBoolean(tryDirectSqlKey, tryDirectSql)
val client = buildClient(hadoopConf)
- client.runSqlHive("CREATE TABLE test (value INT) PARTITIONED BY (ds INT, h INT, chunk STRING)")
+ val tableSchema =
+ new StructType().add("value", "int").add("ds", "int").add("h", "int").add("chunk", "string")
+ val table = CatalogTable(
+ identifier = TableIdentifier("test", Some("default")),
+ tableType = CatalogTableType.MANAGED,
+ schema = tableSchema,
+ partitionColumnNames = Seq("ds", "h", "chunk"),
+ storage = CatalogStorageFormat(
+ locationUri = None,
+ inputFormat = Some(classOf[TextInputFormat].getName),
+ outputFormat = Some(classOf[HiveIgnoreKeyTextOutputFormat[_, _]].getName),
+ serde = Some(classOf[LazySimpleSerDe].getName()),
+ compressed = false,
+ properties = Map.empty
+ ))
+ client.createTable(table, ignoreIfExists = false)
val partitions =
for {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientVersions.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientVersions.scala
index 30592a3f85428..9b9af79354c2a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientVersions.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientVersions.scala
@@ -23,5 +23,5 @@ import org.apache.spark.SparkFunSuite
private[client] trait HiveClientVersions {
protected val versions =
- IndexedSeq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2", "2.0", "2.1", "2.2", "2.3")
+ IndexedSeq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2", "2.0", "2.1", "2.2", "2.3", "3.1")
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala
index e5963d03f6b52..a45ad1f30042b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala
@@ -34,10 +34,15 @@ private[client] abstract class HiveVersionSuite(version: String) extends SparkFu
// Hive changed the default of datanucleus.schema.autoCreateAll from true to false and
// hive.metastore.schema.verification from false to true since 2.0
// For details, see the JIRA HIVE-6113 and HIVE-12463
- if (version == "2.0" || version == "2.1" || version == "2.2" || version == "2.3") {
+ if (version == "2.0" || version == "2.1" || version == "2.2" || version == "2.3" ||
+ version == "3.1") {
hadoopConf.set("datanucleus.schema.autoCreateAll", "true")
hadoopConf.set("hive.metastore.schema.verification", "false")
}
+    // Since Hive 3.0, HIVE-19310 skips `ensureDbInit` when `hive.in.test=false`.
+ if (version == "3.1") {
+ hadoopConf.set("hive.in.test", "true")
+ }
HiveClientBuilder.buildClient(
version,
hadoopConf,
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 218bd18e5dc99..b323871630d61 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -103,7 +103,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
}
private val versions =
- Seq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2", "2.0", "2.1", "2.2", "2.3")
+ Seq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2", "2.0", "2.1", "2.2", "2.3", "3.1")
private var client: HiveClient = null
@@ -118,10 +118,15 @@ class VersionsSuite extends SparkFunSuite with Logging {
// Hive changed the default of datanucleus.schema.autoCreateAll from true to false and
// hive.metastore.schema.verification from false to true since 2.0
// For details, see the JIRA HIVE-6113 and HIVE-12463
- if (version == "2.0" || version == "2.1" || version == "2.2" || version == "2.3") {
+ if (version == "2.0" || version == "2.1" || version == "2.2" || version == "2.3" ||
+ version == "3.1") {
hadoopConf.set("datanucleus.schema.autoCreateAll", "true")
hadoopConf.set("hive.metastore.schema.verification", "false")
}
+      // Since Hive 3.0, HIVE-19310 skips `ensureDbInit` when `hive.in.test=false`.
+ if (version == "3.1") {
+ hadoopConf.set("hive.in.test", "true")
+ }
client = buildClient(version, hadoopConf, HiveUtils.formatTimeVarsForHiveClient(hadoopConf))
if (versionSpark != null) versionSpark.reset()
versionSpark = TestHiveVersion(client)
@@ -318,7 +323,20 @@ class VersionsSuite extends SparkFunSuite with Logging {
properties = Map.empty)
test(s"$version: sql create partitioned table") {
- client.runSqlHive("CREATE TABLE src_part (value INT) PARTITIONED BY (key1 INT, key2 INT)")
+ val table = CatalogTable(
+ identifier = TableIdentifier("src_part", Some("default")),
+ tableType = CatalogTableType.MANAGED,
+ schema = new StructType().add("value", "int").add("key1", "int").add("key2", "int"),
+ partitionColumnNames = Seq("key1", "key2"),
+ storage = CatalogStorageFormat(
+ locationUri = None,
+ inputFormat = Some(classOf[TextInputFormat].getName),
+ outputFormat = Some(classOf[HiveIgnoreKeyTextOutputFormat[_, _]].getName),
+ serde = Some(classOf[LazySimpleSerDe].getName()),
+ compressed = false,
+ properties = Map.empty
+ ))
+ client.createTable(table, ignoreIfExists = false)
}
val testPartitionCount = 2
@@ -556,9 +574,12 @@ class VersionsSuite extends SparkFunSuite with Logging {
}
test(s"$version: sql create index and reset") {
- client.runSqlHive("CREATE TABLE indexed_table (key INT)")
- client.runSqlHive("CREATE INDEX index_1 ON TABLE indexed_table(key) " +
- "as 'COMPACT' WITH DEFERRED REBUILD")
+ // HIVE-18448 Since Hive 3.0, INDEX is not supported.
+ if (version != "3.1") {
+ client.runSqlHive("CREATE TABLE indexed_table (key INT)")
+ client.runSqlHive("CREATE INDEX index_1 ON TABLE indexed_table(key) " +
+ "as 'COMPACT' WITH DEFERRED REBUILD")
+ }
}
///////////////////////////////////////////////////////////////////////////