diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index 68c608310e214..dde31f62e06b9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -377,11 +377,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
       AnalyzeColumnCommand(ident.asTableIdentifier, columnNames, allColumns)
 
     case RepairTable(ResolvedV1TableIdentifier(ident), addPartitions, dropPartitions) =>
-      AlterTableRecoverPartitionsCommand(
-        ident.asTableIdentifier,
-        addPartitions,
-        dropPartitions,
-        "MSCK REPAIR TABLE")
+      RepairTableCommand(ident.asTableIdentifier, addPartitions, dropPartitions)
 
     case LoadData(ResolvedV1TableIdentifier(ident), path, isLocal, isOverwrite, partition) =>
       LoadDataCommand(
@@ -422,7 +418,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
       ShowColumnsCommand(db, v1TableName, output)
 
     case RecoverPartitions(ResolvedV1TableIdentifier(ident)) =>
-      AlterTableRecoverPartitionsCommand(
+      RepairTableCommand(
         ident.asTableIdentifier,
         enableAddPartitions = true,
         enableDropPartitions = false,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index b3e48e37c66e2..995d6273ea588 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -189,7 +189,7 @@ case class CreateDataSourceTableAsSelectCommand(
       case fs: HadoopFsRelation if table.partitionColumnNames.nonEmpty &&
           sparkSession.sqlContext.conf.manageFilesourcePartitions =>
         // Need to recover partitions into the metastore so our saved data is visible.
-        sessionState.executePlan(AlterTableRecoverPartitionsCommand(
+        sessionState.executePlan(RepairTableCommand(
           table.identifier,
           enableAddPartitions = true,
           enableDropPartitions = false)).toRdd
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index f0219efbf9a98..2fc6d6fd85322 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -591,7 +591,7 @@ case class AlterTableDropPartitionCommand(
 case class PartitionStatistics(numFiles: Int, totalSize: Long)
 
 /**
- * Recover Partitions in ALTER TABLE: recover all the partition in the directory of a table and
+ * Repair a table by recovering all the partitions in the directory of the table and
  * update the catalog.
  *
  * The syntax of this command is:
@@ -600,11 +600,11 @@ case class PartitionStatistics(numFiles: Int, totalSize: Long)
  * {{{
  *   MSCK REPAIR TABLE table [{ADD|DROP|SYNC} PARTITIONS];
  * }}}
  */
-case class AlterTableRecoverPartitionsCommand(
+case class RepairTableCommand(
     tableName: TableIdentifier,
     enableAddPartitions: Boolean,
     enableDropPartitions: Boolean,
-    cmd: String = "ALTER TABLE RECOVER PARTITIONS") extends RunnableCommand {
+    cmd: String = "MSCK REPAIR TABLE") extends RunnableCommand {
 
   // These are list of statistics that can be collected quickly without requiring a scan of the data
   // see https://github.com/apache/hive/blob/master/
@@ -654,7 +654,7 @@ case class AlterTableRecoverPartitionsCommand(
     val threshold = spark.sparkContext.conf.get(RDD_PARALLEL_LISTING_THRESHOLD)
     val pathFilter = getPathFilter(hadoopConf)
 
-    val evalPool = ThreadUtils.newForkJoinPool("AlterTableRecoverPartitionsCommand", 8)
+    val evalPool = ThreadUtils.newForkJoinPool("RepairTableCommand", 8)
     val partitionSpecsAndLocs: GenSeq[(TablePartitionSpec, Path)] =
       try {
         scanPartitions(spark, fs, pathFilter, root, Map(), table.partitionColumnNames, threshold,
@@ -804,7 +804,7 @@ case class AlterTableRecoverPartitionsCommand(
   private def dropPartitions(catalog: SessionCatalog, fs: FileSystem): Int = {
     val dropPartSpecs = ThreadUtils.parmap(
       catalog.listPartitions(tableName),
-      "AlterTableRecoverPartitionsCommand: non-existing partitions",
+      "RepairTableCommand: non-existing partitions",
      maxThreads = 8) { partition =>
      partition.storage.locationUri.flatMap { uri =>
        if (fs.exists(new Path(uri))) None else Some(partition.spec)
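
For reviewers, a minimal sketch (not part of this patch) of the SQL surface that now resolves to the renamed command. The local session and the table name `t` are illustrative assumptions; the flag annotations follow the `RepairTable` and `RecoverPartitions` resolution cases in the first hunk above.

```scala
import org.apache.spark.sql.SparkSession

// Hedged sketch, assuming a local session and the default session catalog.
object RepairTableSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("repair-table-sketch")
      .master("local[*]")
      .getOrCreate()

    // A partitioned v1 table; RepairTableCommand scans the table root
    // directory for partition directories such as part=0/, part=1/.
    spark.sql("CREATE TABLE t (id INT, part INT) USING parquet PARTITIONED BY (part)")

    // Each statement resolves to RepairTableCommand(t, enableAddPartitions, enableDropPartitions):
    spark.sql("MSCK REPAIR TABLE t")                  // add = true,  drop = false (ADD is the default)
    spark.sql("MSCK REPAIR TABLE t ADD PARTITIONS")   // add = true,  drop = false
    spark.sql("MSCK REPAIR TABLE t DROP PARTITIONS")  // add = false, drop = true
    spark.sql("MSCK REPAIR TABLE t SYNC PARTITIONS")  // add = true,  drop = true
    spark.sql("ALTER TABLE t RECOVER PARTITIONS")     // add = true,  drop = false (RecoverPartitions path)

    spark.stop()
  }
}
```

The rename is behavior-preserving: only the class name, the thread-pool labels, and the default `cmd` string used in error messages change, with `cmd` now defaulting to "MSCK REPAIR TABLE" instead of "ALTER TABLE RECOVER PARTITIONS".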