From 2b5fffa424556931f8eb29a7742d48cee23d2b38 Mon Sep 17 00:00:00 2001 From: yangbinbin <yangbinbin@aspirecn.com> Date: Fri, 18 Nov 2022 18:06:56 +0800 Subject: [PATCH] update --- docs/en/connector-v2/sink/Jdbc.md | 4 +- docs/en/connector-v2/source/Jdbc.md | 2 + .../connector-jdbc/pom.xml | 12 +++ .../dialect/teradata/TeradataDialect.java | 47 +++++++++ .../teradata/TeradataDialectFactory.java | 37 +++++++ .../teradata/TeradataJdbcRowConverter.java | 29 ++++++ .../dialect/teradata/TeradataTypeMapper.java | 98 +++++++++++++++++++ .../connector-jdbc-e2e/pom.xml | 5 + .../seatunnel/jdbc/JdbcTeradataIT.java | 86 ++++++++++++++++ .../jdbc_teradata_source_and_sink.conf | 77 +++++++++++++++ 10 files changed, 396 insertions(+), 1 deletion(-) create mode 100644 seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/teradata/TeradataDialect.java create mode 100644 seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/teradata/TeradataDialectFactory.java create mode 100644 seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/teradata/TeradataJdbcRowConverter.java create mode 100644 seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/teradata/TeradataTypeMapper.java create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcTeradataIT.java create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_teradata_source_and_sink.conf diff --git a/docs/en/connector-v2/sink/Jdbc.md b/docs/en/connector-v2/sink/Jdbc.md index f145cc220f2..36efbe267d7 100644 --- a/docs/en/connector-v2/sink/Jdbc.md +++ b/docs/en/connector-v2/sink/Jdbc.md @@ -131,6 +131,7 @@ there are some reference value for params above. | GBase8a | com.gbase.jdbc.Driver | jdbc:gbase://e2e_gbase8aDb:5258/test | / | https://www.gbase8.cn/wp-content/uploads/2020/10/gbase-connector-java-8.3.81.53-build55.5.7-bin_min_mix.jar | | StarRocks | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | / | https://mvnrepository.com/artifact/mysql/mysql-connector-java | | db2 | com.ibm.db2.jcc.DB2Driver | jdbc:db2://localhost:50000/testdb | com.ibm.db2.jcc.DB2XADataSource | https://mvnrepository.com/artifact/com.ibm.db2.jcc/db2jcc/db2jcc4 | +| teradata | com.teradata.jdbc.TeraDriver | jdbc:teradata://localhost/DBS_PORT=1025,DATABASE=test | / | https://mvnrepository.com/artifact/com.teradata.jdbc/terajdbc | ## Example @@ -199,4 +200,5 @@ sink { ### next version -- [Feature] Support CDC write DELETE/UPDATE/INSERT events ([3378](https://github.com/apache/incubator-seatunnel/issues/3378)) \ No newline at end of file +- [Feature] Support CDC write DELETE/UPDATE/INSERT events ([3378](https://github.com/apache/incubator-seatunnel/issues/3378)) +- [Feature] Support Teradata JDBC Sink ([3362](https://github.com/apache/incubator-seatunnel/pull/3362)) \ No newline at end of file diff --git a/docs/en/connector-v2/source/Jdbc.md b/docs/en/connector-v2/source/Jdbc.md index 1f1d5de8676..c04111aa0d6 100644 --- a/docs/en/connector-v2/source/Jdbc.md +++ b/docs/en/connector-v2/source/Jdbc.md @@ -102,6 +102,7 @@ there are some reference value for params above. | starrocks | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | https://mvnrepository.com/artifact/mysql/mysql-connector-java | | db2 | com.ibm.db2.jcc.DB2Driver | jdbc:db2://localhost:50000/testdb | https://mvnrepository.com/artifact/com.ibm.db2.jcc/db2jcc/db2jcc4 | | tablestore | com.alicloud.openservices.tablestore.jdbc.OTSDriver | "jdbc:ots:http s://myinstance.cn-hangzhou.ots.aliyuncs.com/myinstance" | https://mvnrepository.com/artifact/com.aliyun.openservices/tablestore-jdbc | +| teradata | com.teradata.jdbc.TeraDriver | jdbc:teradata://localhost/DBS_PORT=1025,DATABASE=test | https://mvnrepository.com/artifact/com.teradata.jdbc/terajdbc | ## Example @@ -151,3 +152,4 @@ parallel: - [BugFix] Fix jdbc split bug ([3220](https://github.com/apache/incubator-seatunnel/pull/3220)) - [Feature] Support Tablestore Source ([3309](https://github.com/apache/incubator-seatunnel/pull/3309)) +- [Feature] Support Teradata JDBC Source ([3362](https://github.com/apache/incubator-seatunnel/pull/3362)) \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-jdbc/pom.xml b/seatunnel-connectors-v2/connector-jdbc/pom.xml index cd4a5f221e4..350239ce9ee 100644 --- a/seatunnel-connectors-v2/connector-jdbc/pom.xml +++ b/seatunnel-connectors-v2/connector-jdbc/pom.xml @@ -38,6 +38,7 @@ <oracle.version>12.2.0.1</oracle.version> <db2.version>db2jcc4</db2.version> <tablestore.version>5.13.9</tablestore.version> + <teradata.version>17.20.00.12</teradata.version> </properties> <dependencyManagement> @@ -92,6 +93,13 @@ <scope>provided</scope> </dependency> + <dependency> + <groupId>com.teradata.jdbc</groupId> + <artifactId>terajdbc4</artifactId> + <version>${teradata.version}</version> + <scope>provided</scope> + </dependency> + </dependencies> </dependencyManagement> @@ -137,6 +145,10 @@ <groupId>com.aliyun.openservices</groupId> <artifactId>tablestore-jdbc</artifactId> </dependency> + <dependency> + <groupId>com.teradata.jdbc</groupId> + <artifactId>terajdbc4</artifactId> + </dependency> </dependencies> </project> diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/teradata/TeradataDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/teradata/TeradataDialect.java new file mode 100644 index 00000000000..d824d2ef01f --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/teradata/TeradataDialect.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.teradata; + +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper; + +import java.util.Optional; + +public class TeradataDialect implements JdbcDialect { + + @Override + public String dialectName() { + return "Teradata"; + } + + @Override + public JdbcRowConverter getRowConverter() { + return new TeradataJdbcRowConverter(); + } + + @Override + public JdbcDialectTypeMapper getJdbcDialectTypeMapper() { + return new TeradataTypeMapper(); + } + + @Override + public Optional<String> getUpsertStatement(String tableName, String[] fieldNames, String[] uniqueKeyFields) { + return Optional.empty(); + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/teradata/TeradataDialectFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/teradata/TeradataDialectFactory.java new file mode 100644 index 00000000000..70a4492868f --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/teradata/TeradataDialectFactory.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.teradata; + +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory; + +import com.google.auto.service.AutoService; + +@AutoService(JdbcDialectFactory.class) +public class TeradataDialectFactory implements JdbcDialectFactory { + + @Override + public boolean acceptsURL(String url) { + return url.startsWith("jdbc:teradata:"); + } + + @Override + public JdbcDialect create() { + return new TeradataDialect(); + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/teradata/TeradataJdbcRowConverter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/teradata/TeradataJdbcRowConverter.java new file mode 100644 index 00000000000..8e5543eed06 --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/teradata/TeradataJdbcRowConverter.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.teradata; + +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter; + +public class TeradataJdbcRowConverter extends AbstractJdbcRowConverter { + + @Override + public String converterName() { + return "Teradata"; + } + +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/teradata/TeradataTypeMapper.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/teradata/TeradataTypeMapper.java new file mode 100644 index 00000000000..97cd9940d79 --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/teradata/TeradataTypeMapper.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.teradata; + +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.DecimalType; +import org.apache.seatunnel.api.table.type.LocalTimeType; +import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper; + +import java.sql.ResultSetMetaData; +import java.sql.SQLException; + +public class TeradataTypeMapper implements JdbcDialectTypeMapper { + + // ============================data types===================== + + // -------------------------number---------------------------- + private static final String TERADATA_BYTEINT = "BYTEINT"; + private static final String TERADATA_SMALLINT = "SMALLINT"; + private static final String TERADATA_INTEGER = "INTEGER"; + private static final String TERADATA_BIGINT = "BIGINT"; + private static final String TERADATA_FLOAT = "FLOAT"; + private static final String TERADATA_DECIMAL = "DECIMAL"; + + // -------------------------string---------------------------- + private static final String TERADATA_CHAR = "CHAR"; + private static final String TERADATA_VARCHAR = "VARCHAR"; + private static final String TERADATA_CLOB = "CLOB"; + + + // ---------------------------binary--------------------------- + private static final String TERADATA_BYTE = "BYTE"; + private static final String TERADATA_VARBYTE = "VARBYTE"; + + // ------------------------------time------------------------- + private static final String TERADATA_DATE = "DATE"; + private static final String TERADATA_TIME = "TIME"; + private static final String TERADATA_TIMESTAMP = "TIMESTAMP"; + + // ------------------------------blob------------------------- + private static final String TERADATA_BLOB = "BLOB"; + + @Override + public SeaTunnelDataType<?> mapping(ResultSetMetaData metadata, int colIndex) throws SQLException { + String teradataType = metadata.getColumnTypeName(colIndex).toUpperCase(); + switch (teradataType) { + case TERADATA_BYTEINT: + return BasicType.BYTE_TYPE; + case TERADATA_SMALLINT: + return BasicType.SHORT_TYPE; + case TERADATA_INTEGER: + return BasicType.INT_TYPE; + case TERADATA_BIGINT: + return BasicType.LONG_TYPE; + case TERADATA_FLOAT: + return BasicType.FLOAT_TYPE; + case TERADATA_DECIMAL: + return new DecimalType(metadata.getPrecision(colIndex), metadata.getScale(colIndex)); + case TERADATA_CHAR: + case TERADATA_VARCHAR: + case TERADATA_CLOB: + return BasicType.STRING_TYPE; + case TERADATA_BYTE: + case TERADATA_VARBYTE: + case TERADATA_BLOB: + return PrimitiveByteArrayType.INSTANCE; + case TERADATA_DATE: + return LocalTimeType.LOCAL_DATE_TYPE; + case TERADATA_TIME: + return LocalTimeType.LOCAL_TIME_TYPE; + case TERADATA_TIMESTAMP: + return LocalTimeType.LOCAL_DATE_TIME_TYPE; + default: + final String jdbcColumnName = metadata.getColumnName(colIndex); + throw new UnsupportedOperationException( + String.format( + "Doesn't support TERADATA type '%s' on column '%s' yet.", + teradataType, jdbcColumnName)); + } + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/pom.xml index 9504bb182f5..db554daf3c1 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/pom.xml @@ -103,6 +103,11 @@ <artifactId>postgresql</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>com.teradata.jdbc</groupId> + <artifactId>terajdbc4</artifactId> + <scope>test</scope> + </dependency> </dependencies> </project> diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcTeradataIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcTeradataIT.java new file mode 100644 index 00000000000..df07b8b193d --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcTeradataIT.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc; + +import org.apache.seatunnel.e2e.common.TestResource; +import org.apache.seatunnel.e2e.common.TestSuiteBase; +import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory; +import org.apache.seatunnel.e2e.common.container.TestContainer; + +import com.teradata.jdbc.TeraDataSource; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.TestTemplate; +import org.testcontainers.containers.Container; + +import java.sql.Connection; +import java.sql.Statement; + +@Disabled("Disabled because it needs user's personal teradata account to run this test!") +public class JdbcTeradataIT extends TestSuiteBase implements TestResource { + private static final String HOST = "1.2.3.4"; + private static final String PORT = "1025"; + private static final String USERNAME = "dbc"; + private static final String PASSWORD = "dbc"; + private static final String DATABASE = "test"; + private static final String SINK_TABLE = "sink_table"; + private static final String TERADATA_DRIVER_JAR = "https://repo1.maven.org/maven2/com/teradata/jdbc/terajdbc4/17.20.00.12/terajdbc4-17.20.00.12.jar"; + private final ContainerExtendedFactory extendedFactory = container -> { + container.execInContainer("bash", "-c", "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd /tmp/seatunnel/plugins/Jdbc/lib && curl -O " + TERADATA_DRIVER_JAR); + }; + + private Connection connection; + + @TestTemplate + public void testTeradata(TestContainer container) throws Exception { + container.executeExtraCommands(extendedFactory); + Container.ExecResult execResult = container.executeJob("/jdbc_teradata_source_and_sink.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + clearSinkTable(); + } + + private void clearSinkTable() { + try (Statement statement = connection.createStatement()) { + statement.execute(String.format("delete from %s", SINK_TABLE)); + } catch (Exception e) { + throw new RuntimeException("Test teradata server failed!", e); + } + } + + @BeforeAll + @Override + public void startUp() throws Exception { + TeraDataSource teraDataSource = new TeraDataSource(); + teraDataSource.setDSName(HOST); + teraDataSource.setDbsPort(PORT); + teraDataSource.setUser(USERNAME); + teraDataSource.setPassword(PASSWORD); + teraDataSource.setDATABASE(DATABASE); + this.connection = teraDataSource.getConnection(); + } + + @AfterAll + @Override + public void tearDown() throws Exception { + if (connection != null) { + this.connection.close(); + } + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_teradata_source_and_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_teradata_source_and_sink.conf new file mode 100644 index 00000000000..7408228285b --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_teradata_source_and_sink.conf @@ -0,0 +1,77 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + execution.parallelism = 1 + job.mode = "BATCH" +} + +source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** + Jdbc { + driver = com.teradata.jdbc.TeraDriver + url = "jdbc:teradata://1.2.3.4/DBS_PORT=1025,DATABASE=test,TYPE=FASTEXPORT" + user = "dbc" + password = "dbc" + query = """ + select id, + c_byteint, + c_smallint, + c_integer, + c_bigint, + c_float, + c_decimal, + c_char, + c_varchar, + c_byte, + c_varbyte, + c_date, + c_timestamp + from source_table; + """ + } + # If you would like to get more information about how to configure seatunnel and see full list of source plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/source/Jdbc +} + +sink { + Jdbc { + driver = com.teradata.jdbc.TeraDriver + url = "jdbc:teradata://1.2.3.4/DBS_PORT=1025,DATABASE=test,TYPE=FASTLOAD" + user = "dbc" + password = "dbc" + auto_commit = false + query = """ + insert into sink_table(id, + c_byteint, + c_smallint, + c_integer, + c_bigint, + c_float, + c_decimal, + c_char, + c_varchar, + c_byte, + c_varbyte, + c_date, + c_timestamp) +VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) +""" + } + # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/sink/Jdbc +}