[clone]refactor the clone action as we introduced external path #4844
base: master
I feel that the current clone process needs to be refactored:
This hierarchical approach to copying is the correct solution.
Thanks for your advice, I will do my best.
Hi Jingsong, according to the original design [1] and the discussion above, I plan to refactor the clone action into the following Flink batch job.
Please help confirm whether this refactoring is appropriate. Thanks. [1] https://cwiki.apache.org/confluence/display/PAIMON/PIP-18%3A+Introduce+clone+Action+and+Procedure
Hi @neuyilan, thanks for your design! In the second stage, I think we can just pick manifests. We don't need to pick files there.
Hi @JingsongLi, the original design was to pick out all files and then copy the corresponding files according to the file type at each step.
Yes, I think we can refactor it now.
Hi @JingsongLi, thanks again for the advice. I have refactored the clone action into the following Flink batch job; please review it again. Thanks.
@neuyilan
Hi @wwj6591812, thanks for the reminder. I had a misunderstanding before. After this modification, both the batch job and the stream job will be affected. Is that right?
@JingsongLi @wwj6591812 PTAL, thanks.
@@ -484,6 +484,28 @@ public DataFileMeta copy(List<String> newExtraFiles) {
                externalPath);
    }

    public DataFileMeta copy(String newExternalPath) {
copy => newExternalPath(String newExternalPath)
done
@@ -53,10 +62,25 @@ public String getTargetIdentifier() {
        return targetIdentifier;
    }

    @Nullable
    public FileType getFileType() {
Please remove the useless field.
done
    private final String sourceIdentifier;
    private final String targetIdentifier;
    private final long snapshotId;
Where will this variable be used?
This variable is used to pass the selected snapshot from upstream to downstream operators. For example, the latest snapshot id is determined when the schema files are created, and the same snapshot id is then used when the data files are selected later. The latest snapshot may change during this process, so if the snapshot id is not passed along, the cloned schema files and data files may not come from the same snapshot.
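The race described in this comment can be illustrated with a minimal sketch. It does not use the Paimon API: `SnapshotPinSketch`, `commit`, `latestSnapshotId`, and `dataFilesAt` are hypothetical names, and the table is just an in-memory map from snapshot id to the data files visible at that snapshot.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical stand-in for a table whose latest snapshot can advance while a clone job runs. */
final class SnapshotPinSketch {
    // snapshot id -> data files visible at that snapshot (illustrative data only)
    private final TreeMap<Long, List<String>> snapshots = new TreeMap<>();

    void commit(long snapshotId, List<String> dataFiles) {
        snapshots.put(snapshotId, new ArrayList<>(dataFiles));
    }

    long latestSnapshotId() {
        return snapshots.lastKey();
    }

    List<String> dataFilesAt(long snapshotId) {
        return snapshots.get(snapshotId);
    }

    /** Upstream pins a snapshot id; downstream reads with the pinned id, not the latest one. */
    static List<String> demo() {
        SnapshotPinSketch table = new SnapshotPinSketch();
        table.commit(1L, Arrays.asList("data-file1.parquet"));

        // Upstream step (e.g. when the schema files are picked): remember the snapshot id.
        long pinned = table.latestSnapshotId();

        // A concurrent writer commits a new snapshot while the clone job is still running.
        table.commit(2L, Arrays.asList("data-file2.parquet"));

        // Downstream step: select data files by the pinned id so both steps see the same snapshot.
        return table.dataFilesAt(pinned);
    }
}
```

Had the downstream step called `latestSnapshotId()` again, it would have selected files from snapshot 2 while the schema came from snapshot 1, which is the inconsistency the field is meant to prevent.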
@@ -50,18 +50,19 @@
  * Pick the files to be cloned of a table based on the input record. The record type it produce is
  * CloneFileInfo that indicate the information of copy file.
  */
-public class PickFilesForCloneOperator extends AbstractStreamOperator<CloneFileInfo>
+public class PickSchemaFilesForCloneOperator extends AbstractStreamOperator<CloneFileInfo>
CopyMetaFilesOperator (One parallelism)?
In this operator, just copy meta files directly:
- create table if needed.
- copy all schema files.
- copy snapshot file.
- copy manifest list files.
- copy index manifest file.
- copy statistics file.
And then, on one link, send index files to one CopyIndexFilesOperator (Multiple parallelism).
And on another link, send manifest files to CopyManifestFilesOperator (Multiple parallelism).
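The routing proposed above can be sketched without any Flink dependency. This is only an illustration of the split, not the operator code in this PR: `CloneRoutingSketch`, its `Kind` enum, and the file names are hypothetical, and the three lists stand in for "copied in place" and the two downstream links.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: one meta stage copies meta files itself and forwards the rest. */
final class CloneRoutingSketch {
    enum Kind { SCHEMA, SNAPSHOT, MANIFEST_LIST, INDEX_MANIFEST, STATISTICS, INDEX, MANIFEST }

    final List<String> copiedDirectly = new ArrayList<>();   // handled by the meta stage
    final List<String> toIndexCopier = new ArrayList<>();    // link to the index-file copier
    final List<String> toManifestCopier = new ArrayList<>(); // link to the manifest-file copier

    void route(Map<String, Kind> files) {
        for (Map.Entry<String, Kind> e : files.entrySet()) {
            switch (e.getValue()) {
                case INDEX:
                    toIndexCopier.add(e.getKey());
                    break;
                case MANIFEST:
                    toManifestCopier.add(e.getKey());
                    break;
                default:
                    // schema, snapshot, manifest list, index manifest, statistics
                    copiedDirectly.add(e.getKey());
            }
        }
    }

    static CloneRoutingSketch demo() {
        Map<String, Kind> files = new LinkedHashMap<>();
        files.put("schema-0", Kind.SCHEMA);
        files.put("snapshot-1", Kind.SNAPSHOT);
        files.put("index-a", Kind.INDEX);
        files.put("manifest-a", Kind.MANIFEST);
        CloneRoutingSketch sketch = new CloneRoutingSketch();
        sketch.route(files);
        return sketch;
    }
}
```

In a real job the two forwarded lists would be separate output streams consumed by operators running with higher parallelism, while the meta stage stays at parallelism one.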
    private final String sourceIdentifier;
    private final String targetIdentifier;
    private final long snapshotId;
Please remove this field, see my comments for operators.
I think we cannot remove this field. If the snapshotId is not provided, then in CopyManifestFilesOperator we cannot simply pick the data files listed in a manifest file, because those data files may be deleted by another manifest file.
For example, in snapshot 1 we add one data file, data-file1.parquet, in manifest-file1; in snapshot 2 we add one data file, data-file2.parquet, and delete data-file1.parquet in manifest-file2. If these two manifest files are processed in two separate tasks, then processing manifest-file1 and copying data-file1.parquet will make the job fail.
So we cannot just pick the data files in a manifest file. I think we still need the snapshot id. What do you think?
I think we don't need to worry about files deleted in the manifest anymore. We just need to see which data files we need. Although we may copy extra files, that won't cause any problems.
The problem is that when snapshot 1 expires, data-file1.parquet will be deleted. When we then read manifest-file1 and try to copy data-file1.parquet, it will always fail because the file does not exist.
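The scenario in this thread can be made concrete with a small sketch that merges ADD and DELETE entries across manifests before deciding what to copy. It is an assumption-laden illustration, not Paimon code: `ManifestMergeSketch`, `Entry`, and `liveFiles` are hypothetical names, and entries are assumed to arrive in commit order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch: resolve which data files are still live at a snapshot. */
final class ManifestMergeSketch {
    enum Kind { ADD, DELETE }

    static final class Entry {
        final Kind kind;
        final String file;

        Entry(Kind kind, String file) {
            this.kind = kind;
            this.file = file;
        }
    }

    /** Applies entries in commit order; a DELETE cancels an earlier ADD of the same file. */
    static List<String> liveFiles(List<Entry> entriesInCommitOrder) {
        Set<String> live = new LinkedHashSet<>();
        for (Entry e : entriesInCommitOrder) {
            if (e.kind == Kind.ADD) {
                live.add(e.file);
            } else {
                live.remove(e.file);
            }
        }
        return new ArrayList<>(live);
    }

    static List<String> demo() {
        return liveFiles(Arrays.asList(
                new Entry(Kind.ADD, "data-file1.parquet"),      // manifest-file1, snapshot 1
                new Entry(Kind.ADD, "data-file2.parquet"),      // manifest-file2, snapshot 2
                new Entry(Kind.DELETE, "data-file1.parquet"))); // manifest-file2, snapshot 2
    }
}
```

Processing manifest-file1 on its own would still list data-file1.parquet, which is why a task that sees only one manifest may try to copy a file that expiration has already removed.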
FileStore<?> store = sourceTable.store();
ManifestFile manifestFile = store.manifestFileFactory().create();

List<ManifestEntry> manifestEntries =
Why not just emit data files to downstream here?
Purpose
https://cwiki.apache.org/confluence/display/PAIMON/PIP-29%3A+Introduce+Table+Multi-Location++Management
Refactor the clone action, as we introduced the external path.
I want to point out that regardless of where the data in the source table is stored (warehouse path or external path), we always copy the data to the warehouse path of the target table.
If we kept using the source table's external path as the data path in the target table, the data from the source table and the target table would be mixed together.
What's your opinion?
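The path rule described above can be sketched as a pure function. This is a hedged illustration under assumed names and paths: `TargetPathSketch`, `targetPath`, and the example URIs are hypothetical and do not reflect how the PR resolves paths internally.

```java
/** Hypothetical sketch: re-root a source data file under the target table's warehouse path. */
final class TargetPathSketch {
    /**
     * @param sourceFile full path of the data file in the source table
     * @param sourceTableRoot warehouse path of the source table
     * @param sourceExternalRoot external path of the source table, or null if none is configured
     * @param targetTableRoot warehouse path of the target table
     */
    static String targetPath(
            String sourceFile,
            String sourceTableRoot,
            String sourceExternalRoot,
            String targetTableRoot) {
        String root;
        if (sourceExternalRoot != null && sourceFile.startsWith(sourceExternalRoot + "/")) {
            root = sourceExternalRoot;
        } else if (sourceFile.startsWith(sourceTableRoot + "/")) {
            root = sourceTableRoot;
        } else {
            throw new IllegalArgumentException("File is under neither source root: " + sourceFile);
        }
        // Keep the relative part (partition, bucket, file name) and drop the source root.
        return targetTableRoot + sourceFile.substring(root.length());
    }
}
```

For example, with the assumed roots `hdfs://wh/db.db/t` (source warehouse), `oss://ext/t` (source external), and `hdfs://wh/db2.db/t2` (target warehouse), the file `oss://ext/t/bucket-0/data-1.parquet` maps to `hdfs://wh/db2.db/t2/bucket-0/data-1.parquet`, so the target table never points back into the source table's external location.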
Tests
Add CloneActionITCase.testCloneTableWithSourceTableExternalPath
API and Format
no
Documentation