diff --git a/docs/sql-data-sources-hive-tables.md b/docs/sql-data-sources-hive-tables.md
index 3b39a32d43240..14773ca4ec529 100644
--- a/docs/sql-data-sources-hive-tables.md
+++ b/docs/sql-data-sources-hive-tables.md
@@ -115,7 +115,7 @@ The following options can be used to configure the version of Hive that is used
     1.2.1
     Version of the Hive metastore. Available
-    options are 0.12.0 through 2.3.4.
+    options are 0.12.0 through 2.3.4 and 3.1.0 through 3.1.1.
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index 597eef129f63e..38bbe643f5fac 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -62,7 +62,8 @@ private[spark] object HiveUtils extends Logging {

   val HIVE_METASTORE_VERSION = buildConf("spark.sql.hive.metastore.version")
     .doc("Version of the Hive metastore. Available options are " +
-      s"0.12.0 through 2.3.4.")
+      "0.12.0 through 2.3.4 and " +
+      "3.1.0 through 3.1.1.")
     .stringConf
     .createWithDefault(builtinHiveVersion)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 5e9b324a168e0..bfe19c26ef4e7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -105,6 +105,7 @@ private[hive] class HiveClientImpl(
     case hive.v2_1 => new Shim_v2_1()
     case hive.v2_2 => new Shim_v2_2()
     case hive.v2_3 => new Shim_v2_3()
+    case hive.v3_1 => new Shim_v3_1()
   }

   // Create an internal session state for this HiveClientImpl.
@@ -852,11 +853,17 @@ private[hive] class HiveClientImpl(
     client.getAllTables("default").asScala.foreach { t =>
       logDebug(s"Deleting table $t")
       val table = client.getTable("default", t)
-      client.getIndexes("default", t, 255).asScala.foreach { index =>
-        shim.dropIndex(client, "default", t, index.getIndexName)
-      }
-      if (!table.isIndexTable) {
-        client.dropTable("default", t)
+      try {
+        client.getIndexes("default", t, 255).asScala.foreach { index =>
+          shim.dropIndex(client, "default", t, index.getIndexName)
+        }
+        if (!table.isIndexTable) {
+          client.dropTable("default", t)
+        }
+      } catch {
+        case _: NoSuchMethodError =>
+          // HIVE-18448: Hive 3.0 removed the index APIs
+          client.dropTable("default", t)
       }
     }
     client.getAllDatabases.asScala.filterNot(_ == "default").foreach { db =>
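For context, the doc and `HiveUtils` changes above are user-visible: a session can now request a Hive 3.1 metastore client. A minimal usage sketch (the app name is arbitrary, and `maven` is just one of the existing `spark.sql.hive.metastore.jars` modes, not something this patch adds):

```scala
import org.apache.spark.sql.SparkSession

// Point Spark at a Hive 3.1.1 metastore. "maven" downloads the matching
// Hive client jars at runtime; the default "builtin" only works when the
// requested version matches the Hive client bundled with Spark.
val spark = SparkSession.builder()
  .appName("Hive31MetastoreExample")
  .config("spark.sql.hive.metastore.version", "3.1.1")
  .config("spark.sql.hive.metastore.jars", "maven")
  .enableHiveSupport()
  .getOrCreate()
```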
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index 4d484904d2c27..a8ebb23f78e23 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -40,6 +40,7 @@ import org.apache.hadoop.hive.serde.serdeConstants

 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchPermanentFunctionException
 import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, CatalogTablePartition, CatalogUtils, FunctionResource, FunctionResourceType}
@@ -1179,3 +1180,128 @@ private[client] class Shim_v2_1 extends Shim_v2_0 {
 private[client] class Shim_v2_2 extends Shim_v2_1

 private[client] class Shim_v2_3 extends Shim_v2_1
+
+private[client] class Shim_v3_1 extends Shim_v2_3 {
+  // Spark supports only non-ACID operations
+  protected lazy val isAcidIUDoperation = JBoolean.FALSE
+
+  // Writer ID can be 0 for non-ACID operations
+  protected lazy val writeIdInLoadTableOrPartition: JLong = 0L
+
+  // Statement ID
+  protected lazy val stmtIdInLoadTableOrPartition: JInteger = 0
+
+  protected lazy val listBucketingLevel: JInteger = 0
+
+  private lazy val clazzLoadFileType = getClass.getClassLoader.loadClass(
+    "org.apache.hadoop.hive.ql.plan.LoadTableDesc$LoadFileType")
+
+  private lazy val loadPartitionMethod =
+    findMethod(
+      classOf[Hive],
+      "loadPartition",
+      classOf[Path],
+      classOf[Table],
+      classOf[JMap[String, String]],
+      clazzLoadFileType,
+      JBoolean.TYPE,
+      JBoolean.TYPE,
+      JBoolean.TYPE,
+      JBoolean.TYPE,
+      JBoolean.TYPE,
+      classOf[JLong],
+      JInteger.TYPE,
+      JBoolean.TYPE)
+  private lazy val loadTableMethod =
+    findMethod(
+      classOf[Hive],
+      "loadTable",
+      classOf[Path],
+      classOf[String],
+      clazzLoadFileType,
+      JBoolean.TYPE,
+      JBoolean.TYPE,
+      JBoolean.TYPE,
+      JBoolean.TYPE,
+      classOf[JLong],
+      JInteger.TYPE,
+      JBoolean.TYPE)
+  private lazy val loadDynamicPartitionsMethod =
+    findMethod(
+      classOf[Hive],
+      "loadDynamicPartitions",
+      classOf[Path],
+      classOf[String],
+      classOf[JMap[String, String]],
+      clazzLoadFileType,
+      JInteger.TYPE,
+      JInteger.TYPE,
+      JBoolean.TYPE,
+      JLong.TYPE,
+      JInteger.TYPE,
+      JBoolean.TYPE,
+      classOf[AcidUtils.Operation],
+      JBoolean.TYPE)
+
+  override def loadPartition(
+      hive: Hive,
+      loadPath: Path,
+      tableName: String,
+      partSpec: JMap[String, String],
+      replace: Boolean,
+      inheritTableSpecs: Boolean,
+      isSkewedStoreAsSubdir: Boolean,
+      isSrcLocal: Boolean): Unit = {
+    val session = SparkSession.getActiveSession
+    assert(session.nonEmpty)
+    val database = session.get.sessionState.catalog.getCurrentDatabase
+    val table = hive.getTable(database, tableName)
+    val loadFileType = if (replace) {
+      clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("REPLACE_ALL"))
+    } else {
+      clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("KEEP_EXISTING"))
+    }
+    assert(loadFileType.isDefined)
+    loadPartitionMethod.invoke(hive, loadPath, table, partSpec, loadFileType.get,
+      inheritTableSpecs: JBoolean, isSkewedStoreAsSubdir: JBoolean,
+      isSrcLocal: JBoolean, isAcid, hasFollowingStatsTask,
+      writeIdInLoadTableOrPartition, stmtIdInLoadTableOrPartition, replace: JBoolean)
+  }
+
+  override def loadTable(
+      hive: Hive,
+      loadPath: Path,
+      tableName: String,
+      replace: Boolean,
+      isSrcLocal: Boolean): Unit = {
+    val loadFileType = if (replace) {
+      clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("REPLACE_ALL"))
+    } else {
+      clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("KEEP_EXISTING"))
+    }
+    assert(loadFileType.isDefined)
+    loadTableMethod.invoke(hive, loadPath, tableName, loadFileType.get, isSrcLocal: JBoolean,
+      isSkewedStoreAsSubdir, isAcidIUDoperation, hasFollowingStatsTask,
+      writeIdInLoadTableOrPartition, stmtIdInLoadTableOrPartition: JInteger, replace: JBoolean)
+  }
+
+  override def loadDynamicPartitions(
+      hive: Hive,
+      loadPath: Path,
+      tableName: String,
+      partSpec: JMap[String, String],
+      replace: Boolean,
+      numDP: Int,
+      listBucketingEnabled: Boolean): Unit = {
+    val loadFileType = if (replace) {
+      clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("REPLACE_ALL"))
+    } else {
+      clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("KEEP_EXISTING"))
+    }
+    assert(loadFileType.isDefined)
+    loadDynamicPartitionsMethod.invoke(hive, loadPath, tableName, partSpec, loadFileType.get,
+      numDP: JInteger, listBucketingLevel, isAcid, writeIdInLoadTableOrPartition,
+      stmtIdInLoadTableOrPartition, hasFollowingStatsTask, AcidUtils.Operation.NOT_ACID,
+      replace: JBoolean)
+  }
+}
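The shim above never links against Hive 3.x classes at compile time: Spark still compiles against its built-in Hive, so every 3.x-only type is resolved reflectively. A standalone sketch of that pattern, assuming only that the Hive 3.x jars are on the runtime classpath (the helper name is hypothetical; the enum class name is the real one used above):

```scala
// Load an enum that exists only in the Hive 3.x jars, then select a
// constant by name -- the same trick Shim_v3_1 uses for
// LoadTableDesc.LoadFileType before invoking loadTable/loadPartition.
val loadFileTypeClass: Class[_] = getClass.getClassLoader
  .loadClass("org.apache.hadoop.hive.ql.plan.LoadTableDesc$LoadFileType")

def loadFileTypeFor(replace: Boolean): AnyRef = {
  val wanted = if (replace) "REPLACE_ALL" else "KEEP_EXISTING"
  loadFileTypeClass.getEnumConstants
    .find(_.toString.equalsIgnoreCase(wanted))
    .map(_.asInstanceOf[AnyRef])
    .getOrElse(throw new IllegalStateException(s"LoadFileType.$wanted not found"))
}
```

Note also that the write ID and statement ID are pinned to 0 throughout: Spark issues only non-ACID loads, so it never needs real transaction identifiers from Hive 3's write-ID allocation.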
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
index ca98c30add168..1f7ab9bb60346 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
@@ -100,6 +100,7 @@ private[hive] object IsolatedClientLoader extends Logging {
     case "2.1" | "2.1.0" | "2.1.1" => hive.v2_1
     case "2.2" | "2.2.0" => hive.v2_2
     case "2.3" | "2.3.0" | "2.3.1" | "2.3.2" | "2.3.3" | "2.3.4" => hive.v2_3
+    case "3.1" | "3.1.0" | "3.1.1" => hive.v3_1
     case version =>
       throw new UnsupportedOperationException(s"Unsupported Hive Metastore version ($version). " +
         s"Please set ${HiveUtils.HIVE_METASTORE_VERSION.key} with a valid version.")
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala
index e4cf7299d2af6..b6a49497e4809 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala
@@ -79,7 +79,14 @@ package object client {
     exclusions = Seq("org.apache.curator:*",
       "org.pentaho:pentaho-aggdesigner-algorithm"))

-  val allSupportedHiveVersions = Set(v12, v13, v14, v1_0, v1_1, v1_2, v2_0, v2_1, v2_2, v2_3)
+  // Since Hive 3.0, HookUtils uses org.apache.logging.log4j.util.Strings
+  case object v3_1 extends HiveVersion("3.1.1",
+    extraDeps = Seq("org.apache.logging.log4j:log4j-api:2.10.0",
+      "org.apache.derby:derby:10.14.1.0"),
+    exclusions = Seq("org.apache.curator:*",
+      "org.pentaho:pentaho-aggdesigner-algorithm"))
+
+  val allSupportedHiveVersions = Set(v12, v13, v14, v1_0, v1_1, v1_2, v2_0, v2_1, v2_2, v2_3, v3_1)
 }
 // scalastyle:on
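To make the new dispatch concrete, here is roughly how the mapping now behaves (a sketch against `private[hive]` internals, so illustrative rather than something external code can call):

```scala
import org.apache.spark.sql.hive.client.{hive, IsolatedClientLoader}

// All three spellings resolve to the new v3_1 shim; unsupported versions
// still fall through to the UnsupportedOperationException branch.
assert(IsolatedClientLoader.hiveVersion("3.1") == hive.v3_1)
assert(IsolatedClientLoader.hiveVersion("3.1.0") == hive.v3_1)
assert(IsolatedClientLoader.hiveVersion("3.1.1") == hive.v3_1)
```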
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
index 078968ed0145f..4ddba500cdf67 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
@@ -114,7 +114,7 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
     // be removed by Hive when Hive is trying to empty the table directory.
     val hiveVersionsUsingOldExternalTempPath: Set[HiveVersion] = Set(v12, v13, v14, v1_0)
     val hiveVersionsUsingNewExternalTempPath: Set[HiveVersion] =
-      Set(v1_1, v1_2, v2_0, v2_1, v2_2, v2_3)
+      Set(v1_1, v1_2, v2_0, v2_1, v2_2, v2_3, v3_1)

     // Ensure all the supported versions are considered here.
     assert(hiveVersionsUsingNewExternalTempPath ++ hiveVersionsUsingOldExternalTempPath ==
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
index 7a325bf26b4cf..f3d8c2ad440ff 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
@@ -19,12 +19,16 @@ package org.apache.spark.sql.hive.client

 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
+import org.apache.hadoop.mapred.TextInputFormat
 import org.scalatest.BeforeAndAfterAll

+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.types.{BooleanType, IntegerType, LongType}
+import org.apache.spark.sql.types.{BooleanType, IntegerType, LongType, StructType}

 // TODO: Refactor this to `HivePartitionFilteringSuite`
 class HiveClientSuite(version: String)
@@ -46,7 +50,22 @@ class HiveClientSuite(version: String)
     val hadoopConf = new Configuration()
     hadoopConf.setBoolean(tryDirectSqlKey, tryDirectSql)
     val client = buildClient(hadoopConf)
-    client.runSqlHive("CREATE TABLE test (value INT) PARTITIONED BY (ds INT, h INT, chunk STRING)")
+    val tableSchema =
+      new StructType().add("value", "int").add("ds", "int").add("h", "int").add("chunk", "string")
+    val table = CatalogTable(
+      identifier = TableIdentifier("test", Some("default")),
+      tableType = CatalogTableType.MANAGED,
+      schema = tableSchema,
+      partitionColumnNames = Seq("ds", "h", "chunk"),
+      storage = CatalogStorageFormat(
+        locationUri = None,
+        inputFormat = Some(classOf[TextInputFormat].getName),
+        outputFormat = Some(classOf[HiveIgnoreKeyTextOutputFormat[_, _]].getName),
+        serde = Some(classOf[LazySimpleSerDe].getName()),
+        compressed = false,
+        properties = Map.empty
+      ))
+    client.createTable(table, ignoreIfExists = false)

     val partitions =
       for {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientVersions.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientVersions.scala
index 30592a3f85428..9b9af79354c2a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientVersions.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientVersions.scala
@@ -23,5 +23,5 @@ import org.apache.spark.SparkFunSuite
 private[client] trait HiveClientVersions {
   protected val versions =
-    IndexedSeq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2", "2.0", "2.1", "2.2", "2.3")
+    IndexedSeq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2", "2.0", "2.1", "2.2", "2.3", "3.1")
 }
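The `CatalogTable` built in `HiveClientSuite` reappears almost verbatim in `VersionsSuite` below; both tests switch from `runSqlHive("CREATE TABLE ...")` to the client API so the table definition stays identical across every tested metastore version. A hypothetical helper along these lines (not part of the patch) would factor out the duplication:

```scala
import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.types.StructType

// A managed, uncompressed text-format table -- the shape both suites
// construct by hand.
def simpleTextTable(
    name: String,
    schema: StructType,
    partitionCols: Seq[String]): CatalogTable = CatalogTable(
  identifier = TableIdentifier(name, Some("default")),
  tableType = CatalogTableType.MANAGED,
  schema = schema,
  partitionColumnNames = partitionCols,
  storage = CatalogStorageFormat(
    locationUri = None,
    inputFormat = Some(classOf[TextInputFormat].getName),
    outputFormat = Some(classOf[HiveIgnoreKeyTextOutputFormat[_, _]].getName),
    serde = Some(classOf[LazySimpleSerDe].getName),
    compressed = false,
    properties = Map.empty))
```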
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala
index e5963d03f6b52..a45ad1f30042b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala
@@ -34,10 +34,15 @@ private[client] abstract class HiveVersionSuite(version: String) extends SparkFu
     // Hive changed the default of datanucleus.schema.autoCreateAll from true to false and
     // hive.metastore.schema.verification from false to true since 2.0
     // For details, see the JIRA HIVE-6113 and HIVE-12463
-    if (version == "2.0" || version == "2.1" || version == "2.2" || version == "2.3") {
+    if (version == "2.0" || version == "2.1" || version == "2.2" || version == "2.3" ||
+        version == "3.1") {
       hadoopConf.set("datanucleus.schema.autoCreateAll", "true")
       hadoopConf.set("hive.metastore.schema.verification", "false")
     }
+    // Since Hive 3.0, HIVE-19310 skipped `ensureDbInit` if `hive.in.test=false`.
+    if (version == "3.1") {
+      hadoopConf.set("hive.in.test", "true")
+    }
     HiveClientBuilder.buildClient(
       version,
       hadoopConf,
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 218bd18e5dc99..b323871630d61 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -103,7 +103,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
   }

   private val versions =
-    Seq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2", "2.0", "2.1", "2.2", "2.3")
+    Seq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2", "2.0", "2.1", "2.2", "2.3", "3.1")

   private var client: HiveClient = null

@@ -118,10 +118,15 @@ class VersionsSuite extends SparkFunSuite with Logging {
      // Hive changed the default of datanucleus.schema.autoCreateAll from true to false and
      // hive.metastore.schema.verification from false to true since 2.0
      // For details, see the JIRA HIVE-6113 and HIVE-12463
-     if (version == "2.0" || version == "2.1" || version == "2.2" || version == "2.3") {
+     if (version == "2.0" || version == "2.1" || version == "2.2" || version == "2.3" ||
+         version == "3.1") {
        hadoopConf.set("datanucleus.schema.autoCreateAll", "true")
        hadoopConf.set("hive.metastore.schema.verification", "false")
      }
+     // Since Hive 3.0, HIVE-19310 skipped `ensureDbInit` if `hive.in.test=false`.
+     if (version == "3.1") {
+       hadoopConf.set("hive.in.test", "true")
+     }
      client = buildClient(version, hadoopConf, HiveUtils.formatTimeVarsForHiveClient(hadoopConf))
      if (versionSpark != null) versionSpark.reset()
      versionSpark = TestHiveVersion(client)
@@ -318,7 +323,20 @@ class VersionsSuite extends SparkFunSuite with Logging {
       properties = Map.empty)

   test(s"$version: sql create partitioned table") {
-    client.runSqlHive("CREATE TABLE src_part (value INT) PARTITIONED BY (key1 INT, key2 INT)")
+    val table = CatalogTable(
+      identifier = TableIdentifier("src_part", Some("default")),
+      tableType = CatalogTableType.MANAGED,
+      schema = new StructType().add("value", "int").add("key1", "int").add("key2", "int"),
+      partitionColumnNames = Seq("key1", "key2"),
+      storage = CatalogStorageFormat(
+        locationUri = None,
+        inputFormat = Some(classOf[TextInputFormat].getName),
+        outputFormat = Some(classOf[HiveIgnoreKeyTextOutputFormat[_, _]].getName),
+        serde = Some(classOf[LazySimpleSerDe].getName()),
+        compressed = false,
+        properties = Map.empty
+      ))
+    client.createTable(table, ignoreIfExists = false)
   }

   val testPartitionCount = 2
@@ -556,9 +574,12 @@ class VersionsSuite extends SparkFunSuite with Logging {
   }

   test(s"$version: sql create index and reset") {
-    client.runSqlHive("CREATE TABLE indexed_table (key INT)")
-    client.runSqlHive("CREATE INDEX index_1 ON TABLE indexed_table(key) " +
-      "as 'COMPACT' WITH DEFERRED REBUILD")
+    // HIVE-18448: since Hive 3.0, INDEX is no longer supported.
+ if (version != "3.1") { + client.runSqlHive("CREATE TABLE indexed_table (key INT)") + client.runSqlHive("CREATE INDEX index_1 ON TABLE indexed_table(key) " + + "as 'COMPACT' WITH DEFERRED REBUILD") + } } ///////////////////////////////////////////////////////////////////////////