[Feature][Flink] Support multiple tables read and write #7713
Conversation
Please check the CI error.
close #7008
Do not close/reopen. It would lose the CI status.
I'm sorry for that. I wanted to close my comment, so I clicked the "close with comment" button, but it closed my pull request instead. Should I make a new pull request?
No, just pushing an empty commit can re-trigger it.
Thank you very much, I have just pushed.
Force-pushed from 8bc7722 to 6890a61.
No errors now.
Force-pushed from 6890a61 to e203655.
Force-pushed from e203655 to f487289.
@@ -42,4 +44,25 @@ public static DataStream<Row> tableToDataStream(
    public static boolean tableExists(TableEnvironment tableEnvironment, String name) {
        return Arrays.asList(tableEnvironment.listTables()).contains(name);
    }

    // catalogName.databaseName.[schemeName].tableName -> databaseName.[schemeName].tableName
    public static String extractTableIdName(TableIdentifier tableIdentifier) {
Use TableIdentifier#toTablePath().toString()?
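A minimal sketch of what that suggestion could look like, assuming the SeaTunnel TableIdentifier/TablePath API behaves as the reviewer describes; the surrounding class name is illustrative, not the actual file in this PR:

import org.apache.seatunnel.api.table.catalog.TableIdentifier;

public class TableUtilSketch {
    // catalogName.databaseName.[schemaName].tableName -> databaseName.[schemaName].tableName
    public static String extractTableIdName(TableIdentifier tableIdentifier) {
        // toTablePath() drops the catalog name and keeps database/[schema]/table,
        // so its string form already matches the mapping described above.
        return tableIdentifier.toTablePath().toString();
    }
}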
Thank you for the suggestion, I've modified it.
Overall LGTM, I left some minor updates.
    Optional<SaveModeHandler> saveModeHandler =
            ((SupportSaveMode) sink).getSaveModeHandler();
    public void handleSaveMode(SeaTunnelSink seaTunnelSink) {
        if (SupportSaveMode.class.isAssignableFrom(seaTunnelSink.getClass())) {
-        if (SupportSaveMode.class.isAssignableFrom(seaTunnelSink.getClass())) {
+        if (seaTunnelSink instanceof SupportSaveMode) {
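For context, a minimal sketch of the method after applying the suggestion, assuming the SeaTunnelSink, SupportSaveMode, and SaveModeHandler types from this PR; everything beyond the quoted lines is illustrative:

public void handleSaveMode(SeaTunnelSink seaTunnelSink) {
    // instanceof is the idiomatic way to express this type check and reads better
    // than SupportSaveMode.class.isAssignableFrom(seaTunnelSink.getClass()).
    if (seaTunnelSink instanceof SupportSaveMode) {
        Optional<SaveModeHandler> saveModeHandler =
                ((SupportSaveMode) seaTunnelSink).getSaveModeHandler();
        // ... the existing save-mode handling continues with saveModeHandler here
    }
}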
    Optional<SaveModeHandler> saveModeHandler =
            ((SupportSaveMode) sink).getSaveModeHandler();
    public void handleSaveMode(SeaTunnelSink seaTunnelSink) {
        if (SupportSaveMode.class.isAssignableFrom(seaTunnelSink.getClass())) {
ditto
Force-pushed from f487289 to 78af4fb.
Thank you for the suggestion, I've modified it.
LGTM
 */
@NoArgsConstructor
@AutoService(TestContainer.class)
public class Flink18Container extends AbstractTestFlinkContainer {
Please update the docs for Flink 1.17 and 1.18:
https://github.com/apache/seatunnel/blob/7b7f06437a62c1683f002db78458ec7dd90de108/docs/en/start-v2/locally/quick-start-flink.md#step-4-run-seatunnel-application
cc @TyrantLucifer, are there any other docs/shells we should update?
Thank you for the suggestion, I've modified it, along with https://github.com/apache/seatunnel/blob/7b7f06437a62c1683f002db78458ec7dd90de108/docs/zh/start-v2/locally/quick-start-flink.md#step-4-run-seatunnel-application.
-        type = {EngineType.SPARK, EngineType.FLINK},
-        disabledReason = "Currently SPARK and FLINK do not support multi table")
+        type = {EngineType.SPARK},
+        disabledReason = "Currently SPARK do not support multi table")
Spark already supports multiple tables, but not CDC.
Thank you for the suggestion, I've modified it.
-        type = {EngineType.SPARK, EngineType.FLINK},
-        disabledReason = "Currently SPARK and FLINK do not support multi table")
+        type = {EngineType.SPARK},
+        disabledReason = "Currently SPARK do not support multi table")
Same as above
-        type = {EngineType.SPARK, EngineType.FLINK},
-        disabledReason = "Currently SPARK and FLINK do not support multi table")
+        type = {EngineType.SPARK},
+        disabledReason = "Currently SPARK do not support multi table")
Same as above
-        type = {EngineType.SPARK, EngineType.FLINK},
-        disabledReason = "Currently SPARK and FLINK do not support multi table")
+        type = {EngineType.SPARK},
+        disabledReason = "Currently SPARK do not support multi table")
Same as above
-        type = {EngineType.SPARK, EngineType.FLINK},
-        disabledReason = "Currently SPARK and FLINK do not support multi table")
+        type = {EngineType.SPARK},
+        disabledReason = "Currently SPARK do not support multi table")
Same as above
Line 106 in 7b7f064
Please remove the comment here @PeppaPage.
Force-pushed from 78af4fb to afbc563.
LGTM
Thanks @PeppaPage! Welcome to SeaTunnel!
@@ -225,7 +226,7 @@ public void testZetaStreamingCheckpointNoInterval(TestContainer container)

    @TestTemplate
    @DisabledOnContainer(
-            value = {},
+            value = {TestContainerId.FLINK_1_17, TestContainerId.FLINK_1_18},
Hi @PeppaPage, why was this disabled? cc @TyrantLucifer
Hi, Flink 1.17 and 1.18 changed the default ExecutionGraph in batch mode: it makes dynamic-graph true, so the checkpoint is skipped because a dynamic graph doesn't support it.
In 1.18: [screenshot]
In 1.15: [screenshot]
This problem can be fixed by setting jobmanager.scheduler: Ng, but I'm not sure whether it will conflict with other properties.
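A minimal sketch (not part of this PR) of how that scheduler setting could be applied programmatically; the class name is illustrative, and the option key/value are the standard Flink ones mentioned above:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SchedulerConfigSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Flink 1.17/1.18 default to the adaptive batch scheduler, which builds a
        // dynamic execution graph; "Ng" selects the non-adaptive scheduler instead.
        conf.setString("jobmanager.scheduler", "Ng");
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // ... build and execute the batch job as usual
    }
}

The same effect should be achievable by adding jobmanager.scheduler: Ng to flink-conf.yaml.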