-
Notifications
You must be signed in to change notification settings - Fork 1.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[feature][connector-jdbc]Add Save Mode function and Connector-JDBC (MySQL) connector has been realized #5663
[feature][connector-jdbc]Add Save Mode function and Connector-JDBC (MySQL) connector has been realized #5663
Conversation
… has been realized
…_mysql # Conflicts: # seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java # seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
user = "root" | ||
password = "Abc!@#135_seatunnel" | ||
generate_sink_sql = true | ||
catalog { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this tag has deleted
#5645
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Has been processed
@@ -221,6 +222,13 @@ public void handleSaveMode(DataSaveMode saveMode) { | |||
if (!catalog.tableExists(tablePath)) { | |||
catalog.createTable(tablePath, catalogTable, true); | |||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why retain these logics?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Has been processed
} catch (UnsupportedOperationException | CatalogException e) { | ||
// TODO Temporary fix, this feature has been changed in this pr | ||
// https://github.com/apache/seatunnel/pull/5645 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove?
if (!catalog.tableExists(tablePath)) { | ||
catalog.createTable(tablePath, catalogTable, true); | ||
} | ||
return new DefaultSaveModeHandler( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
205 line:try (Catalog catalog = catalogOptional.get()) {
this code auto close catalog, Will savemode not interrupt abnormally?
rebase dev merge this pr |
…_mysql # Conflicts: # seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java # seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java # seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
SINK_TABLE_NOT_EXIST("API-11", "The sink table not exist"); | ||
; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SINK_TABLE_NOT_EXIST("API-11", "The sink table not exist"); | |
; | |
SINK_TABLE_NOT_EXIST("API-11", "The sink table not exist"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Has been processed
// path and files in the path, create new files in the path. | ||
KEEP_SCHEMA_AND_DATA, | ||
// Preserve database structure, preserve data | ||
AND_DATA, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's meaning of AND_DATA
? I think you mean APPEND_DATA
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Has been processed
seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DataSaveMode.java
Outdated
Show resolved
Hide resolved
String SCHEMA_SAVE_MODE_KEY = "schema_save_mode"; | ||
|
||
// This method defines the return of a specific save_mode handler | ||
SaveModeHandler getSaveModeHandler(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use optional to avoid NPE
SaveModeHandler getSaveModeHandler(); | |
Optional<SaveModeHandler> getSaveModeHandler(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Has been processed
…_mysql # Conflicts: # seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java # seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java # seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java # seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java
finalCatalogTable); | ||
} | ||
|
||
@Override | ||
public OptionRule optionRule() { | ||
return OptionRule.builder() | ||
.required(URL, DRIVER) | ||
.required(URL, DRIVER, SCHEMA_SAVE_MODE, DATA_SAVE_MODE) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add testcase into this config
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add new options into connector docs
...src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlSaveModeHandlerIT.java
Outdated
Show resolved
Hide resolved
CatalogTable catalogTable = context.getCatalogTable(); | ||
ReadonlyConfig catalogOptions = getCatalogOptions(context); | ||
Map<String, String> catalogOptions = | ||
config.get(CatalogOptions.CATALOG_OPTIONS) == null | ||
? new HashMap<>() | ||
: config.get(CatalogOptions.CATALOG_OPTIONS); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why change here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Compatible with catalog
…_mysql # Conflicts: # seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSink.java # seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java # seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Function list