
[clone]refactor the clone action as we introduced external path #4844

Open

wants to merge 8 commits into base: master
Conversation

neuyilan
Member

@neuyilan neuyilan commented Jan 6, 2025

Purpose

https://cwiki.apache.org/confluence/display/PAIMON/PIP-29%3A+Introduce+Table+Multi-Location++Management

Refactor the clone action now that we have introduced the external path.

I want to point out that regardless of where the source table's data is stored (warehouse path or external path), we always copy the data to the warehouse path of the target table.

If we kept using the source table's external path as the data path of the target table, the data of the source table and the target table would be mixed together.
What's your opinion?

Tests

Add CloneActionITCase.testCloneTableWithSourceTableExternalPath

API and Format

no

Documentation

@neuyilan neuyilan marked this pull request as draft January 6, 2025 13:06
@neuyilan neuyilan changed the title [clone]fix the clone when we introduced external path [clone]fix the clone action when we introduced external path Jan 6, 2025
@JingsongLi
Contributor

I feel that the current clone process needs to be refactored:

  1. Single parallelism: query all manifest files and copy the manifest list file.
  2. Read the manifests in distributed parallelism, determine whether each needs to be rewritten (with or without an external path), and complete the copy or rewrite of the manifest.
  3. Shuffle by the data file name.
  4. Copy the data files in distributed parallelism.

This hierarchical approach to copying is the correct solution.
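
To make the proposal concrete, here is a minimal sketch of that four-stage topology in Flink's DataStream API. All stage classes and the String payloads are hypothetical placeholders, not the PR's actual code:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical sketch of the four-stage clone topology described above.
final class CloneTopologySketch {

    static DataStream<String> build(DataStream<String> sourceTables) {
        return sourceTables
                // Stage 1 (single parallelism): query all manifest files and
                // copy the manifest list file.
                .process(new StageFn("query manifests, copy manifest list"))
                .setParallelism(1)
                // Stage 2 (distributed): read each manifest; rewrite it if it
                // references an external path, otherwise copy it as-is.
                .rebalance()
                .process(new StageFn("copy or rewrite manifest"))
                // Stage 3: shuffle by data file name.
                .keyBy(fileName -> fileName)
                // Stage 4 (distributed): copy the data files.
                .process(new KeyedStageFn("copy data file"));
    }

    // Placeholder stages; the real operators would perform file IO here.
    static final class StageFn extends ProcessFunction<String, String> {
        private final String stage;
        StageFn(String stage) { this.stage = stage; }
        @Override
        public void processElement(String value, Context ctx, Collector<String> out) {
            out.collect(value); // pass through; real work omitted
        }
    }

    static final class KeyedStageFn extends KeyedProcessFunction<String, String, String> {
        private final String stage;
        KeyedStageFn(String stage) { this.stage = stage; }
        @Override
        public void processElement(String value, Context ctx, Collector<String> out) {
            out.collect(value); // pass through; real work omitted
        }
    }
}
```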

@neuyilan
Member Author

neuyilan commented Jan 7, 2025

I feel that the current clone process needs to be refactored:

  1. Single parallelism: query all manifest files and copy the manifest list file.
  2. Read the manifests in distributed parallelism, determine whether each needs to be rewritten (with or without an external path), and complete the copy or rewrite of the manifest.
  3. Shuffle by the data file name.
  4. Copy the data files in distributed parallelism.

This hierarchical approach to copying is the correct solution.

Thanks for your advice, I will try my best to do this.

@neuyilan
Member Author

neuyilan commented Jan 7, 2025

[image: proposed Flink batch job topology]

Hi Jingsong, according to the original design [1] and the above discussion, I plan to refactor it into the following Flink batch job.

  1. The first stage is responsible for picking the tables that need to be cloned. If the database parameter is not passed, all tables of all databases will be cloned. If the table parameter is not passed, all tables of the database will be cloned. (Unchanged, the same as the original design; see the sketch after this comment.)
  2. The second stage picks the related files (Snapshot, Schema, ManifestList, Manifest, DataFile, ChangeLog, IndexFile) of the snapshot in the source table. (Unchanged, the same as the original design.)
  3. The third stage only copies the schema files to the target path. The schema files include Snapshot, Schema, ManifestList and IndexFile.
  4. The fourth stage mainly involves copying or rewriting the manifest files in distributed parallelism. If a manifest involves an external path, rewrite it; otherwise, copy it.
  5. Shuffle the data files by file name. (Data files include DataFile and ChangeLog.)
  6. The fifth stage copies the data files in distributed parallelism.
  7. Shuffle by the target table name to the next stage.
  8. The sixth stage recreates the snapshot hint file. (Unchanged, the same as the original design.)

Please help confirm whether this refactoring is appropriate. Thanks.

[1] https://cwiki.apache.org/confluence/display/PAIMON/PIP-18%3A+Introduce+clone+Action+and+Procedure
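
For the stage-1 table selection above, here is a minimal sketch of the rules, assuming Paimon's Catalog API for listing databases and tables; treat the surrounding method as illustrative, not the PR's implementation:

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.paimon.catalog.Catalog;

// Illustrative only: stage-1 rules for deciding which tables get cloned.
final class PickTablesSketch {
    static List<String> pickTables(Catalog catalog, String database, String table)
            throws Exception {
        List<String> result = new ArrayList<>();
        if (database == null) {
            // No database passed: clone all tables of all databases.
            for (String db : catalog.listDatabases()) {
                for (String t : catalog.listTables(db)) {
                    result.add(db + "." + t);
                }
            }
        } else if (table == null) {
            // Database passed but no table: clone all tables of that database.
            for (String t : catalog.listTables(database)) {
                result.add(database + "." + t);
            }
        } else {
            // Both passed: clone the single table.
            result.add(database + "." + table);
        }
        return result;
    }
}
```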

@JingsongLi
Contributor

Hi @neuyilan, thanks for your design!

For the second stage, I think we can just pick manifests. We don't need to pick files here.

@neuyilan
Member Author

neuyilan commented Jan 8, 2025

For the second stage, I think we can just pick manifests. We don't need to pick files here.

Hi @JingsongLi,
If we only pick the manifest files in the second stage, then for copying the Snapshot, Schema and IndexFile files, do you mean that we only pass a snapshot ID between upstream and downstream, pick the required files at each step, and then copy the corresponding files?

The original design was to pick out all the files first and then copy the corresponding files according to the file type at each step.

@JingsongLi
Contributor

For the second stage, I think we can just pick manifests. We don't need to pick files here.

Hi @JingsongLi, if we only pick the manifest files in the second stage, then for copying the Snapshot, Schema and IndexFile files, do you mean that we only pass a snapshot ID between upstream and downstream, pick the required files at each step, and then copy the corresponding files?

The original design was to pick out all the files first and then copy the corresponding files according to the file type at each step.

Yes, I think we can refactor it now.

@neuyilan
Member Author

neuyilan commented Jan 9, 2025

[image: refactored Flink batch job topology]

Hi @JingsongLi, thanks again for the advice. I have refactored it into the following Flink batch job; please review it again. Thanks.

  1. The first stage is responsible for picking the tables that need to be cloned. If the database parameter is not passed, all tables of all databases will be cloned. If the table parameter is not passed, all tables of the database will be cloned. (Unchanged, the same as the original design.)
  2. The second stage just picks the schema files and copies them to the target path. The schema files include Snapshot, Schema, ManifestList and IndexFile.
  3. The third stage just picks the manifest files in single parallelism.
  4. The fourth stage mainly involves copying or rewriting the manifest files in distributed parallelism. If a manifest involves an external path, rewrite it; otherwise, copy it. (See the sketch after this list.)
  5. The fifth stage picks all the data files in single parallelism. (Data files include DataFile and ChangeLog.)
  6. Shuffle the data files by file name.
  7. The sixth stage copies the data files in distributed parallelism.
  8. Shuffle by the target table name to the next stage.
  9. The seventh stage recreates the snapshot hint file. (Unchanged, the same as the original design.)
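
A minimal sketch of the stage-4 decision referenced in item 4 above; every type and helper here is a hypothetical stand-in for the operator's real logic:

```java
// Hypothetical sketch: rewrite a manifest whose entries use an external path,
// otherwise copy it verbatim.
final class ManifestCopySketch {

    record ManifestWork(String sourcePath, String targetPath, boolean hasExternalPath) {}

    static void copyOrRewrite(ManifestWork work) {
        if (work.hasExternalPath()) {
            // Entries point at the source table's external path: rewrite them so
            // the cloned files land under the target table's warehouse path.
            rewriteWithWarehousePath(work);
        } else {
            // No external path involved: a byte-for-byte copy is enough.
            copyVerbatim(work.sourcePath(), work.targetPath());
        }
    }

    static void rewriteWithWarehousePath(ManifestWork work) { /* placeholder */ }
    static void copyVerbatim(String from, String to) { /* placeholder */ }
}
```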

@wwj6591812
Contributor

@neuyilan
Thanks very much for preparing this PR.
I think changing the job topology like this is fine. And "picking the required files at each step, then copying the corresponding files" is not only clearer, but also improves scalability.
Only one small question: why do you emphasize that this refactor is only for the batch job? Why not modify the stream job's topology in the same way as the batch job?

@neuyilan
Member Author

Only one small question: why do you emphasize that this refactor is only for the batch job? Why not modify the stream job's topology in the same way as the batch job?

Hi @wwj6591812, thanks for the reminder; I had a misunderstanding before. After this modification, both the batch job and the stream job will be affected. Is that right?

@neuyilan neuyilan changed the title [clone]fix the clone action when we introduced external path [clone]refactor the clone action as we introduced external path Jan 10, 2025
@neuyilan neuyilan marked this pull request as ready for review January 10, 2025 03:48
@neuyilan
Member Author

@JingsongLi @wwj6591812 PTAL, Thanks.

@@ -484,6 +484,28 @@ public DataFileMeta copy(List<String> newExtraFiles) {
externalPath);
}

public DataFileMeta copy(String newExternalPath) {
Contributor

copy => newExternalPath(String newExternalPath)
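
For illustration, a simplified stand-in for DataFileMeta showing the suggested rename; the real class has many more fields:

```java
// Simplified sketch: a "wither"-style method named after the field it replaces,
// instead of an overloaded copy(...). Field set is abbreviated for illustration.
public final class DataFileMetaSketch {
    private final String fileName;
    private final String externalPath; // null when the file lives in the warehouse path

    public DataFileMetaSketch(String fileName, String externalPath) {
        this.fileName = fileName;
        this.externalPath = externalPath;
    }

    // newExternalPath(...) makes the intent explicit at the call site:
    // return a copy identical except for the external path.
    public DataFileMetaSketch newExternalPath(String newExternalPath) {
        return new DataFileMetaSketch(fileName, newExternalPath);
    }
}
```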

Member Author

done

@@ -53,10 +62,25 @@ public String getTargetIdentifier() {
return targetIdentifier;
}

@Nullable
public FileType getFileType() {
Contributor

Please remove this useless field.

Member Author

done

private final String sourceIdentifier;
private final String targetIdentifier;
private final long snapshotId;
Contributor

Where will this variable be used?

Member Author

This variable is used to pass the selected snapshot between upstream and downstream. For example, when the schema files are created, the latest snapshot ID is determined; when the data files are selected later, that same snapshot ID is used. During this process the latest snapshot may change, so if the snapshot ID were not passed along, the cloned schema files and data files might not come from the same snapshot.
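
A sketch of that flow, with simplified stand-in types rather than the PR's CloneFileInfo: the snapshot ID is resolved once upstream and reused by every later stage.

```java
// Illustrative only: pinning every stage of the clone job to one snapshot.
final class SnapshotPinningSketch {

    record CloneFileInfoLite(String sourceIdentifier, String targetIdentifier, long snapshotId) {}

    static CloneFileInfoLite pinSnapshot(String source, String target, long latestSnapshotId) {
        // Resolved exactly once, when the schema files are copied ...
        return new CloneFileInfoLite(source, target, latestSnapshotId);
    }

    static long snapshotToRead(CloneFileInfoLite info) {
        // ... and reused here when picking data files, so schema files and data
        // files always come from the same snapshot even if the source table has
        // committed new snapshots in the meantime.
        return info.snapshotId();
    }
}
```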

@neuyilan neuyilan requested a review from JingsongLi January 14, 2025 16:01
@neuyilan neuyilan closed this Jan 16, 2025
@neuyilan neuyilan reopened this Jan 16, 2025
@neuyilan neuyilan closed this Jan 17, 2025
@neuyilan neuyilan reopened this Jan 17, 2025
@@ -50,18 +50,19 @@
* Pick the files to be cloned of a table based on the input record. The record type it produce is
* CloneFileInfo that indicate the information of copy file.
*/
public class PickFilesForCloneOperator extends AbstractStreamOperator<CloneFileInfo>
public class PickSchemaFilesForCloneOperator extends AbstractStreamOperator<CloneFileInfo>
Contributor

CopyMetaFilesOperator (one parallelism)?

In this operator, just copy the meta files directly:

  1. Create the table if needed.
  2. Copy all schema files.
  3. Copy the snapshot file.
  4. Copy the manifest list files.
  5. Copy the index manifest file.
  6. Copy the statistics file.

Then, via one link, send the index files to a CopyIndexFilesOperator (multiple parallelism), and via another link, send the manifest files to a CopyManifestFilesOperator (multiple parallelism).
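
A hypothetical sketch of such an operator, with every copy step stubbed out; only the single-parallelism shape and the ordering of the steps are the point:

```java
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

// Hypothetical single-parallelism operator; every copy* helper is a stub.
public class CopyMetaFilesOperatorSketch extends AbstractStreamOperator<String>
        implements OneInputStreamOperator<String, String> {

    @Override
    public void processElement(StreamRecord<String> tableRecord) {
        String tableId = tableRecord.getValue();
        createTargetTableIfNeeded(tableId); // 1. create table if needed
        copySchemaFiles(tableId);           // 2. copy all schema files
        copySnapshotFile(tableId);          // 3. copy snapshot file
        copyManifestListFiles(tableId);     // 4. copy manifest list files
        copyIndexManifestFile(tableId);     // 5. copy index manifest file
        copyStatisticsFile(tableId);        // 6. copy statistics file
        // Downstream, one link would carry index files to a CopyIndexFilesOperator
        // (multiple parallelism) and another link would carry manifest files to a
        // CopyManifestFilesOperator (multiple parallelism).
        output.collect(new StreamRecord<>(tableId));
    }

    private void createTargetTableIfNeeded(String t) { /* stub */ }
    private void copySchemaFiles(String t) { /* stub */ }
    private void copySnapshotFile(String t) { /* stub */ }
    private void copyManifestListFiles(String t) { /* stub */ }
    private void copyIndexManifestFile(String t) { /* stub */ }
    private void copyStatisticsFile(String t) { /* stub */ }
}
```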

private final String sourceIdentifier;
private final String targetIdentifier;
private final long snapshotId;
Contributor

Please remove this field, see my comments for operators.

Member Author

I think we cannot remove this field, because if the snapshotId is not provided, then when CopyManifestFilesOperator does its work we cannot just pick the data files from a manifest file: those data files may be deleted in another manifest file.

For example: in snapshot1 we add one data file, data-file1.parquet, in manifest-file1; in snapshot2 we add one data file, data-file2.parquet, and delete data-file1.parquet in manifest-file2. If these two manifest files are processed in two separate tasks, then the task processing manifest-file1 will fail when it copies data-file1.parquet.

So we cannot just pick the data files from a manifest file alone. I think we still need the snapshot ID. What do you think?

Contributor

I think we don't need to worry about files deleted in the manifests anymore. We just need to see which data files we need. Although we may copy some extra files, it won't cause any problems.

Member Author

The problem is that when snapshot1 expires, data-file1.parquet will be deleted. Then, when we read manifest-file1 and copy data-file1.parquet, it will always fail, because the file no longer exists.
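
To illustrate the hazard with made-up Entry/Kind types: resolving the live file set at snapshot scope first, by merging ADD and DELETE entries across all manifests of the chosen snapshot, avoids attempting to copy a file that a later manifest deleted and expiration already removed.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustration only: compute the live data files of one snapshot before copying.
final class LiveFilesSketch {
    enum Kind { ADD, DELETE }

    record Entry(Kind kind, String fileName) {}

    static Set<String> liveFiles(List<Entry> allEntriesOfSnapshot) {
        Set<String> live = new HashSet<>();
        for (Entry e : allEntriesOfSnapshot) {
            if (e.kind() == Kind.ADD) {
                live.add(e.fileName());    // added, live so far
            } else {
                live.remove(e.fileName()); // deleted by a later manifest
            }
        }
        return live; // only these files are safe to copy after snapshot expiration
    }
}
```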

FileStore<?> store = sourceTable.store();
ManifestFile manifestFile = store.manifestFileFactory().create();

List<ManifestEntry> manifestEntries =
Contributor

Why not just emit data files to downstream here?
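
A hedged sketch of that suggestion, where the Consumer stands in for the operator's output collector and the ManifestEntry accessors are used as in the version under review:

```java
import java.util.List;
import java.util.function.Consumer;
import org.apache.paimon.manifest.ManifestEntry;

// Sketch: emit each data file downstream as soon as the manifest is read,
// instead of accumulating the full list first.
final class EmitDataFilesSketch {
    static void emitDataFiles(List<ManifestEntry> manifestEntries, Consumer<String> downstream) {
        for (ManifestEntry entry : manifestEntries) {
            downstream.accept(entry.file().fileName()); // one record per data file
        }
    }
}
```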
