From c56af69cdf3cc68821e69fa4ef0213b5cc281ab0 Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Thu, 25 Feb 2021 09:32:41 +0000
Subject: [PATCH] [SPARK-34518][SQL] Rename `AlterTableRecoverPartitionsCommand` to `RepairTableCommand`

### What changes were proposed in this pull request?
Rename the execution node `AlterTableRecoverPartitionsCommand` to `RepairTableCommand` for the commands:
- `MSCK REPAIR TABLE table [{ADD|DROP|SYNC} PARTITIONS]`
- `ALTER TABLE table RECOVER PARTITIONS`

### Why are the changes needed?
1. After the PR https://github.com/apache/spark/pull/31499, `ALTER TABLE table RECOVER PARTITIONS` is equivalent to `MSCK REPAIR TABLE table ADD PARTITIONS`, so mapping the generic command `MSCK REPAIR TABLE` to the more specific execution node `AlterTableRecoverPartitionsCommand` could confuse developers in the future.
2. `ALTER TABLE table RECOVER PARTITIONS` does not support any options/extensions, so the additional parameters `enableAddPartitions` and `enableDropPartitions` in `AlterTableRecoverPartitionsCommand` are confusing as well.
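To make the new mapping concrete, here is a minimal, self-contained sketch. It is not the resolver code itself: the `RepairTableCommand` signature is copied from `ddl.scala` in the diff below, while `TableIdentifier`, the object name, and the sample values are simplified stand-ins.

```scala
// Simplified model of the mapping, not Spark's actual classes.
case class TableIdentifier(table: String)

// Signature as in ddl.scala below; the body is omitted here.
case class RepairTableCommand(
    tableName: TableIdentifier,
    enableAddPartitions: Boolean,
    enableDropPartitions: Boolean,
    cmd: String = "MSCK REPAIR TABLE")

object RepairMappingSketch extends App {
  val ident = TableIdentifier("t")

  // MSCK REPAIR TABLE t ADD PARTITIONS: the flags come from the parsed statement.
  val msckAdd = RepairTableCommand(ident, enableAddPartitions = true, enableDropPartitions = false)

  // ALTER TABLE t RECOVER PARTITIONS: the same node with the same flags; only
  // the `cmd` string differs.
  val recover = RepairTableCommand(ident, enableAddPartitions = true, enableDropPartitions = false,
    cmd = "ALTER TABLE ... RECOVER PARTITIONS")

  // Identical behavior flags; `cmd` is the only difference.
  println(msckAdd == recover.copy(cmd = msckAdd.cmd)) // prints: true
}
```

The `cmd` parameter appears to serve only as the command name reported in error messages; the behavior is governed entirely by the two boolean flags.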
### Does this PR introduce _any_ user-facing change?
No, because this only renames an internal API.

### How was this patch tested?
By running the existing test suites:
```
$ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *AlterTableRecoverPartitionsSuite"
$ build/sbt "test:testOnly *AlterTableRecoverPartitionsParserSuite"
$ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *MsckRepairTableSuite"
$ build/sbt "test:testOnly *MsckRepairTableParserSuite"
```

Closes #31635 from MaxGekk/rename-recover-partitions.

Authored-by: Max Gekk
Signed-off-by: Wenchen Fan
---
 .../sql/catalyst/analysis/ResolveSessionCatalog.scala  |  8 ++------
 .../sql/execution/command/createDataSourceTables.scala |  2 +-
 .../org/apache/spark/sql/execution/command/ddl.scala   | 10 +++++-----
 3 files changed, 8 insertions(+), 12 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index 68c608310e214..dde31f62e06b9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -377,11 +377,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
       AnalyzeColumnCommand(ident.asTableIdentifier, columnNames, allColumns)
 
     case RepairTable(ResolvedV1TableIdentifier(ident), addPartitions, dropPartitions) =>
-      AlterTableRecoverPartitionsCommand(
-        ident.asTableIdentifier,
-        addPartitions,
-        dropPartitions,
-        "MSCK REPAIR TABLE")
+      RepairTableCommand(ident.asTableIdentifier, addPartitions, dropPartitions)
 
     case LoadData(ResolvedV1TableIdentifier(ident), path, isLocal, isOverwrite, partition) =>
       LoadDataCommand(
@@ -422,7 +418,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
       ShowColumnsCommand(db, v1TableName, output)
 
     case RecoverPartitions(ResolvedV1TableIdentifier(ident)) =>
-      AlterTableRecoverPartitionsCommand(
+      RepairTableCommand(
         ident.asTableIdentifier,
         enableAddPartitions = true,
         enableDropPartitions = false,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index b3e48e37c66e2..995d6273ea588 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -189,7 +189,7 @@ case class CreateDataSourceTableAsSelectCommand(
       case fs: HadoopFsRelation if table.partitionColumnNames.nonEmpty &&
           sparkSession.sqlContext.conf.manageFilesourcePartitions =>
         // Need to recover partitions into the metastore so our saved data is visible.
-        sessionState.executePlan(AlterTableRecoverPartitionsCommand(
+        sessionState.executePlan(RepairTableCommand(
           table.identifier,
           enableAddPartitions = true,
           enableDropPartitions = false)).toRdd
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index f0219efbf9a98..2fc6d6fd85322 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -591,7 +591,7 @@ case class AlterTableDropPartitionCommand(
 case class PartitionStatistics(numFiles: Int, totalSize: Long)
 
 /**
- * Recover Partitions in ALTER TABLE: recover all the partition in the directory of a table and
+ * Repair a table by recovering all the partitions in the directory of the table and
  * update the catalog.
  *
  * The syntax of this command is:
@@ -600,11 +600,11 @@ case class PartitionStatistics(numFiles: Int, totalSize: Long)
  *   MSCK REPAIR TABLE table [{ADD|DROP|SYNC} PARTITIONS];
  * }}}
  */
-case class AlterTableRecoverPartitionsCommand(
+case class RepairTableCommand(
    tableName: TableIdentifier,
    enableAddPartitions: Boolean,
    enableDropPartitions: Boolean,
-    cmd: String = "ALTER TABLE RECOVER PARTITIONS") extends RunnableCommand {
+    cmd: String = "MSCK REPAIR TABLE") extends RunnableCommand {
 
  // These are list of statistics that can be collected quickly without requiring a scan of the data
  // see https://github.com/apache/hive/blob/master/
@@ -654,7 +654,7 @@ case class AlterTableRecoverPartitionsCommand(
     val threshold = spark.sparkContext.conf.get(RDD_PARALLEL_LISTING_THRESHOLD)
     val pathFilter = getPathFilter(hadoopConf)
 
-    val evalPool = ThreadUtils.newForkJoinPool("AlterTableRecoverPartitionsCommand", 8)
+    val evalPool = ThreadUtils.newForkJoinPool("RepairTableCommand", 8)
     val partitionSpecsAndLocs: GenSeq[(TablePartitionSpec, Path)] = try {
       scanPartitions(spark, fs, pathFilter, root, Map(), table.partitionColumnNames, threshold,
@@ -804,7 +804,7 @@ case class AlterTableRecoverPartitionsCommand(
   private def dropPartitions(catalog: SessionCatalog, fs: FileSystem): Int = {
     val dropPartSpecs = ThreadUtils.parmap(
       catalog.listPartitions(tableName),
-      "AlterTableRecoverPartitionsCommand: non-existing partitions",
+      "RepairTableCommand: non-existing partitions",
       maxThreads = 8) { partition =>
       partition.storage.locationUri.flatMap { uri =>
         if (fs.exists(new Path(uri))) None else Some(partition.spec)
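
For reference, the user-facing SQL that reaches this execution node is unchanged by the rename. A hypothetical smoke check in `spark-shell` (not part of this patch; it assumes a Hive-enabled `SparkSession` named `spark` and an existing partitioned table `t` whose directories may have changed outside the metastore) could look like:

```scala
// The SQL surface is untouched; only the internal node name differs.
spark.sql("MSCK REPAIR TABLE t")                 // same as ADD PARTITIONS: register new dirs
spark.sql("MSCK REPAIR TABLE t DROP PARTITIONS") // de-register partitions whose dirs are gone
spark.sql("MSCK REPAIR TABLE t SYNC PARTITIONS") // ADD and DROP combined
spark.sql("ALTER TABLE t RECOVER PARTITIONS")    // now equal to MSCK ... ADD PARTITIONS
```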