Skip to content

Commit

Permalink
[SPARK-34518][SQL] Rename AlterTableRecoverPartitionsCommand to `Re…
Browse files Browse the repository at this point in the history
…pairTableCommand`

### What changes were proposed in this pull request?
Rename the execution node `AlterTableRecoverPartitionsCommand` for the commands:
- `MSCK REPAIR TABLE table [{ADD|DROP|SYNC} PARTITIONS]`
- `ALTER TABLE table RECOVER PARTITIONS`

to `RepairTableCommand`.

### Why are the changes needed?
1. After the PR #31499, `ALTER TABLE table RECOVER PARTITIONS` is equal to `MSCK REPAIR TABLE table ADD PARTITIONS`. And mapping of the generic command `MSCK REPAIR TABLE` to the more specific execution node `AlterTableRecoverPartitionsCommand` can confuse devs in the future.
2. `ALTER TABLE table RECOVER PARTITIONS` does not support any options/extensions. So, additional parameters `enableAddPartitions` and `enableDropPartitions` in `AlterTableRecoverPartitionsCommand` confuse as well.

### Does this PR introduce _any_ user-facing change?
No because this is internal API.

### How was this patch tested?
By running the existing test suites:
```
$ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *AlterTableRecoverPartitionsSuite"
$ build/sbt "test:testOnly *AlterTableRecoverPartitionsParserSuite"
$ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *MsckRepairTableSuite"
$ build/sbt "test:testOnly *MsckRepairTableParserSuite"
```

Closes #31635 from MaxGekk/rename-recover-partitions.

Authored-by: Max Gekk <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
  • Loading branch information
MaxGekk authored and cloud-fan committed Feb 25, 2021
1 parent 8a1e172 commit c56af69
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -377,11 +377,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
AnalyzeColumnCommand(ident.asTableIdentifier, columnNames, allColumns)

case RepairTable(ResolvedV1TableIdentifier(ident), addPartitions, dropPartitions) =>
AlterTableRecoverPartitionsCommand(
ident.asTableIdentifier,
addPartitions,
dropPartitions,
"MSCK REPAIR TABLE")
RepairTableCommand(ident.asTableIdentifier, addPartitions, dropPartitions)

case LoadData(ResolvedV1TableIdentifier(ident), path, isLocal, isOverwrite, partition) =>
LoadDataCommand(
Expand Down Expand Up @@ -422,7 +418,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
ShowColumnsCommand(db, v1TableName, output)

case RecoverPartitions(ResolvedV1TableIdentifier(ident)) =>
AlterTableRecoverPartitionsCommand(
RepairTableCommand(
ident.asTableIdentifier,
enableAddPartitions = true,
enableDropPartitions = false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ case class CreateDataSourceTableAsSelectCommand(
case fs: HadoopFsRelation if table.partitionColumnNames.nonEmpty &&
sparkSession.sqlContext.conf.manageFilesourcePartitions =>
// Need to recover partitions into the metastore so our saved data is visible.
sessionState.executePlan(AlterTableRecoverPartitionsCommand(
sessionState.executePlan(RepairTableCommand(
table.identifier,
enableAddPartitions = true,
enableDropPartitions = false)).toRdd
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,7 @@ case class AlterTableDropPartitionCommand(
case class PartitionStatistics(numFiles: Int, totalSize: Long)

/**
* Recover Partitions in ALTER TABLE: recover all the partition in the directory of a table and
* Repair a table by recovering all the partition in the directory of the table and
* update the catalog.
*
* The syntax of this command is:
Expand All @@ -600,11 +600,11 @@ case class PartitionStatistics(numFiles: Int, totalSize: Long)
* MSCK REPAIR TABLE table [{ADD|DROP|SYNC} PARTITIONS];
* }}}
*/
case class AlterTableRecoverPartitionsCommand(
case class RepairTableCommand(
tableName: TableIdentifier,
enableAddPartitions: Boolean,
enableDropPartitions: Boolean,
cmd: String = "ALTER TABLE RECOVER PARTITIONS") extends RunnableCommand {
cmd: String = "MSCK REPAIR TABLE") extends RunnableCommand {

// These are list of statistics that can be collected quickly without requiring a scan of the data
// see https://github.com/apache/hive/blob/master/
Expand Down Expand Up @@ -654,7 +654,7 @@ case class AlterTableRecoverPartitionsCommand(
val threshold = spark.sparkContext.conf.get(RDD_PARALLEL_LISTING_THRESHOLD)
val pathFilter = getPathFilter(hadoopConf)

val evalPool = ThreadUtils.newForkJoinPool("AlterTableRecoverPartitionsCommand", 8)
val evalPool = ThreadUtils.newForkJoinPool("RepairTableCommand", 8)
val partitionSpecsAndLocs: GenSeq[(TablePartitionSpec, Path)] =
try {
scanPartitions(spark, fs, pathFilter, root, Map(), table.partitionColumnNames, threshold,
Expand Down Expand Up @@ -804,7 +804,7 @@ case class AlterTableRecoverPartitionsCommand(
private def dropPartitions(catalog: SessionCatalog, fs: FileSystem): Int = {
val dropPartSpecs = ThreadUtils.parmap(
catalog.listPartitions(tableName),
"AlterTableRecoverPartitionsCommand: non-existing partitions",
"RepairTableCommand: non-existing partitions",
maxThreads = 8) { partition =>
partition.storage.locationUri.flatMap { uri =>
if (fs.exists(new Path(uri))) None else Some(partition.spec)
Expand Down

0 comments on commit c56af69

Please sign in to comment.