Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][Flink] Support multiple tables read and write #7713

Merged
merged 1 commit into from
Sep 25, 2024

Conversation

PeppaPage
Copy link
Contributor

Purpose of this pull request

Does this PR introduce any user-facing change?

How was this patch tested?

Check list

@github-actions github-actions bot added CI&CD core SeaTunnel core module flink e2e labels Sep 22, 2024
@hailin0
Copy link
Member

hailin0 commented Sep 22, 2024

Please check ci error

@hailin0
Copy link
Member

hailin0 commented Sep 22, 2024

close #7008

@hailin0 hailin0 linked an issue Sep 22, 2024 that may be closed by this pull request
3 tasks
@PeppaPage
Copy link
Contributor Author

Please check ci error

Thank you for your review, I think this error has nothing to do with my commit, because this error test point(testStreamReadPaimon) is completely based on the seatunnel engine, and it did not pass this test point in the previous submission
微信图片_20240922234052

@PeppaPage
Copy link
Contributor Author

Please check ci error

Thank you for your review, I think this error has nothing to do with my commit, because this error test point(testStreamReadPaimon) is completely based on the seatunnel engine, and it did not pass this test point in the previous submission 微信图片_20240922234052

698fe59cc4951a14ac768394a8f662b

@PeppaPage PeppaPage closed this Sep 22, 2024
@PeppaPage PeppaPage reopened this Sep 22, 2024
@Hisoka-X
Copy link
Member

Hisoka-X commented Sep 23, 2024

Do not close/reopen. It would lost ci status.

@Hisoka-X
Copy link
Member

@PeppaPage
Copy link
Contributor Author

Do not close/reopen. It would lost ci status.

I'm sorry for that,I want to close my comment so i click "close with comment" button,but close my pull request instead,should I make a new pull request ?

@Hisoka-X
Copy link
Member

Do not close/reopen. It would lost ci status.

I'm sorry for that,I want to close my comment so i click "close with comment" button,but close my pull request instead,should I make a new pull request ?

No, just push an emtpy commit can re-trigger it.

@PeppaPage
Copy link
Contributor Author

Do not close/reopen. It would lost ci status.

I'm sorry for that,I want to close my comment so i click "close with comment" button,but close my pull request instead,should I make a new pull request ?

No, just push an emtpy commit can re-trigger it.

Thank you very much,I have just pushed.

@PeppaPage
Copy link
Contributor Author

Please check ci error

No errors now

@TyrantLucifer
Copy link
Member

@Hisoka-X @hailin0 PTAL

@hailin0
Copy link
Member

hailin0 commented Sep 24, 2024

Do you need to clean up the commit list that does not belong to you?

image

TyrantLucifer
TyrantLucifer previously approved these changes Sep 24, 2024
@@ -42,4 +44,25 @@ public static DataStream<Row> tableToDataStream(
public static boolean tableExists(TableEnvironment tableEnvironment, String name) {
return Arrays.asList(tableEnvironment.listTables()).contains(name);
}

// catalogName.databaseName.[schemeName].tableName -> databaseName.[schemeName].tableName
public static String extractTableIdName(TableIdentifier tableIdentifier) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use TableIdentifier#toTablePath().toString() ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thank you for suggestion, I've modified it.

Copy link
Member

@Hisoka-X Hisoka-X left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall LGTM, I left some minior update.

Optional<SaveModeHandler> saveModeHandler =
((SupportSaveMode) sink).getSaveModeHandler();
public void handleSaveMode(SeaTunnelSink seaTunnelSink) {
if (SupportSaveMode.class.isAssignableFrom(seaTunnelSink.getClass())) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (SupportSaveMode.class.isAssignableFrom(seaTunnelSink.getClass())) {
if (seaTunnelSink instanceof SupportSaveMode) {

Optional<SaveModeHandler> saveModeHandler =
((SupportSaveMode) sink).getSaveModeHandler();
public void handleSaveMode(SeaTunnelSink seaTunnelSink) {
if (SupportSaveMode.class.isAssignableFrom(seaTunnelSink.getClass())) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

@Hisoka-X Hisoka-X added the First-time contributor First-time contributor label Sep 24, 2024
@PeppaPage
Copy link
Contributor Author

Overall LGTM, I left some minior update.

thank you for suggestion, I've modified it.

hailin0
hailin0 previously approved these changes Sep 25, 2024
Copy link
Member

@hailin0 hailin0 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

*/
@NoArgsConstructor
@AutoService(TestContainer.class)
public class Flink18Container extends AbstractTestFlinkContainer {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

type = {EngineType.SPARK, EngineType.FLINK},
disabledReason = "Currently SPARK and FLINK do not support multi table")
type = {EngineType.SPARK},
disabledReason = "Currently SPARK do not support multi table")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

spark already supports multiple tables, but not cdc

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thank you for suggestion, I've modified it.

type = {EngineType.SPARK, EngineType.FLINK},
disabledReason = "Currently SPARK and FLINK do not support multi table")
type = {EngineType.SPARK},
disabledReason = "Currently SPARK do not support multi table")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above

type = {EngineType.SPARK, EngineType.FLINK},
disabledReason = "Currently SPARK and FLINK do not support multi table")
type = {EngineType.SPARK},
disabledReason = "Currently SPARK do not support multi table")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above

type = {EngineType.SPARK, EngineType.FLINK},
disabledReason = "Currently SPARK and FLINK do not support multi table")
type = {EngineType.SPARK},
disabledReason = "Currently SPARK do not support multi table")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above

type = {EngineType.SPARK, EngineType.FLINK},
disabledReason = "Currently SPARK and FLINK do not support multi table")
type = {EngineType.SPARK},
disabledReason = "Currently SPARK do not support multi table")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above

@Carl-Zhou-CN
Copy link
Member


Please remove the comment here @PeppaPage

Copy link
Member

@Carl-Zhou-CN Carl-Zhou-CN left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Copy link
Member

@Hisoka-X Hisoka-X left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @PeppaPage ! Welcome joining SeaTunnel!

@TyrantLucifer TyrantLucifer merged commit 65312ff into apache:dev Sep 25, 2024
10 checks passed
@@ -225,7 +226,7 @@ public void testZetaStreamingCheckpointNoInterval(TestContainer container)

@TestTemplate
@DisabledOnContainer(
value = {},
value = {TestContainerId.FLINK_1_17, TestContainerId.FLINK_1_18},
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hi @PeppaPage , why disabled this? cc @TyrantLucifer

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, flink 1.17 and 1.18 change the default ExecutionGraph in batch mode , it makes the dynamic-graph true, so it will skip the checkpoint because dynamic graph doesn't support.
e80fe4b5b4d2faa81d7570e0299db9f
a5f3d25c1c203e9b5b80ab50b1bfbf3
in 1.18:
d3057a7c564646ddbfa584c1ee79465
in 1.15:
6c23bafb06734c095c95776d89a64f3
this problem can be fixed by setting jobmanager.scheduler: Ng
0b3d0fbbc4859d14f28abfffb39b2be

but i'm not sure whether it will conflict with other properties.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Feature][Flink] Support multiple tables read and write
5 participants