
[SPARK-31891][SQL] Support MSCK REPAIR TABLE .. [{ADD|DROP|SYNC} PARTITIONS] #31499

Closed · wants to merge 10 commits

Conversation

@MaxGekk (Member) commented Feb 6, 2021:

What changes were proposed in this pull request?

In the PR, I propose to extend the MSCK REPAIR TABLE command and support the new options {ADD|DROP|SYNC} PARTITIONS. In particular:

  1. Extend the logical node RepairTable, and add two new flags enableAddPartitions and enableDropPartitions (see the sketch after this section).
  2. Add similar flags to the v1 execution node AlterTableRecoverPartitionsCommand.
  3. Add a new method dropPartitions() to AlterTableRecoverPartitionsCommand which drops partitions from the catalog if their locations in the file system don't exist.
  4. Update the public docs about the MSCK REPAIR TABLE command:

(Screenshot, 2021-02-16: the updated MSCK REPAIR TABLE documentation page.)

Closes #31097
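
For context, a minimal sketch of what the extended logical node could look like. The flag names come from the list above; the exact shape (parent trait, child handling) is an assumption, not the PR's code:

```scala
import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}

// SYNC PARTITIONS enables both flags; the bare command behaves like ADD.
case class RepairTable(
    child: LogicalPlan,
    enableAddPartitions: Boolean,
    enableDropPartitions: Boolean) extends Command {
  override def children: Seq[LogicalPlan] = child :: Nil
}
```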

Why are the changes needed?

  • The changes allow recovering tables with removed partitions. The example below illustrates the problem:
spark-sql> create table tbl2 (col int, part int) partitioned by (part);
spark-sql> insert into tbl2 partition (part=1) select 1;
spark-sql> insert into tbl2 partition (part=0) select 0;
spark-sql> show table extended like 'tbl2' partition (part = 0);
default	tbl2	false	Partition Values: [part=0]
Location: file:/Users/maximgekk/proj/apache-spark/spark-warehouse/tbl2/part=0
...

Remove the partition (part = 0) from the filesystem:

$ rm -rf /Users/maximgekk/proj/apache-spark/spark-warehouse/tbl2/part=0

Even after recovering, we cannot query the table:

spark-sql> msck repair table tbl2;
spark-sql> select * from tbl2;
21/01/08 22:49:13 ERROR SparkSQLDriver: Failed in [select * from tbl2]
org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/Users/maximgekk/proj/apache-spark/spark-warehouse/tbl2/part=0

  • To have feature parity with Hive: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-RecoverPartitions(MSCKREPAIRTABLE)

Does this PR introduce any user-facing change?

Yes. After the changes, we can query the recovered table:

spark-sql> msck repair table tbl2 sync partitions;
spark-sql> select * from tbl2;
1	1
spark-sql> show partitions tbl2;
part=1
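
For reference, the variants side by side, assuming an active SparkSession named spark (a sketch; the semantics follow the updated docs):

```scala
spark.sql("MSCK REPAIR TABLE tbl2")                  // default: behaves like ADD
spark.sql("MSCK REPAIR TABLE tbl2 ADD PARTITIONS")   // register sub-folders missing from the catalog
spark.sql("MSCK REPAIR TABLE tbl2 DROP PARTITIONS")  // drop partitions whose locations are gone
spark.sql("MSCK REPAIR TABLE tbl2 SYNC PARTITIONS")  // both ADD and DROP
```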

How was this patch tested?

  • By running the modified test suites:
$ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *MsckRepairTableParserSuite"
$ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *PlanResolutionSuite"
$ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *AlterTableRecoverPartitionsSuite"
$ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *AlterTableRecoverPartitionsParallelSuite"
  • Added unified v1 and v2 tests for MSCK REPAIR TABLE:
$ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *MsckRepairTableSuite"

@github-actions github-actions bot added the SQL label Feb 6, 2021
@SparkQA commented Feb 6, 2021:

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/39544/

@SparkQA commented Feb 6, 2021:

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/39544/

@SparkQA commented Feb 7, 2021:

Test build #134961 has finished for PR 31499 at commit c313bdb.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class RepairTable(

@github-actions github-actions bot added the DOCS label Feb 7, 2021
@SparkQA commented Feb 7, 2021:

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/39564/

@SparkQA commented Feb 7, 2021:

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/39564/

@SparkQA commented Feb 7, 2021:

Test build #134981 has finished for PR 31499 at commit 1168390.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@MaxGekk MaxGekk changed the title [WIP][SPARK-31891][SQL] Support MSCK REPAIR TABLE .. (ADD|DROP|SYNC) PARTITIONS [SPARK-31891][SQL] Support MSCK REPAIR TABLE .. [ADD|DROP|SYNC] PARTITIONS Feb 7, 2021
@MaxGekk (Member Author) commented Feb 7, 2021:

@cloud-fan @dongjoon-hyun @viirya @HyukjinKwon May I ask you to review this PR?

@SparkQA commented Feb 7, 2021:

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/39574/

@SparkQA commented Feb 7, 2021:

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/39574/

@SparkQA commented Feb 7, 2021:

Test build #134991 has finished for PR 31499 at commit 79ec7a1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@MaxGekk (Member Author) commented Feb 10, 2021:

@cloud-fan @HyukjinKwon Any objections to the changes?

@MaxGekk (Member Author) commented Feb 10, 2021:

@dongjoon-hyun @viirya Are you ok with the changes?

@dongjoon-hyun (Member) commented:

Sorry for being late, @MaxGekk. I'll take a look at this during the weekend.

@dongjoon-hyun dongjoon-hyun self-requested a review February 13, 2021 00:19
@dongjoon-hyun (Member) commented Feb 13, 2021:

I added myself as a reviewer in order not to forget this. That didn't work as expected, so I assigned the issue to myself instead.

@dongjoon-hyun dongjoon-hyun removed their request for review February 13, 2021 00:21
@dongjoon-hyun dongjoon-hyun self-assigned this Feb 13, 2021
@@ -37,6 +37,13 @@ MSCK REPAIR TABLE table_identifier

**Syntax:** `[ database_name. ] table_name`

* **`[[ADD|DROP|SYNC] PARTITIONS]`**
Reviewer (Member):

Are the syntax and functionality aiming to be identical to HIVE-17824?

MaxGekk (Member Author):

The syntax is from the SQL standard, but the functionality is from Hive.

@@ -37,6 +37,13 @@ MSCK REPAIR TABLE table_identifier

**Syntax:** `[ database_name. ] table_name`

* **`[[ADD|DROP|SYNC] PARTITIONS]`**

* If the option is not specified, `MSCK REPAIR TABLE` adds partitions to the Hive external catalog only.
Reviewer (Member):

"The option" sounds a little mismatched because this is SQL syntax. Do we describe optional syntax like this elsewhere in our SQL docs?

MaxGekk (Member Author):

I will remove "the option". I found this wording in another command's docs: http://spark.apache.org/docs/latest/sql-ref-syntax-aux-analyze-table.html

If no analyze option is specified, ANALYZE TABLE collects ...

but the docs for other commands say If specified, ...

* **`[[ADD|DROP|SYNC] PARTITIONS]`**

* If the option is not specified, `MSCK REPAIR TABLE` adds partitions to the Hive external catalog only.
* **ADD**, the command adds new partitions in the catalog for all sub-folders in the base table folder that don't belong to any table partition.
Reviewer (Member):

Like line 42 (before this line) and line 44 (after this line), should we say the Hive external catalog instead of the catalog, for consistency?

MaxGekk (Member Author):

Actually, the command calls the session catalog, which is an internal catalog serving as a proxy to an external catalog (In-Memory or Hive). Strictly speaking, the command adds/drops partitions in the session catalog. Let me update the doc to be more precise.
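
For instance, a tiny sketch of that layering, assuming an active SparkSession named spark:

```scala
val sessionCatalog = spark.sessionState.catalog         // internal SessionCatalog the command talks to
val externalCatalog = spark.sharedState.externalCatalog // the proxied Hive or In-Memory catalog
```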

@@ -229,7 +229,8 @@ statement
| LOAD DATA LOCAL? INPATH path=STRING OVERWRITE? INTO TABLE
multipartIdentifier partitionSpec? #loadData
| TRUNCATE TABLE multipartIdentifier partitionSpec? #truncateTable
| MSCK REPAIR TABLE multipartIdentifier #repairTable
| MSCK REPAIR TABLE multipartIdentifier
(option=(ADD|DROP|SYNC) PARTITIONS)? #repairTable
Reviewer (Member):

indentation?

MaxGekk (Member Author):

fixed
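
As a quick sanity check of the new rule, all variants can be fed through the public parser interface (a sketch, not the PR's test code):

```scala
// Each form should parse without error: no clause, ADD, DROP, and SYNC PARTITIONS.
Seq("", " ADD PARTITIONS", " DROP PARTITIONS", " SYNC PARTITIONS").foreach { clause =>
  spark.sessionState.sqlParser.parsePlan(s"MSCK REPAIR TABLE ns.tbl$clause")
}
```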

@MaxGekk (Member Author) commented Feb 15, 2021:

@dongjoon-hyun Thank you for reviewing this PR. Regarding the syntax for alternatives, I wasn't sure what I should use in the docs. I just looked at existing examples:

  1. ANALYZE TABLE (similar in EXPLAIN)

In docs (link):

[ NOSCAN | FOR COLUMNS col [ , ... ] | FOR ALL COLUMNS ]

In SqlBase.g4:

(identifier | FOR COLUMNS identifierSeq | FOR ALL COLUMNS)? #analyze

  2. SHOW TABLES

In docs (link)

{ FROM | IN }

In SqlBase.g4:

| SHOW TABLES ((FROM | IN) multipartIdentifier)?

You proposed (ADD|DROP|SYNC), but I haven't found any examples with such syntax. Do you know which syntax the Spark SQL guide uses?

@dongjoon-hyun (Member) commented:

  1. For your first example, [ NOSCAN | FOR COLUMNS col [ , ... ] | FOR ALL COLUMNS ] means we can omit everything. So, we cannot write it like that here.
  2. For the second example, you mean {} is correct for this case because ( and ) are used as literals. Did I understand correctly?

+1 for your suggestion (2).

@MaxGekk (Member Author) commented Feb 15, 2021:

@dongjoon-hyun I have looked at the SQL standard; it uses both notations:

{ UTF8 | UTF16 | UTF32 }
GENERATED { ALWAYS | BY DEFAULT } AS IDENTITY

and

[ INSTANCE | STATIC | CONSTRUCTOR ] METHOD

It seems { alternative1 | alternative2 } means one of the alternatives must be specified, while [ alternative1 | alternative2 ] means both are optional.

I do believe we should use the syntax MSCK REPAIR TABLE .. [ { ADD | DROP | SYNC } PARTITIONS ] in docs/comments.

@MaxGekk MaxGekk changed the title [SPARK-31891][SQL] Support MSCK REPAIR TABLE .. [ADD|DROP|SYNC] PARTITIONS [SPARK-31891][SQL] Support MSCK REPAIR TABLE .. [{ ADD | DROP |SYNC } PARTITIONS] Feb 16, 2021
@MaxGekk MaxGekk changed the title [SPARK-31891][SQL] Support MSCK REPAIR TABLE .. [{ ADD | DROP |SYNC } PARTITIONS] [SPARK-31891][SQL] Support MSCK REPAIR TABLE .. [{ADD|DROP|SYNC} PARTITIONS] Feb 16, 2021
@SparkQA commented Feb 16, 2021:

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/39756/

@SparkQA commented Feb 16, 2021:

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/39756/

@SparkQA commented Feb 16, 2021:

Test build #135175 has finished for PR 31499 at commit 7b9ee52.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@MaxGekk (Member Author) commented Feb 22, 2021:

@dongjoon-hyun Could you take a look at this PR one more time, please?

@dongjoon-hyun (Member) commented:

Oops. Sorry, @MaxGekk. Could you rebase onto master once more, please?

dropPartSpecs,
ignoreIfNotExists = true,
purge = false,
retainData = true)
Reviewer (Member):

Could you add a comment about the reason why we use retainData = true? I guess the reason is that fs.exists(..) is already false and we don't want additional file system calls. Did I understand correctly?

MaxGekk (Member Author):

... we don't want additional file system calls. Did I understand correctly?

Yep, if we set retainData to true, the deleteData flag will be false at https://github.com/apache/hive/blob/release-3.1.3-rc0/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java#L4360-L4378 . So, the Hive MetaStore will not try to delete the partition folders.
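
For illustration, the call site with such a comment might look like this (a sketch against SessionCatalog.dropPartitions; table and dropPartSpecs are assumed from the diff above):

```scala
spark.sessionState.catalog.dropPartitions(
  table.identifier,
  dropPartSpecs,
  ignoreIfNotExists = true,
  purge = false,
  // The partition locations no longer exist in the file system, so let the
  // catalog retain the data instead of trying to delete it: this avoids
  // additional (and failing) file system calls in the metastore.
  retainData = true)
```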

MaxGekk (Member Author):

The same for the In-Memory catalog:

if (existingParts.contains(p) && shouldRemovePartitionLocation) {
  val partitionPath = new Path(existingParts(p).location)
  try {
    val fs = partitionPath.getFileSystem(hadoopConfig)
    fs.delete(partitionPath, true)
  } catch {
    case e: IOException =>
      throw QueryExecutionErrors.unableToDeletePartitionPathError(partitionPath, e)
  }
}

@dongjoon-hyun (Member) left a comment:

+1, LGTM with one minor comment about adding a comment.
Let's rebase and see the CI result.

…artitions

# Conflicts:
#	sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandTestUtils.scala
@SparkQA commented Feb 23, 2021:

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/39954/

@SparkQA commented Feb 23, 2021:

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/39954/

@SparkQA commented Feb 23, 2021:

Test build #135374 has finished for PR 31499 at commit fefea57.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun (Member) commented:

Thank you for updating, @MaxGekk!

@dongjoon-hyun (Member) commented:

Merged to master for Apache Spark 3.2.0.

@@ -39,6 +39,13 @@ MSCK REPAIR TABLE table_identifier

**Syntax:** `[ database_name. ] table_name`

* **`{ADD|DROP|SYNC} PARTITIONS`**

* If specified, `MSCK REPAIR TABLE` only adds partitions to the session catalog.
Reviewer (Contributor):

typo: should be If not specified.

Reviewer (Contributor):

I think it's better to put it at the end, and say If not specified, ADD is the default.

MaxGekk (Member Author):

Here is the PR #31633

checkPartitions(t, Map("part" -> "1"), Map("part" -> "2"))
checkAnswer(sql(s"SELECT col, part FROM $t"), Seq(Row(1, 1), Row(0, 2)))
}
}
Reviewer (Contributor):

shall we test ADD?

MaxGekk (Member Author) commented Feb 24, 2021:

We already have many tests for ADD in AlterTableRecoverPartitionsSuite, since ALTER TABLE .. RECOVER PARTITIONS is semantically equal to MSCK REPAIR TABLE .. ADD PARTITIONS.

Reviewer (Contributor):

I see

MaxGekk (Member Author) commented Feb 24, 2021:

@cloud-fan If you would like to test MSCK REPAIR TABLE .. ADD PARTITIONS explicitly, we could mix v1.AlterTableRecoverPartitionsSuiteBase into MsckRepairTableSuiteBase to run the existing tests automatically.
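
Roughly like this (hypothetical; the suite names come from the comment above, and the empty body would just inherit the existing tests):

```scala
// Hypothetical mix-in: rerun the ADD-path tests through MSCK REPAIR TABLE.
class MsckRepairTableAddSuite
  extends v1.AlterTableRecoverPartitionsSuiteBase
  with MsckRepairTableSuiteBase
```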

Reviewer (Contributor):

no need to

@cloud-fan (Contributor) commented:

late LGTM

cloud-fan pushed a commit that referenced this pull request Feb 25, 2021
…pairTableCommand`

### What changes were proposed in this pull request?
Rename the execution node `AlterTableRecoverPartitionsCommand` for the commands:
- `MSCK REPAIR TABLE table [{ADD|DROP|SYNC} PARTITIONS]`
- `ALTER TABLE table RECOVER PARTITIONS`

to `RepairTableCommand`.

### Why are the changes needed?
1. After the PR #31499, `ALTER TABLE table RECOVER PARTITIONS` is equal to `MSCK REPAIR TABLE table ADD PARTITIONS`. Mapping the generic command `MSCK REPAIR TABLE` to the more specific execution node `AlterTableRecoverPartitionsCommand` could confuse devs in the future.
2. `ALTER TABLE table RECOVER PARTITIONS` does not support any options/extensions. So, the additional parameters `enableAddPartitions` and `enableDropPartitions` in `AlterTableRecoverPartitionsCommand` are confusing as well.

### Does this PR introduce _any_ user-facing change?
No, because this is an internal API.

### How was this patch tested?
By running the existing test suites:
```
$ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *AlterTableRecoverPartitionsSuite"
$ build/sbt "test:testOnly *AlterTableRecoverPartitionsParserSuite"
$ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *MsckRepairTableSuite"
$ build/sbt "test:testOnly *MsckRepairTableParserSuite"
```

Closes #31635 from MaxGekk/rename-recover-partitions.

Authored-by: Max Gekk <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
@dongjoon-hyun dongjoon-hyun removed their assignment Apr 22, 2021
xuanyuanking pushed a commit to xuanyuanking/spark that referenced this pull request Sep 29, 2021
…TITIONS]`

Closes apache#31499 from MaxGekk/repair-table-drop-partitions.

Authored-by: Max Gekk <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>