[SPARK-24360][SQL] Support Hive 3.1 metastore #23694

Closed
wants to merge 2 commits
docs/sql-data-sources-hive-tables.md (2 changes: 1 addition, 1 deletion)
@@ -115,7 +115,7 @@ The following options can be used to configure the version of Hive that is used
<td><code>1.2.1</code></td>
<td>
Version of the Hive metastore. Available
options are <code>0.12.0</code> through <code>2.3.4</code>.
options are <code>0.12.0</code> through <code>2.3.4</code> and <code>3.1.0</code> through <code>3.1.1</code>.
</td>
</tr>
<tr>
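With this change, an existing Spark build can talk to a Hive 3.1 metastore by setting the two metastore options. A minimal usage sketch, not part of the diff, assuming the client jars are resolved from Maven at runtime (spark.sql.hive.metastore.jars also accepts builtin or an explicit classpath):

  import org.apache.spark.sql.SparkSession

  // Point the Hive metastore client at a 3.1.1 metastore; the matching client jars
  // are downloaded from Maven the first time they are needed.
  val spark = SparkSession.builder()
    .appName("hive-3-1-metastore-example")
    .config("spark.sql.hive.metastore.version", "3.1.1")
    .config("spark.sql.hive.metastore.jars", "maven")
    .enableHiveSupport()
    .getOrCreate()

  spark.sql("SHOW DATABASES").show()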
@@ -62,7 +62,8 @@ private[spark] object HiveUtils extends Logging {

val HIVE_METASTORE_VERSION = buildConf("spark.sql.hive.metastore.version")
.doc("Version of the Hive metastore. Available options are " +
s"<code>0.12.0</code> through <code>2.3.4</code>.")
"<code>0.12.0</code> through <code>2.3.4</code> and " +
"<code>3.1.0</code> through <code>3.1.1</code>.")
.stringConf
.createWithDefault(builtinHiveVersion)

@@ -105,6 +105,7 @@ private[hive] class HiveClientImpl(
case hive.v2_1 => new Shim_v2_1()
case hive.v2_2 => new Shim_v2_2()
case hive.v2_3 => new Shim_v2_3()
case hive.v3_1 => new Shim_v3_1()
}

// Create an internal session state for this HiveClientImpl.
@@ -852,11 +853,17 @@ private[hive] class HiveClientImpl(
client.getAllTables("default").asScala.foreach { t =>
logDebug(s"Deleting table $t")
val table = client.getTable("default", t)
client.getIndexes("default", t, 255).asScala.foreach { index =>
shim.dropIndex(client, "default", t, index.getIndexName)
}
if (!table.isIndexTable) {
client.dropTable("default", t)
try {
client.getIndexes("default", t, 255).asScala.foreach { index =>
shim.dropIndex(client, "default", t, index.getIndexName)
}
if (!table.isIndexTable) {
client.dropTable("default", t)
}
} catch {
case _: NoSuchMethodError =>
// HIVE-18448: Hive 3.0 removed the index APIs
client.dropTable("default", t)
}
}
client.getAllDatabases.asScala.filterNot(_ == "default").foreach { db =>
@@ -40,6 +40,7 @@ import org.apache.hadoop.hive.serde.serdeConstants

import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchPermanentFunctionException
import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, CatalogTablePartition, CatalogUtils, FunctionResource, FunctionResourceType}
@@ -1179,3 +1180,128 @@ private[client] class Shim_v2_1 extends Shim_v2_0 {
private[client] class Shim_v2_2 extends Shim_v2_1

private[client] class Shim_v2_3 extends Shim_v2_1

private[client] class Shim_v3_1 extends Shim_v2_3 {
Member: I think it's fine if the signatures are matched and the tests pass. Let me double-check them by tomorrow.

Member Author: Thanks!

// Spark supports only non-ACID operations
protected lazy val isAcidIUDoperation = JBoolean.FALSE
Member: @dongjoon-hyun, isn't isAcid defined at Shim_v0_14? I was wondering why this is a separate variable again. Do you see any possibility that this is different specifically for 3.1? If so, it's fine.

Member Author (@dongjoon-hyun, Jan 31, 2019): Historically, isAcidIUDoperation evolved from isAcid. In the Hive code, isAcid was a general ACID flag, while isAcidIUDoperation is now used for ACID Insert/Update/Delete operations. Hive also checks isFullAcidTable and uses the two together, like this:

else if (!isAcidIUDoperation && isFullAcidTable) {
    destPath = fixFullAcidPathForLoadData(loadFileType, destPath, txnId, stmtId, tbl);
}

And yes to your last question. We don't know the future of Hive, so when the parameter name differs, we had better handle it differently. That was my logic.


// Writer ID can be 0 for non-ACID operations
protected lazy val writeIdInLoadTableOrPartition: JLong = 0L

// Statement ID
protected lazy val stmtIdInLoadTableOrPartition: JInteger = 0

protected lazy val listBucketingLevel: JInteger = 0

private lazy val clazzLoadFileType = getClass.getClassLoader.loadClass(
"org.apache.hadoop.hive.ql.plan.LoadTableDesc$LoadFileType")

private lazy val loadPartitionMethod =
findMethod(
classOf[Hive],
"loadPartition",
classOf[Path],
classOf[Table],
classOf[JMap[String, String]],
clazzLoadFileType,
JBoolean.TYPE,
JBoolean.TYPE,
JBoolean.TYPE,
JBoolean.TYPE,
JBoolean.TYPE,
classOf[JLong],
JInteger.TYPE,
JBoolean.TYPE)
private lazy val loadTableMethod =
findMethod(
classOf[Hive],
"loadTable",
classOf[Path],
classOf[String],
clazzLoadFileType,
JBoolean.TYPE,
JBoolean.TYPE,
JBoolean.TYPE,
JBoolean.TYPE,
classOf[JLong],
JInteger.TYPE,
JBoolean.TYPE)
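// Note on parameter types: loadPartitionMethod and loadTableMethod above look up a Hive 3.1
// signature whose writeId is a boxed java.lang.Long (classOf[JLong]), whereas
// loadDynamicPartitionsMethod below matches a primitive long (JLong.TYPE).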
private lazy val loadDynamicPartitionsMethod =
findMethod(
classOf[Hive],
"loadDynamicPartitions",
classOf[Path],
classOf[String],
classOf[JMap[String, String]],
clazzLoadFileType,
JInteger.TYPE,
JInteger.TYPE,
JBoolean.TYPE,
JLong.TYPE,
JInteger.TYPE,
JBoolean.TYPE,
classOf[AcidUtils.Operation],
JBoolean.TYPE)

override def loadPartition(
Member: checked

hive: Hive,
loadPath: Path,
tableName: String,
partSpec: JMap[String, String],
replace: Boolean,
inheritTableSpecs: Boolean,
isSkewedStoreAsSubdir: Boolean,
isSrcLocal: Boolean): Unit = {
val session = SparkSession.getActiveSession
assert(session.nonEmpty)
val database = session.get.sessionState.catalog.getCurrentDatabase
val table = hive.getTable(database, tableName)
val loadFileType = if (replace) {
Member: checked

clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("REPLACE_ALL"))
} else {
clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("KEEP_EXISTING"))
}
assert(loadFileType.isDefined)
loadPartitionMethod.invoke(hive, loadPath, table, partSpec, loadFileType.get,
inheritTableSpecs: JBoolean, isSkewedStoreAsSubdir: JBoolean,
isSrcLocal: JBoolean, isAcid, hasFollowingStatsTask,
writeIdInLoadTableOrPartition, stmtIdInLoadTableOrPartition, replace: JBoolean)
}
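
The overrides in this shim resolve the LoadFileType constant reflectively because org.apache.hadoop.hive.ql.plan.LoadTableDesc$LoadFileType does not exist in older Hive versions and therefore cannot be referenced statically. A minimal stand-alone sketch of that lookup, assuming only that the class is on the client's classloader at runtime (the object and method names here are illustrative, not part of this PR):

  object LoadFileTypeLookup {
    // Load the enum class by name and pick the constant matching the requested mode.
    def loadFileTypeFor(replace: Boolean): AnyRef = {
      val clazz = getClass.getClassLoader
        .loadClass("org.apache.hadoop.hive.ql.plan.LoadTableDesc$LoadFileType")
      val wanted = if (replace) "REPLACE_ALL" else "KEEP_EXISTING"
      clazz.getEnumConstants
        .find(_.toString.equalsIgnoreCase(wanted))
        .getOrElse(throw new IllegalStateException(s"Missing LoadFileType constant: $wanted"))
        .asInstanceOf[AnyRef]
    }
  }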

override def loadTable(
Member: checked

hive: Hive,
loadPath: Path,
tableName: String,
replace: Boolean,
isSrcLocal: Boolean): Unit = {
val loadFileType = if (replace) {
clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("REPLACE_ALL"))
} else {
clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("KEEP_EXISTING"))
}
assert(loadFileType.isDefined)
loadTableMethod.invoke(hive, loadPath, tableName, loadFileType.get, isSrcLocal: JBoolean,
Member: checked

isSkewedStoreAsSubdir, isAcidIUDoperation, hasFollowingStatsTask,
writeIdInLoadTableOrPartition, stmtIdInLoadTableOrPartition: JInteger, replace: JBoolean)
}

override def loadDynamicPartitions(
Member: checked

hive: Hive,
loadPath: Path,
tableName: String,
partSpec: JMap[String, String],
replace: Boolean,
numDP: Int,
listBucketingEnabled: Boolean): Unit = {
val loadFileType = if (replace) {
clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("REPLACE_ALL"))
} else {
clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("KEEP_EXISTING"))
}
assert(loadFileType.isDefined)
loadDynamicPartitionsMethod.invoke(hive, loadPath, tableName, partSpec, loadFileType.get,
Member: checked

numDP: JInteger, listBucketingLevel, isAcid, writeIdInLoadTableOrPartition,
stmtIdInLoadTableOrPartition, hasFollowingStatsTask, AcidUtils.Operation.NOT_ACID,
replace: JBoolean)
}
Member: @dongjoon-hyun, not a big deal, but how about adding dropIndex with, say, an unsupported-operation exception?

Member Author (@dongjoon-hyun, Jan 31, 2019): It's possible, but it doesn't help much. As we see here, Hive.getIndexes raises NoSuchMethodError before we ever call shim.dropIndex.

Member: Yeah, I noticed it. I'm leaving this comment orthogonally, just as a sanity check.
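One hypothetical alternative, shown only as a hedged sketch and not part of this PR, would be to probe for the removed API up front via reflection instead of catching NoSuchMethodError on the first call (indexApiAvailable is an assumed name):

  import org.apache.hadoop.hive.ql.metadata.Hive

  // True when the running Hive client still exposes getIndexes(String, String, short);
  // Hive 3.x removed it in HIVE-18448.
  val indexApiAvailable: Boolean =
    try {
      classOf[Hive].getMethod("getIndexes", classOf[String], classOf[String], classOf[Short])
      true
    } catch {
      case _: NoSuchMethodException => false
    }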

}
@@ -100,6 +100,7 @@ private[hive] object IsolatedClientLoader extends Logging {
case "2.1" | "2.1.0" | "2.1.1" => hive.v2_1
case "2.2" | "2.2.0" => hive.v2_2
case "2.3" | "2.3.0" | "2.3.1" | "2.3.2" | "2.3.3" | "2.3.4" => hive.v2_3
case "3.1" | "3.1.0" | "3.1.1" => hive.v3_1
Member: What is the reason we do not support 3.0?

Member Author (@dongjoon-hyun, Jan 30, 2019): It's not stable, much like Hadoop 3.0. Spark skipped Hadoop 3.0 and went with Hadoop 3.1.

Member: I think we faced the same issue with 1.0 and 2.0, but we still support them. Could we simply support it and let the end users decide for themselves?

Member Author: Then, hopefully, I can do that in a follow-up PR.

case version =>
throw new UnsupportedOperationException(s"Unsupported Hive Metastore version ($version). " +
s"Please set ${HiveUtils.HIVE_METASTORE_VERSION.key} with a valid version.")
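For reference, a hedged sketch of how the new mapping could be exercised from a test in the org.apache.spark.sql.hive.client package, assuming IsolatedClientLoader.hiveVersion keeps its current signature:

  // Both the short and patch-level version strings should resolve to the same case object.
  assert(IsolatedClientLoader.hiveVersion("3.1") == hive.v3_1)
  assert(IsolatedClientLoader.hiveVersion("3.1.0") == hive.v3_1)
  assert(IsolatedClientLoader.hiveVersion("3.1.1") == hive.v3_1)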
@@ -79,7 +79,14 @@ package object client {
exclusions = Seq("org.apache.curator:*",
"org.pentaho:pentaho-aggdesigner-algorithm"))

val allSupportedHiveVersions = Set(v12, v13, v14, v1_0, v1_1, v1_2, v2_0, v2_1, v2_2, v2_3)
// Since Hive 3.0, HookUtils uses org.apache.logging.log4j.util.Strings
case object v3_1 extends HiveVersion("3.1.1",
extraDeps = Seq("org.apache.logging.log4j:log4j-api:2.10.0",
"org.apache.derby:derby:10.14.1.0"),
exclusions = Seq("org.apache.curator:*",
"org.pentaho:pentaho-aggdesigner-algorithm"))

val allSupportedHiveVersions = Set(v12, v13, v14, v1_0, v1_1, v1_2, v2_0, v2_1, v2_2, v2_3, v3_1)
}
// scalastyle:on

@@ -114,7 +114,7 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
// be removed by Hive when Hive is trying to empty the table directory.
val hiveVersionsUsingOldExternalTempPath: Set[HiveVersion] = Set(v12, v13, v14, v1_0)
val hiveVersionsUsingNewExternalTempPath: Set[HiveVersion] =
Set(v1_1, v1_2, v2_0, v2_1, v2_2, v2_3)
Set(v1_1, v1_2, v2_0, v2_1, v2_2, v2_3, v3_1)

// Ensure all the supported versions are considered here.
assert(hiveVersionsUsingNewExternalTempPath ++ hiveVersionsUsingOldExternalTempPath ==
@@ -19,12 +19,16 @@ package org.apache.spark.sql.hive.client

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
import org.apache.hadoop.mapred.TextInputFormat
import org.scalatest.BeforeAndAfterAll

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.{BooleanType, IntegerType, LongType}
import org.apache.spark.sql.types.{BooleanType, IntegerType, LongType, StructType}

// TODO: Refactor this to `HivePartitionFilteringSuite`
class HiveClientSuite(version: String)
@@ -46,7 +50,22 @@
val hadoopConf = new Configuration()
hadoopConf.setBoolean(tryDirectSqlKey, tryDirectSql)
val client = buildClient(hadoopConf)
client.runSqlHive("CREATE TABLE test (value INT) PARTITIONED BY (ds INT, h INT, chunk STRING)")
val tableSchema =
new StructType().add("value", "int").add("ds", "int").add("h", "int").add("chunk", "string")
val table = CatalogTable(
identifier = TableIdentifier("test", Some("default")),
tableType = CatalogTableType.MANAGED,
schema = tableSchema,
partitionColumnNames = Seq("ds", "h", "chunk"),
storage = CatalogStorageFormat(
locationUri = None,
inputFormat = Some(classOf[TextInputFormat].getName),
outputFormat = Some(classOf[HiveIgnoreKeyTextOutputFormat[_, _]].getName),
serde = Some(classOf[LazySimpleSerDe].getName()),
compressed = false,
properties = Map.empty
))
client.createTable(table, ignoreIfExists = false)

val partitions =
for {
@@ -23,5 +23,5 @@ import org.apache.spark.SparkFunSuite

private[client] trait HiveClientVersions {
protected val versions =
IndexedSeq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2", "2.0", "2.1", "2.2", "2.3")
IndexedSeq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2", "2.0", "2.1", "2.2", "2.3", "3.1")
}
@@ -34,10 +34,15 @@ private[client] abstract class HiveVersionSuite(version: String) extends SparkFunSuite
// Hive changed the default of datanucleus.schema.autoCreateAll from true to false and
// hive.metastore.schema.verification from false to true since 2.0
// For details, see the JIRA HIVE-6113 and HIVE-12463
if (version == "2.0" || version == "2.1" || version == "2.2" || version == "2.3") {
if (version == "2.0" || version == "2.1" || version == "2.2" || version == "2.3" ||
version == "3.1") {
hadoopConf.set("datanucleus.schema.autoCreateAll", "true")
hadoopConf.set("hive.metastore.schema.verification", "false")
}
// Since Hive 3.0, HIVE-19310 skipped `ensureDbInit` if `hive.in.test=false`.
if (version == "3.1") {
hadoopConf.set("hive.in.test", "true")
}
HiveClientBuilder.buildClient(
version,
hadoopConf,
@@ -103,7 +103,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
}

private val versions =
Seq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2", "2.0", "2.1", "2.2", "2.3")
Seq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2", "2.0", "2.1", "2.2", "2.3", "3.1")

private var client: HiveClient = null

@@ -118,10 +118,15 @@
// Hive changed the default of datanucleus.schema.autoCreateAll from true to false and
// hive.metastore.schema.verification from false to true since 2.0
// For details, see the JIRA HIVE-6113 and HIVE-12463
if (version == "2.0" || version == "2.1" || version == "2.2" || version == "2.3") {
if (version == "2.0" || version == "2.1" || version == "2.2" || version == "2.3" ||
version == "3.1") {
hadoopConf.set("datanucleus.schema.autoCreateAll", "true")
hadoopConf.set("hive.metastore.schema.verification", "false")
}
// Since Hive 3.0, HIVE-19310 skipped `ensureDbInit` if `hive.in.test=false`.
if (version == "3.1") {
hadoopConf.set("hive.in.test", "true")
}
client = buildClient(version, hadoopConf, HiveUtils.formatTimeVarsForHiveClient(hadoopConf))
if (versionSpark != null) versionSpark.reset()
versionSpark = TestHiveVersion(client)
@@ -318,7 +323,20 @@
properties = Map.empty)

test(s"$version: sql create partitioned table") {
client.runSqlHive("CREATE TABLE src_part (value INT) PARTITIONED BY (key1 INT, key2 INT)")
val table = CatalogTable(
identifier = TableIdentifier("src_part", Some("default")),
tableType = CatalogTableType.MANAGED,
schema = new StructType().add("value", "int").add("key1", "int").add("key2", "int"),
partitionColumnNames = Seq("key1", "key2"),
storage = CatalogStorageFormat(
locationUri = None,
inputFormat = Some(classOf[TextInputFormat].getName),
outputFormat = Some(classOf[HiveIgnoreKeyTextOutputFormat[_, _]].getName),
serde = Some(classOf[LazySimpleSerDe].getName()),
compressed = false,
properties = Map.empty
))
client.createTable(table, ignoreIfExists = false)
}

val testPartitionCount = 2
@@ -556,9 +574,12 @@
}

test(s"$version: sql create index and reset") {
client.runSqlHive("CREATE TABLE indexed_table (key INT)")
client.runSqlHive("CREATE INDEX index_1 ON TABLE indexed_table(key) " +
"as 'COMPACT' WITH DEFERRED REBUILD")
// HIVE-18448 Since Hive 3.0, INDEX is not supported.
if (version != "3.1") {
client.runSqlHive("CREATE TABLE indexed_table (key INT)")
client.runSqlHive("CREATE INDEX index_1 ON TABLE indexed_table(key) " +
"as 'COMPACT' WITH DEFERRED REBUILD")
}
}

///////////////////////////////////////////////////////////////////////////