diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java index eff6bb67c67..35e9a986ab8 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java @@ -26,6 +26,7 @@ import org.apache.seatunnel.api.table.catalog.ConstraintKey; import org.apache.seatunnel.api.table.catalog.PrimaryKey; import org.apache.seatunnel.api.table.catalog.TableIdentifier; +import org.apache.seatunnel.api.table.catalog.TableSchema; import org.apache.seatunnel.api.table.connector.TableSink; import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.TableSinkFactory; @@ -200,6 +201,25 @@ public TableSink createSink(TableSinkFactoryContext context) { .collect(Collectors.joining(","))); } } + } else { + // replace primary key to config + PrimaryKey configPk = + PrimaryKey.of( + catalogTable.getTablePath().getTableName() + "_config_pk", + config.get(PRIMARY_KEYS)); + TableSchema tableSchema = catalogTable.getTableSchema(); + catalogTable = + CatalogTable.of( + catalogTable.getTableId(), + TableSchema.builder() + .primaryKey(configPk) + .constraintKey(tableSchema.getConstraintKeys()) + .columns(tableSchema.getColumns()) + .build(), + catalogTable.getOptions(), + catalogTable.getPartitionKeys(), + catalogTable.getComment(), + catalogTable.getCatalogName()); } config = ReadonlyConfig.fromMap(new HashMap<>(map)); // always execute diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlSaveModeHandlerIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlSaveModeHandlerIT.java index c8acc950105..bc1361aa267 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlSaveModeHandlerIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlSaveModeHandlerIT.java @@ -47,6 +47,7 @@ import java.time.LocalDate; import java.time.LocalDateTime; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -74,6 +75,7 @@ public class JdbcMysqlSaveModeHandlerIT extends AbstractJdbcIT { private static final String CREATE_SQL = "CREATE TABLE IF NOT EXISTS %s\n" + "(\n" + + " `id` bigint(20) NOT NULL,\n" + " `c_bit_1` bit(1) DEFAULT NULL,\n" + " `c_bit_8` bit(8) DEFAULT NULL,\n" + " `c_bit_16` bit(16) DEFAULT NULL,\n" @@ -164,6 +166,9 @@ void compareResult(String executeKey) { final List<Column> columns = table.getTableSchema().getColumns(); Assertions.assertEquals(columns.size(), columnsSource.size()); + Assertions.assertIterableEquals( + Collections.singletonList("id"), + table.getTableSchema().getPrimaryKey().getColumnNames()); } @Override @@ -175,6 +180,7 @@ String driverUrl() { Pair<String[], List<SeaTunnelRow>> initTestData() { String[] fieldNames = new String[] { + "id", "c_bit_1", "c_bit_8", "c_bit_16", @@ -229,6 +235,7 @@ Pair<String[], List<SeaTunnelRow>> initTestData() { SeaTunnelRow row = new SeaTunnelRow( new Object[] { + (long) i, i % 2 == 0 ? (byte) 1 : (byte) 0, new byte[] {byteArr}, new byte[] {byteArr, byteArr}, diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/resources/jdbc_mysql_source_and_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/resources/jdbc_mysql_source_and_sink.conf index bc379f8ba8a..6305f55c46b 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/resources/jdbc_mysql_source_and_sink.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/resources/jdbc_mysql_source_and_sink.conf @@ -40,9 +40,12 @@ sink { driver = "com.mysql.cj.jdbc.Driver" user = "root" password = "Abc!@#135_seatunnel" + generate_sink_sql = true - table = "test_laowang" database = "seatunnel" + table = "test_laowang" + primary_keys = ["id"] + schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST" data_save_mode="APPEND_DATA" }