[SPARK-17324] [SQL] Remove Direct Usage of HiveClient in InsertIntoHiveTable #14888

Closed

Changes from 1 commit
ExternalCatalog.scala
@@ -112,6 +112,16 @@ abstract class ExternalCatalog {
inheritTableSpecs: Boolean,
isSkewedStoreAsSubdir: Boolean): Unit

def loadDynamicPartitions(
db: String,
table: String,
loadPath: String,
partition: TablePartitionSpec,
replace: Boolean,
numDP: Int,
holdDDLTime: Boolean,
listBucketingEnabled: Boolean): Unit

// --------------------------------------------------------------------------
// Partitions
// --------------------------------------------------------------------------
@@ -153,6 +163,14 @@ abstract class ExternalCatalog {

def getPartition(db: String, table: String, spec: TablePartitionSpec): CatalogTablePartition

/**
* Returns the specified partition or None if it does not exist.
*/
def getPartitionOption(
db: String,
table: String,
spec: TablePartitionSpec): Option[CatalogTablePartition]

/**
* List the metadata of all partitions that belong to the specified table, assuming it exists.
*
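For orientation, here is a minimal sketch of how a caller could exercise the two methods this commit adds to ExternalCatalog. The database, table, and partition values are hypothetical, and it assumes TablePartitionSpec is the usual Map[String, String] alias:

```scala
import org.apache.spark.sql.catalyst.catalog.ExternalCatalog

// Hypothetical helper: load dynamically generated partitions from a staging
// directory, then look one of them up without risking a "no such partition" error.
def loadAndCheck(catalog: ExternalCatalog, stagingDir: String): Unit = {
  // Partition spec keyed by the table's partition columns (values are illustrative).
  val spec: Map[String, String] = Map("ds" -> "2016-09-02", "hr" -> "10")

  catalog.loadDynamicPartitions(
    db = "default",
    table = "logs",
    loadPath = stagingDir,
    partition = spec,
    replace = true,
    numDP = 2,                  // number of dynamic partition columns
    holdDDLTime = false,
    listBucketingEnabled = false)

  // Unlike getPartition, getPartitionOption returns None instead of failing
  // when the partition is missing.
  catalog.getPartitionOption("default", "logs", spec) match {
    case Some(part) => println(s"loaded partition: ${part.spec}")
    case None       => println("partition not found")
  }
}
```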
InMemoryCatalog.scala
@@ -322,6 +322,18 @@ class InMemoryCatalog(
throw new UnsupportedOperationException("loadPartition is not implemented.")
}

override def loadDynamicPartitions(
db: String,
table: String,
loadPath: String,
partition: TablePartitionSpec,
replace: Boolean,
numDP: Int,
holdDDLTime: Boolean,
listBucketingEnabled: Boolean): Unit = {
throw new UnsupportedOperationException("loadDynamicPartitions is not implemented.")
}

// --------------------------------------------------------------------------
// Partitions
// --------------------------------------------------------------------------
@@ -456,6 +468,17 @@ class InMemoryCatalog(
catalog(db).tables(table).partitions(spec)
}

override def getPartitionOption(
db: String,
table: String,
spec: TablePartitionSpec): Option[CatalogTablePartition] = synchronized {
if (!partitionExists(db, table, spec)) {
None
} else {
Option(catalog(db).tables(table).partitions(spec))
}
}

override def listPartitions(
db: String,
table: String,
HiveExternalCatalog.scala
@@ -503,6 +503,32 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configuration
isSkewedStoreAsSubdir)
}

override def loadDynamicPartitions(
db: String,
table: String,
loadPath: String,
partition: TablePartitionSpec,
replace: Boolean,
numDP: Int,
holdDDLTime: Boolean,
listBucketingEnabled: Boolean): Unit = withClient {
requireTableExists(db, table)

val orderedPartitionSpec = new util.LinkedHashMap[String, String]()
getTable(db, table).partitionColumnNames.foreach { colName =>
orderedPartitionSpec.put(colName, partition(colName))
}

client.loadDynamicPartitions(
loadPath,
s"$db.$table",
orderedPartitionSpec,
replace,
numDP,
holdDDLTime,
listBucketingEnabled)
}

// --------------------------------------------------------------------------
// Partitions
// --------------------------------------------------------------------------
@@ -548,6 +574,16 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configuration
client.getPartition(db, table, spec)
}

/**
* Returns the specified partition or None if it does not exist.
*/
override def getPartitionOption(
db: String,
table: String,
spec: TablePartitionSpec): Option[CatalogTablePartition] = withClient {
client.getPartitionOption(db, table, spec)
}

/**
* Returns the partition names from hive metastore for a given table in a database.
*/
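A note on the LinkedHashMap in the loadDynamicPartitions override above: as the existing InsertIntoHiveTable comment points out, the directories created by the load follow the iteration order of the map it is handed, so the caller's spec is re-keyed into the table's declared partition-column order. A small hedged sketch of that reordering in isolation (column names and values are made up):

```scala
import java.util.{LinkedHashMap => JLinkedHashMap}

// Partition columns in the order they are declared on the table,
// e.g. PARTITIONED BY (ds STRING, hr STRING).
val partitionColumnNames = Seq("ds", "hr")

// A caller-supplied spec whose iteration order need not match the declaration.
val spec: Map[String, String] = Map("hr" -> "10", "ds" -> "2016-09-02")

// Re-key into declaration order so the layout comes out as
// .../ds=2016-09-02/hr=10 rather than .../hr=10/ds=2016-09-02.
val ordered = new JLinkedHashMap[String, String]()
partitionColumnNames.foreach { col => ordered.put(col, spec(col)) }
```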
InsertIntoHiveTable.scala
@@ -51,7 +51,7 @@ case class InsertIntoHiveTable(
ifNotExists: Boolean) extends UnaryExecNode {

@transient private val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState]
@transient private val client = sessionState.metadataHive
@transient private val externalCatalog = sqlContext.sharedState.externalCatalog

def output: Seq[Attribute] = Seq.empty

@@ -240,54 +240,49 @@ case class InsertIntoHiveTable(
// holdDDLTime will be true when TOK_HOLD_DDLTIME is present in the query as a hint.
val holdDDLTime = false
if (partition.nonEmpty) {

// loadPartition call orders directories created on the iteration order of this map
val orderedPartitionSpec = new util.LinkedHashMap[String, String]()
table.hiveQlTable.getPartCols.asScala.foreach { entry =>
orderedPartitionSpec.put(entry.getName, partitionSpec.getOrElse(entry.getName, ""))
}

// inheritTableSpecs is set to true. It should be set to false for an IMPORT query
// which is currently considered as a Hive native command.
val inheritTableSpecs = true
// TODO: Correctly set isSkewedStoreAsSubdir.
val isSkewedStoreAsSubdir = false
if (numDynamicPartitions > 0) {
client.synchronized {
client.loadDynamicPartitions(
outputPath.toString,
table.catalogTable.qualifiedName,
orderedPartitionSpec,
overwrite,
numDynamicPartitions,
holdDDLTime,
isSkewedStoreAsSubdir)
}
externalCatalog.loadDynamicPartitions(
db = table.catalogTable.database,
table = table.catalogTable.identifier.table,
outputPath.toString,
partitionSpec,
overwrite,
numDynamicPartitions,
holdDDLTime = holdDDLTime,
listBucketingEnabled = false)
Contributor:
Where is the isSkewedStoreAsSubdir parameter?

Member Author:
The last parameter of loadDynamicPartitions is listBucketingEnabled. loadDynamicPartitions does not have a parameter for isSkewedStoreAsSubdir. Here, we fix a bug. : )

Contributor:
What's listBucketingEnabled used for? Will we need it in the future? If so, we also need to add a TODO here.

Member Author:
Will answer the difference between listBucketingEnabled and isSkewedStoreAsSubdir tomorrow. Need more time to do the investigation. : )

Member Author (@gatorsmile, Sep 2, 2016):
uh... isSkewedStoreAsSubdir and listBucketingEnabled are the same!!! loadDynamicPartitions is using listBucketingEnabled, but loadPartition is using isSkewedStoreAsSubdir.

loadDynamicPartitions eventually passes the value of listBucketingEnabled to isSkewedStoreAsSubdir when calling loadPartition.

To distinguish list-bucketing tables from normal skewed tables, Hive uses an optional STORED AS DIRECTORIES clause. See JIRA HIVE-3649.

Actually, we can remove the TODO for either isSkewedStoreAsSubdir or loadDynamicPartitions, because we always ignore this feature in getTableOption. We can set it in HiveClientImpl, and then we do not need to pollute the ExternalCatalog APIs.
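To make that distinction concrete: per HIVE-3649, a skewed table only becomes a list-bucketing table when its DDL adds STORED AS DIRECTORIES, and that is the behavior the listBucketingEnabled / isSkewedStoreAsSubdir flag controls at load time. A hedged illustration, with the Hive DDL held in Scala strings (table and column names are made up, and Spark's own DDL parser may not accept these clauses):

```scala
// Plain skewed table: the skew information is metadata only, so data for the
// skewed values is not split into dedicated subdirectories.
val skewedOnly =
  """CREATE TABLE web_logs (url STRING, hits INT)
    |SKEWED BY (url) ON ('http://popular.example.com')""".stripMargin

// List-bucketing table: STORED AS DIRECTORIES tells Hive to store each skewed
// value in its own subdirectory, which is what the flag enables when loading.
val listBucketed = skewedOnly + "\nSTORED AS DIRECTORIES"
```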

} else {
Member Author:
This TODO is removed. We set it in HiveClientImpl.
cc @cloud-fan @yhuai

// scalastyle:off
// ifNotExists is only valid with static partition, refer to
// https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML#LanguageManualDML-InsertingdataintoHiveTablesfromqueries
// scalastyle:on
val oldPart =
client.getPartitionOption(
table.catalogTable,
externalCatalog.getPartitionOption(
table.catalogTable.database,
table.catalogTable.identifier.table,
partitionSpec)

if (oldPart.isEmpty || !ifNotExists) {
client.loadPartition(
outputPath.toString,
table.catalogTable.qualifiedName,
orderedPartitionSpec,
overwrite,
holdDDLTime,
inheritTableSpecs,
isSkewedStoreAsSubdir)
// TODO: Correctly set isSkewedStoreAsSubdir.
val isSkewedStoreAsSubdir = false
// inheritTableSpecs is set to true. It should be set to false for an IMPORT query
// which is currently considered as a Hive native command.
val inheritTableSpecs = true
externalCatalog.loadPartition(
table.catalogTable.database,
table.catalogTable.identifier.table,
outputPath.toString,
partitionSpec,
isOverwrite = overwrite,
holdDDLTime = holdDDLTime,
inheritTableSpecs = inheritTableSpecs,
isSkewedStoreAsSubdir = isSkewedStoreAsSubdir)
}
}
} else {
client.loadTable(
externalCatalog.loadTable(
table.catalogTable.database,
table.catalogTable.identifier.table,
outputPath.toString, // TODO: URI
table.catalogTable.qualifiedName,
overwrite,
holdDDLTime)
}
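To tie the rewritten paths back to user-facing SQL, here is a hedged end-to-end sketch (Spark 2.x built with Hive support; the sales and staging_sales tables and their columns are made up). The first INSERT leaves the partition column unvalued, so numDynamicPartitions > 0 and the plan goes through externalCatalog.loadDynamicPartitions; the second pins the partition and uses IF NOT EXISTS, which is only valid with a static spec, so the plan consults getPartitionOption before calling loadPartition:

```scala
import org.apache.spark.sql.SparkSession

object InsertIntoHivePathsExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("insert-into-hive-paths")
      .enableHiveSupport()
      .getOrCreate()

    // Required when every partition column is dynamic.
    spark.sql("SET hive.exec.dynamic.partition.mode=nonstrict")
    spark.sql(
      "CREATE TABLE IF NOT EXISTS sales (amount DOUBLE) PARTITIONED BY (dt STRING)")

    // Dynamic-partition branch: dt has no value, so each distinct dt in the
    // SELECT becomes its own partition (assumes staging_sales already exists).
    spark.sql(
      """INSERT OVERWRITE TABLE sales PARTITION (dt)
        |SELECT amount, dt FROM staging_sales""".stripMargin)

    // Static-partition branch with IF NOT EXISTS: the partition is written only
    // if dt='2016-09-02' is not already in the metastore.
    spark.sql(
      """INSERT OVERWRITE TABLE sales PARTITION (dt = '2016-09-02') IF NOT EXISTS
        |SELECT amount FROM staging_sales WHERE dt = '2016-09-02'""".stripMargin)

    spark.stop()
  }
}
```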