[SPARK-31891][SQL] Support MSCK REPAIR TABLE .. [{ADD|DROP|SYNC} PARTITIONS]
#31499
Conversation
Kubernetes integration test starting
Kubernetes integration test status success
Test build #134961 has finished for PR 31499 at commit
Kubernetes integration test starting
Kubernetes integration test status success
Test build #134981 has finished for PR 31499 at commit
MSCK REPAIR TABLE .. [ADD|DROP|SYNC] PARTITIONS
@cloud-fan @dongjoon-hyun @viirya @HyukjinKwon May I ask you to review this PR?
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #134991 has finished for PR 31499 at commit
@cloud-fan @HyukjinKwon Any objections to the changes?
@dongjoon-hyun @viirya Are you ok with the changes?
Sorry for being late, @MaxGekk. I'll take a look at this during the weekend.
@@ -37,6 +37,13 @@ MSCK REPAIR TABLE table_identifier

**Syntax:** `[ database_name. ] table_name`

* **`[[ADD|DROP|SYNC] PARTITIONS]`**
Is the syntax and function aiming to be identical with HIVE-17824?
The syntax is from the SQL standard but the functionality is from Hive.
@@ -37,6 +37,13 @@ MSCK REPAIR TABLE table_identifier

**Syntax:** `[ database_name. ] table_name`

* **`[[ADD|DROP|SYNC] PARTITIONS]`**

* If the option is not specified, `MSCK REPAIR TABLE` adds partitions to the Hive external catalog only.
`the option` sounds a little mismatched because this is SQL syntax. Do we mention optional syntax like this in our SQL docs?
I will remove `the option`. I found it in another command: http://spark.apache.org/docs/latest/sql-ref-syntax-aux-analyze-table.html

> If no analyze option is specified, ANALYZE TABLE collects ...

but docs for other commands say "If specified, ...".
* **`[[ADD|DROP|SYNC] PARTITIONS]`**

* If the option is not specified, `MSCK REPAIR TABLE` adds partitions to the Hive external catalog only.
* **ADD**, the command adds new partitions in the catalog for all sub-folders in the base table folder that don't belong to any table partitions.
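The ADD/DROP/SYNC semantics described above boil down to reconciling the set of partitions recorded in the catalog against the partition folders actually present under the table location. A minimal Python sketch of that reconciliation (illustrative only, not Spark's implementation; the function name and set-based model are assumptions):

```python
def repair_table(catalog_parts, fs_parts, option=None):
    """Reconcile catalog partitions with file-system partition folders.

    ADD (and no option): register folders that have no catalog entry.
    DROP: remove catalog entries whose folders are gone.
    SYNC: do both, i.e. make the catalog match the file system.
    """
    catalog = set(catalog_parts)
    fs = set(fs_parts)
    if option in (None, "ADD", "SYNC"):
        catalog |= fs   # add partitions for new sub-folders
    if option in ("DROP", "SYNC"):
        catalog &= fs   # drop partitions whose folders vanished
    return catalog
```

For example, `repair_table({"part=0"}, {"part=2"}, "SYNC")` keeps only `part=2`, matching the SYNC behavior exercised later in this PR's tests.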
Like line 42 (before this line) and line 44 (after this line), `the catalog` -> `the Hive external catalog` consistently?
Actually, the command calls the session catalog, which is an internal catalog that serves as a proxy to an external catalog (In-Memory or Hive). Strictly speaking, the command adds/drops partitions in the session catalog. Let me update the doc to be more precise.
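The catalog layering described here can be pictured as a simple proxy relationship (a hypothetical Python sketch; the class and method names are made up for illustration and do not match Spark's actual API):

```python
class ExternalCatalog:
    """Backing metadata store, e.g. a Hive metastore or an in-memory catalog."""
    def __init__(self):
        self.partitions = {}

    def create_partitions(self, table, specs):
        self.partitions.setdefault(table, set()).update(specs)

    def drop_partitions(self, table, specs):
        self.partitions.setdefault(table, set()).difference_update(specs)


class SessionCatalog:
    """Internal per-session catalog that forwards partition changes
    to whichever external catalog backs it."""
    def __init__(self, external):
        self.external = external

    def create_partitions(self, table, specs):
        self.external.create_partitions(table, specs)

    def drop_partitions(self, table, specs):
        self.external.drop_partitions(table, specs)
```

The point of the comment: the command talks to the session catalog, and the session catalog decides which external backend actually records the change.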
@@ -229,7 +229,8 @@ statement
    | LOAD DATA LOCAL? INPATH path=STRING OVERWRITE? INTO TABLE
        multipartIdentifier partitionSpec? #loadData
    | TRUNCATE TABLE multipartIdentifier partitionSpec? #truncateTable
-   | MSCK REPAIR TABLE multipartIdentifier #repairTable
+   | MSCK REPAIR TABLE multipartIdentifier
+       (option=(ADD|DROP|SYNC) PARTITIONS)? #repairTable
indentation?
fixed
@dongjoon-hyun Thank you for reviewing this PR. Regarding the syntax for alternatives, I wasn't sure what I should use in the docs. I just looked at existing examples:
In docs (link):
In SqlBase.g4: spark/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 Line 136 in ba974ea
In docs (link)
In SqlBase.g4: spark/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 Line 198 in ba974ea
You proposed
+1 for your suggestion (2).
@dongjoon-hyun I have looked at the SQL standard; it uses both notations: `{ UTF8 | UTF16 | UTF32 }`, `GENERATED { ALWAYS | BY DEFAULT } AS IDENTITY`, and `[ INSTANCE | STATIC | CONSTRUCTOR ] METHOD`. I do believe we should use the syntax
MSCK REPAIR TABLE .. [ADD|DROP|SYNC] PARTITIONS
MSCK REPAIR TABLE .. [{ ADD | DROP | SYNC } PARTITIONS]
MSCK REPAIR TABLE .. [{ ADD | DROP | SYNC } PARTITIONS]
MSCK REPAIR TABLE .. [{ADD|DROP|SYNC} PARTITIONS]
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #135175 has finished for PR 31499 at commit
sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@dongjoon-hyun Could you take a look at this PR one more time, please?
Oops. Sorry, @MaxGekk. Could you rebase to the master once more, please?
dropPartSpecs,
ignoreIfNotExists = true,
purge = false,
retainData = true)
Could you add a comment about the reason why we use `retainData=true`? I guess the reason is that `fs.exists(..)` is already `false` and we don't want additional file system calls. Did I understand correctly?
> ... we don't want additional file system calls. Did I understand correctly?

Yep, if we set `retainData` to `true`, the `deleteData` flag will be `false` at https://github.com/apache/hive/blob/release-3.1.3-rc0/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java#L4360-L4378 . So, Hive MetaStore will not try to delete the partition folders.
The same for the `In-Memory` catalog:
spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala Lines 464 to 473 in bfc0235
if (existingParts.contains(p) && shouldRemovePartitionLocation) {
  val partitionPath = new Path(existingParts(p).location)
  try {
    val fs = partitionPath.getFileSystem(hadoopConfig)
    fs.delete(partitionPath, true)
  } catch {
    case e: IOException =>
      throw QueryExecutionErrors.unableToDeletePartitionPathError(partitionPath, e)
  }
}
+1, LGTM with one minor comment about adding a comment.
Let's rebase and see the CI result.
…artitions

# Conflicts:
#	sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandTestUtils.scala
Kubernetes integration test starting
Kubernetes integration test status success
Test build #135374 has finished for PR 31499 at commit
Thank you for updating, @MaxGekk !
Merged to master for Apache Spark 3.2.0.
@@ -39,6 +39,13 @@ MSCK REPAIR TABLE table_identifier

**Syntax:** `[ database_name. ] table_name`

* **`{ADD|DROP|SYNC} PARTITIONS`**

* If specified, `MSCK REPAIR TABLE` only adds partitions to the session catalog.
typo: should be `If not specified`.
I think it's better to put it in the end, and say If not specified, ADD is the default.
Here is the PR #31633
checkPartitions(t, Map("part" -> "1"), Map("part" -> "2"))
checkAnswer(sql(s"SELECT col, part FROM $t"), Seq(Row(1, 1), Row(0, 2)))
  }
}
shall we test ADD?
We already have many tests for ADD in `AlterTableRecoverPartitionsSuite`, since `ALTER TABLE .. RECOVER PARTITIONS` is semantically equal to `MSCK REPAIR TABLE .. ADD PARTITIONS`.
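Given the two flags this PR introduces (`enableAddPartitions`, `enableDropPartitions`), the equivalence can be sketched as a small mapping (a hypothetical helper for illustration, not Spark code):

```python
def repair_flags(option=None):
    """Map a MSCK REPAIR TABLE option to the pair
    (enableAddPartitions, enableDropPartitions).

    No option behaves like ADD, which is why
    ALTER TABLE .. RECOVER PARTITIONS is semantically equal to
    MSCK REPAIR TABLE .. ADD PARTITIONS."""
    enable_add = option in (None, "ADD", "SYNC")
    enable_drop = option in ("DROP", "SYNC")
    return enable_add, enable_drop
```

Both `repair_flags()` and `repair_flags("ADD")` yield the same pair, so any test exercising the recover-partitions path also exercises the ADD path.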
I see
@cloud-fan If you would like to test `MSCK REPAIR TABLE .. ADD PARTITIONS` explicitly, we could mix the `v1.AlterTableRecoverPartitionsSuiteBase` into `MsckRepairTableSuiteBase` to run the existing tests automatically.
no need to
late LGTM
…pairTableCommand`

### What changes were proposed in this pull request?
Rename the execution node `AlterTableRecoverPartitionsCommand` for the commands:
- `MSCK REPAIR TABLE table [{ADD|DROP|SYNC} PARTITIONS]`
- `ALTER TABLE table RECOVER PARTITIONS`

to `RepairTableCommand`.

### Why are the changes needed?
1. After the PR #31499, `ALTER TABLE table RECOVER PARTITIONS` is equal to `MSCK REPAIR TABLE table ADD PARTITIONS`. And mapping of the generic command `MSCK REPAIR TABLE` to the more specific execution node `AlterTableRecoverPartitionsCommand` can confuse devs in the future.
2. `ALTER TABLE table RECOVER PARTITIONS` does not support any options/extensions. So, additional parameters `enableAddPartitions` and `enableDropPartitions` in `AlterTableRecoverPartitionsCommand` confuse as well.

### Does this PR introduce _any_ user-facing change?
No, because this is internal API.

### How was this patch tested?
By running the existing test suites:
```
$ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *AlterTableRecoverPartitionsSuite"
$ build/sbt "test:testOnly *AlterTableRecoverPartitionsParserSuite"
$ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *MsckRepairTableSuite"
$ build/sbt "test:testOnly *MsckRepairTableParserSuite"
```

Closes #31635 from MaxGekk/rename-recover-partitions.

Authored-by: Max Gekk <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
…TITIONS]`

### What changes were proposed in this pull request?
In the PR, I propose to extend the `MSCK REPAIR TABLE` command, and support new options `{ADD|DROP|SYNC} PARTITIONS`. In particular:
1. Extend the logical node `RepairTable`, and add two new flags `enableAddPartitions` and `enableDropPartitions`.
2. Add similar flags to the v1 execution node `AlterTableRecoverPartitionsCommand`.
3. Add new method `dropPartitions()` to `AlterTableRecoverPartitionsCommand` which drops partitions from the catalog if their locations in the file system don't exist.
4. Updated public docs about the `MSCK REPAIR TABLE` command:
<img width="1037" alt="Screenshot 2021-02-16 at 13 46 39" src="https://user-images.githubusercontent.com/1580697/108052607-7446d280-705d-11eb-8e25-7398254787a4.png">

Closes apache#31097

### Why are the changes needed?
- The changes allow recovering tables with removed partitions. The example below portrays the problem:
```sql
spark-sql> create table tbl2 (col int, part int) partitioned by (part);
spark-sql> insert into tbl2 partition (part=1) select 1;
spark-sql> insert into tbl2 partition (part=0) select 0;
spark-sql> show table extended like 'tbl2' partition (part = 0);
default	tbl2	false	Partition Values: [part=0]
Location: file:/Users/maximgekk/proj/apache-spark/spark-warehouse/tbl2/part=0
...
```
Remove the partition (part = 0) from the filesystem:
```
$ rm -rf /Users/maximgekk/proj/apache-spark/spark-warehouse/tbl2/part=0
```
Even after recovering, we cannot query the table:
```sql
spark-sql> msck repair table tbl2;
spark-sql> select * from tbl2;
21/01/08 22:49:13 ERROR SparkSQLDriver: Failed in [select * from tbl2]
org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/Users/maximgekk/proj/apache-spark/spark-warehouse/tbl2/part=0
```
- To have feature parity with Hive: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-RecoverPartitions(MSCKREPAIRTABLE)

### Does this PR introduce _any_ user-facing change?
Yes. After the changes, we can query the recovered table:
```sql
spark-sql> msck repair table tbl2 sync partitions;
spark-sql> select * from tbl2;
1	1
spark-sql> show partitions tbl2;
part=1
```

### How was this patch tested?
- By running the modified test suites:
```
$ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *MsckRepairTableParserSuite"
$ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *PlanResolutionSuite"
$ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *AlterTableRecoverPartitionsSuite"
$ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *AlterTableRecoverPartitionsParallelSuite"
```
- Added unified v1 and v2 tests for `MSCK REPAIR TABLE`:
```
$ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *MsckRepairTableSuite"
```

Closes apache#31499 from MaxGekk/repair-table-drop-partitions.

Authored-by: Max Gekk <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>