diff --git a/docs/en/connector-v2/sink/Jdbc.md b/docs/en/connector-v2/sink/Jdbc.md index 1ddbdd507d9..8df5f12bfab 100644 --- a/docs/en/connector-v2/sink/Jdbc.md +++ b/docs/en/connector-v2/sink/Jdbc.md @@ -226,7 +226,7 @@ In the case of is_exactly_once = "true", Xa transactions are used. This requires there are some reference value for params above. -| datasource | driver | url | xa_data_source_class_name | maven | +| datasource | driver | url | xa_data_source_class_name | maven | |-------------------|----------------------------------------------|--------------------------------------------------------------------|----------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------| | MySQL | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | com.mysql.cj.jdbc.MysqlXADataSource | https://mvnrepository.com/artifact/mysql/mysql-connector-java | | PostgreSQL | org.postgresql.Driver | jdbc:postgresql://localhost:5432/postgres | org.postgresql.xa.PGXADataSource | https://mvnrepository.com/artifact/org.postgresql/postgresql | @@ -235,7 +235,7 @@ there are some reference value for params above. | SQL Server | com.microsoft.sqlserver.jdbc.SQLServerDriver | jdbc:sqlserver://localhost:1433 | com.microsoft.sqlserver.jdbc.SQLServerXADataSource | https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc | | Oracle | oracle.jdbc.OracleDriver | jdbc:oracle:thin:@localhost:1521/xepdb1 | oracle.jdbc.xa.OracleXADataSource | https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8 | | sqlite | org.sqlite.JDBC | jdbc:sqlite:test.db | / | https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc | -| GBase8a | com.gbase.jdbc.Driver | jdbc:gbase://e2e_gbase8aDb:5258/test | / | https://cdn.gbase.cn/products/30/p5CiVwXBKQYIUGN8ecHvk/gbase-connector-java-9.5.0.7-build1-bin.jar | +| GBase8a | com.gbase.jdbc.Driver | jdbc:gbase://e2e_gbase8aDb:5258/test | / | https://cdn.gbase.cn/products/30/p5CiVwXBKQYIUGN8ecHvk/gbase-connector-java-9.5.0.7-build1-bin.jar | | StarRocks | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | / | https://mvnrepository.com/artifact/mysql/mysql-connector-java | | db2 | com.ibm.db2.jcc.DB2Driver | jdbc:db2://localhost:50000/testdb | com.ibm.db2.jcc.DB2XADataSource | https://mvnrepository.com/artifact/com.ibm.db2.jcc/db2jcc/db2jcc4 | | saphana | com.sap.db.jdbc.Driver | jdbc:sap://localhost:39015 | / | https://mvnrepository.com/artifact/com.sap.cloud.db.jdbc/ngdbc | @@ -248,6 +248,7 @@ there are some reference value for params above. | OceanBase | com.oceanbase.jdbc.Driver | jdbc:oceanbase://localhost:2881 | / | https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.11/oceanbase-client-2.4.11.jar | | xugu | com.xugu.cloudjdbc.Driver | jdbc:xugu://localhost:5138 | / | https://repo1.maven.org/maven2/com/xugudb/xugu-jdbc/12.2.0/xugu-jdbc-12.2.0.jar | | InterSystems IRIS | com.intersystems.jdbc.IRISDriver | jdbc:IRIS://localhost:1972/%SYS | / | https://raw.githubusercontent.com/intersystems-community/iris-driver-distribution/main/JDBC/JDK18/intersystems-jdbc-3.8.4.jar | +| opengauss | org.opengauss.Driver | jdbc:opengauss://localhost:5432/postgres | / | https://repo1.maven.org/maven2/org/opengauss/opengauss-jdbc/5.1.0-og/opengauss-jdbc-5.1.0-og.jar | ## Example diff --git a/docs/en/connector-v2/source/Jdbc.md b/docs/en/connector-v2/source/Jdbc.md index 27b3d875580..f2e7486e247 100644 --- a/docs/en/connector-v2/source/Jdbc.md +++ b/docs/en/connector-v2/source/Jdbc.md @@ -113,7 +113,7 @@ The JDBC Source connector supports parallel reading of data from tables. SeaTunn there are some reference value for params above. -| datasource | driver | url | maven | +| datasource | driver | url | maven | |-------------------|-----------------------------------------------------|------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------| | mysql | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | https://mvnrepository.com/artifact/mysql/mysql-connector-java | | postgresql | org.postgresql.Driver | jdbc:postgresql://localhost:5432/postgres | https://mvnrepository.com/artifact/org.postgresql/postgresql | @@ -122,7 +122,7 @@ there are some reference value for params above. | sqlserver | com.microsoft.sqlserver.jdbc.SQLServerDriver | jdbc:sqlserver://localhost:1433 | https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc | | oracle | oracle.jdbc.OracleDriver | jdbc:oracle:thin:@localhost:1521/xepdb1 | https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8 | | sqlite | org.sqlite.JDBC | jdbc:sqlite:test.db | https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc | -| gbase8a | com.gbase.jdbc.Driver | jdbc:gbase://e2e_gbase8aDb:5258/test | https://cdn.gbase.cn/products/30/p5CiVwXBKQYIUGN8ecHvk/gbase-connector-java-9.5.0.7-build1-bin.jar | +| gbase8a | com.gbase.jdbc.Driver | jdbc:gbase://e2e_gbase8aDb:5258/test | https://cdn.gbase.cn/products/30/p5CiVwXBKQYIUGN8ecHvk/gbase-connector-java-9.5.0.7-build1-bin.jar | | starrocks | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | https://mvnrepository.com/artifact/mysql/mysql-connector-java | | db2 | com.ibm.db2.jcc.DB2Driver | jdbc:db2://localhost:50000/testdb | https://mvnrepository.com/artifact/com.ibm.db2.jcc/db2jcc/db2jcc4 | | tablestore | com.alicloud.openservices.tablestore.jdbc.OTSDriver | "jdbc:ots:http s://myinstance.cn-hangzhou.ots.aliyuncs.com/myinstance" | https://mvnrepository.com/artifact/com.aliyun.openservices/tablestore-jdbc | @@ -137,6 +137,7 @@ there are some reference value for params above. | Hive | org.apache.hive.jdbc.HiveDriver | jdbc:hive2://localhost:10000 | https://repo1.maven.org/maven2/org/apache/hive/hive-jdbc/3.1.3/hive-jdbc-3.1.3-standalone.jar | | xugu | com.xugu.cloudjdbc.Driver | jdbc:xugu://localhost:5138 | https://repo1.maven.org/maven2/com/xugudb/xugu-jdbc/12.2.0/xugu-jdbc-12.2.0.jar | | InterSystems IRIS | com.intersystems.jdbc.IRISDriver | jdbc:IRIS://localhost:1972/%SYS | https://raw.githubusercontent.com/intersystems-community/iris-driver-distribution/main/JDBC/JDK18/intersystems-jdbc-3.8.4.jar | +| opengauss | org.opengauss.Driver | jdbc:opengauss://localhost:5432/postgres | https://repo1.maven.org/maven2/org/opengauss/opengauss-jdbc/5.1.0-og/opengauss-jdbc-5.1.0-og.jar | ## Example diff --git a/docs/zh/connector-v2/sink/Jdbc.md b/docs/zh/connector-v2/sink/Jdbc.md index e1ab422952e..6a7253da61c 100644 --- a/docs/zh/connector-v2/sink/Jdbc.md +++ b/docs/zh/connector-v2/sink/Jdbc.md @@ -216,26 +216,27 @@ Sink插件常用参数,请参考 [Sink常用选项](../sink-common-options.md) 附录参数仅提供参考 -| 数据源 | driver | url | xa_data_source_class_name | maven | -|------------|----------------------------------------------|--------------------------------------------------------------------|----------------------------------------------------|-------------------------------------------------------------------------------------------------------------| -| MySQL | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | com.mysql.cj.jdbc.MysqlXADataSource | https://mvnrepository.com/artifact/mysql/mysql-connector-java | -| PostgreSQL | org.postgresql.Driver | jdbc:postgresql://localhost:5432/postgres | org.postgresql.xa.PGXADataSource | https://mvnrepository.com/artifact/org.postgresql/postgresql | -| DM | dm.jdbc.driver.DmDriver | jdbc:dm://localhost:5236 | dm.jdbc.driver.DmdbXADataSource | https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18 | -| Phoenix | org.apache.phoenix.queryserver.client.Driver | jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF | / | https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client | -| SQL Server | com.microsoft.sqlserver.jdbc.SQLServerDriver | jdbc:sqlserver://localhost:1433 | com.microsoft.sqlserver.jdbc.SQLServerXADataSource | https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc | -| Oracle | oracle.jdbc.OracleDriver | jdbc:oracle:thin:@localhost:1521/xepdb1 | oracle.jdbc.xa.OracleXADataSource | https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8 | -| sqlite | org.sqlite.JDBC | jdbc:sqlite:test.db | / | https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc | -| GBase8a | com.gbase.jdbc.Driver | jdbc:gbase://e2e_gbase8aDb:5258/test | / | https://cdn.gbase.cn/products/30/p5CiVwXBKQYIUGN8ecHvk/gbase-connector-java-9.5.0.7-build1-bin.jar | -| StarRocks | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | / | https://mvnrepository.com/artifact/mysql/mysql-connector-java | -| db2 | com.ibm.db2.jcc.DB2Driver | jdbc:db2://localhost:50000/testdb | com.ibm.db2.jcc.DB2XADataSource | https://mvnrepository.com/artifact/com.ibm.db2.jcc/db2jcc/db2jcc4 | -| saphana | com.sap.db.jdbc.Driver | jdbc:sap://localhost:39015 | / | https://mvnrepository.com/artifact/com.sap.cloud.db.jdbc/ngdbc | -| Doris | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | / | https://mvnrepository.com/artifact/mysql/mysql-connector-java | -| teradata | com.teradata.jdbc.TeraDriver | jdbc:teradata://localhost/DBS_PORT=1025,DATABASE=test | / | https://mvnrepository.com/artifact/com.teradata.jdbc/terajdbc | -| Redshift | com.amazon.redshift.jdbc42.Driver | jdbc:redshift://localhost:5439/testdb | com.amazon.redshift.xa.RedshiftXADataSource | https://mvnrepository.com/artifact/com.amazon.redshift/redshift-jdbc42 | -| Snowflake | net.snowflake.client.jdbc.SnowflakeDriver | jdbc:snowflake://.snowflakecomputing.com | / | https://mvnrepository.com/artifact/net.snowflake/snowflake-jdbc | -| Vertica | com.vertica.jdbc.Driver | jdbc:vertica://localhost:5433 | / | https://repo1.maven.org/maven2/com/vertica/jdbc/vertica-jdbc/12.0.3-0/vertica-jdbc-12.0.3-0.jar | -| Kingbase | com.kingbase8.Driver | jdbc:kingbase8://localhost:54321/db_test | / | https://repo1.maven.org/maven2/cn/com/kingbase/kingbase8/8.6.0/kingbase8-8.6.0.jar | -| OceanBase | com.oceanbase.jdbc.Driver | jdbc:oceanbase://localhost:2881 | / | https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.11/oceanbase-client-2.4.11.jar | +| 数据源 | driver | url | xa_data_source_class_name | maven | +|------------|----------------------------------------------|--------------------------------------------------------------------|----------------------------------------------------|------------------------------------------------------------------------------------------------------| +| MySQL | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | com.mysql.cj.jdbc.MysqlXADataSource | https://mvnrepository.com/artifact/mysql/mysql-connector-java | +| PostgreSQL | org.postgresql.Driver | jdbc:postgresql://localhost:5432/postgres | org.postgresql.xa.PGXADataSource | https://mvnrepository.com/artifact/org.postgresql/postgresql | +| DM | dm.jdbc.driver.DmDriver | jdbc:dm://localhost:5236 | dm.jdbc.driver.DmdbXADataSource | https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18 | +| Phoenix | org.apache.phoenix.queryserver.client.Driver | jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF | / | https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client | +| SQL Server | com.microsoft.sqlserver.jdbc.SQLServerDriver | jdbc:sqlserver://localhost:1433 | com.microsoft.sqlserver.jdbc.SQLServerXADataSource | https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc | +| Oracle | oracle.jdbc.OracleDriver | jdbc:oracle:thin:@localhost:1521/xepdb1 | oracle.jdbc.xa.OracleXADataSource | https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8 | +| sqlite | org.sqlite.JDBC | jdbc:sqlite:test.db | / | https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc | +| GBase8a | com.gbase.jdbc.Driver | jdbc:gbase://e2e_gbase8aDb:5258/test | / | https://cdn.gbase.cn/products/30/p5CiVwXBKQYIUGN8ecHvk/gbase-connector-java-9.5.0.7-build1-bin.jar | +| StarRocks | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | / | https://mvnrepository.com/artifact/mysql/mysql-connector-java | +| db2 | com.ibm.db2.jcc.DB2Driver | jdbc:db2://localhost:50000/testdb | com.ibm.db2.jcc.DB2XADataSource | https://mvnrepository.com/artifact/com.ibm.db2.jcc/db2jcc/db2jcc4 | +| saphana | com.sap.db.jdbc.Driver | jdbc:sap://localhost:39015 | / | https://mvnrepository.com/artifact/com.sap.cloud.db.jdbc/ngdbc | +| Doris | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | / | https://mvnrepository.com/artifact/mysql/mysql-connector-java | +| teradata | com.teradata.jdbc.TeraDriver | jdbc:teradata://localhost/DBS_PORT=1025,DATABASE=test | / | https://mvnrepository.com/artifact/com.teradata.jdbc/terajdbc | +| Redshift | com.amazon.redshift.jdbc42.Driver | jdbc:redshift://localhost:5439/testdb | com.amazon.redshift.xa.RedshiftXADataSource | https://mvnrepository.com/artifact/com.amazon.redshift/redshift-jdbc42 | +| Snowflake | net.snowflake.client.jdbc.SnowflakeDriver | jdbc:snowflake://.snowflakecomputing.com | / | https://mvnrepository.com/artifact/net.snowflake/snowflake-jdbc | +| Vertica | com.vertica.jdbc.Driver | jdbc:vertica://localhost:5433 | / | https://repo1.maven.org/maven2/com/vertica/jdbc/vertica-jdbc/12.0.3-0/vertica-jdbc-12.0.3-0.jar | +| Kingbase | com.kingbase8.Driver | jdbc:kingbase8://localhost:54321/db_test | / | https://repo1.maven.org/maven2/cn/com/kingbase/kingbase8/8.6.0/kingbase8-8.6.0.jar | +| OceanBase | com.oceanbase.jdbc.Driver | jdbc:oceanbase://localhost:2881 | / | https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.11/oceanbase-client-2.4.11.jar | +| opengauss | org.opengauss.Driver | jdbc:opengauss://localhost:5432/postgres | / | https://repo1.maven.org/maven2/org/opengauss/opengauss-jdbc/5.1.0-og/opengauss-jdbc-5.1.0-og.jar | ## 示例 diff --git a/seatunnel-connectors-v2/connector-jdbc/pom.xml b/seatunnel-connectors-v2/connector-jdbc/pom.xml index 7b4199c462f..a6be0dd03ba 100644 --- a/seatunnel-connectors-v2/connector-jdbc/pom.xml +++ b/seatunnel-connectors-v2/connector-jdbc/pom.xml @@ -53,6 +53,7 @@ 12.2.0 3.0.0 3.2.0 + 5.1.0-og @@ -203,11 +204,15 @@ ${iris.jdbc.version} provided - org.tikv tikv-client-java ${tikv.version} + + + org.opengauss + opengauss-jdbc + ${opengauss.jdbc.version} provided @@ -316,11 +321,14 @@ com.intersystems intersystems-jdbc - org.tikv tikv-client-java + + org.opengauss + opengauss-jdbc + diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/opengauss/OpenGaussCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/opengauss/OpenGaussCatalog.java new file mode 100644 index 00000000000..6a8eab50107 --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/opengauss/OpenGaussCatalog.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.opengauss; + +import org.apache.seatunnel.common.utils.JdbcUrlUtil; +import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.psql.PostgresCatalog; + +import com.google.common.annotations.VisibleForTesting; +import lombok.extern.slf4j.Slf4j; + +import java.sql.Connection; + +@Slf4j +public class OpenGaussCatalog extends PostgresCatalog { + + public OpenGaussCatalog( + String catalogName, + String username, + String pwd, + JdbcUrlUtil.UrlInfo urlInfo, + String defaultSchema) { + super(catalogName, username, pwd, urlInfo, defaultSchema); + } + + @VisibleForTesting + public void setConnection(String url, Connection connection) { + this.connectionMap.put(url, connection); + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/opengauss/OpenGaussCatalogFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/opengauss/OpenGaussCatalogFactory.java new file mode 100644 index 00000000000..bff96ff6d30 --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/opengauss/OpenGaussCatalogFactory.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.opengauss; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.configuration.util.OptionValidationException; +import org.apache.seatunnel.api.table.catalog.Catalog; +import org.apache.seatunnel.api.table.factory.CatalogFactory; +import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.common.utils.JdbcUrlUtil; +import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier; + +import com.google.auto.service.AutoService; + +import java.util.Optional; + +@AutoService(Factory.class) +public class OpenGaussCatalogFactory implements CatalogFactory { + + @Override + public String factoryIdentifier() { + return DatabaseIdentifier.OPENGAUSS; + } + + @Override + public Catalog createCatalog(String catalogName, ReadonlyConfig options) { + String urlWithDatabase = options.get(JdbcCatalogOptions.BASE_URL); + JdbcUrlUtil.UrlInfo urlInfo = JdbcUrlUtil.getUrlInfo(urlWithDatabase); + Optional defaultDatabase = urlInfo.getDefaultDatabase(); + if (!defaultDatabase.isPresent()) { + throw new OptionValidationException(JdbcCatalogOptions.BASE_URL); + } + return new OpenGaussCatalog( + catalogName, + options.get(JdbcCatalogOptions.USERNAME), + options.get(JdbcCatalogOptions.PASSWORD), + urlInfo, + options.get(JdbcCatalogOptions.SCHEMA)); + } + + @Override + public OptionRule optionRule() { + return JdbcCatalogOptions.BASE_RULE.build(); + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java index 45f849c28bd..e2a32b4f3f0 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java @@ -42,4 +42,5 @@ public class DatabaseIdentifier { public static final String XUGU = "XUGU"; public static final String IRIS = "IRIS"; public static final String INCEPTOR = "Inceptor"; + public static final String OPENGAUSS = "OpenGauss"; } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/opengauss/OpenGaussDialectFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/opengauss/OpenGaussDialectFactory.java new file mode 100644 index 00000000000..b1ceed51e9b --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/opengauss/OpenGaussDialectFactory.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.opengauss; + +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresDialectFactory; + +import com.google.auto.service.AutoService; + +@AutoService(JdbcDialectFactory.class) +public class OpenGaussDialectFactory extends PostgresDialectFactory { + + @Override + public boolean acceptsURL(String url) { + return url.startsWith("jdbc:opengauss:"); + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOpenGaussIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOpenGaussIT.java new file mode 100644 index 00000000000..5d2b8b19dc7 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOpenGaussIT.java @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc; + +import org.apache.seatunnel.api.table.catalog.Catalog; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.ConstraintKey; +import org.apache.seatunnel.api.table.catalog.PrimaryKey; +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.catalog.TableSchema; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.common.utils.JdbcUrlUtil; +import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.opengauss.OpenGaussCatalog; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier; +import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory; +import org.apache.seatunnel.e2e.common.junit.TestContainerExtension; + +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.Pair; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.utility.DockerLoggerFactory; + +import com.google.common.collect.Lists; +import lombok.extern.slf4j.Slf4j; + +import java.math.BigDecimal; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +@Slf4j +public class JdbcOpenGaussIT extends AbstractJdbcIT { + protected static final String OPENGAUSS_IMAGE = "opengauss/opengauss:5.0.0"; + + private static final String OPEN_GAUSS_ALIASES = "e2e_OpenGauss"; + private static final String DRIVER_CLASS = "org.opengauss.Driver"; + private static final int OPEN_GAUSS_PORT = 5432; + private static final String OPEN_GAUSS_URL = "jdbc:opengauss://" + HOST + ":%s/%s"; + private static final String USERNAME = "gaussdb"; + private static final String PASSWORD = "openGauss@123"; + private static final String DATABASE = "postgres"; + private static final String SCHEMA = "public"; + private static final String SOURCE_TABLE = "gs_e2e_source_table"; + private static final String SINK_TABLE = "gs_e2e_sink_table"; + private static final String CATALOG_TABLE = "e2e_table_catalog"; + private static final Integer GEN_ROWS = 100; + private static final List CONFIG_FILE = + Lists.newArrayList("/jdbc_opengauss_source_and_sink.conf"); + + private static final String CREATE_SQL = + "CREATE TABLE IF NOT EXISTS %s (\n" + + " gid SERIAL PRIMARY KEY,\n" + + " uuid_col UUID,\n" + + " text_col TEXT,\n" + + " varchar_col VARCHAR(255),\n" + + " char_col CHAR(10),\n" + + " boolean_col bool,\n" + + " smallint_col int2,\n" + + " integer_col int4,\n" + + " bigint_col BIGINT,\n" + + " decimal_col DECIMAL(10, 2),\n" + + " numeric_col NUMERIC(8, 4),\n" + + " real_col float4,\n" + + " double_precision_col float8,\n" + + " smallserial_col SMALLSERIAL,\n" + + " bigserial_col BIGSERIAL,\n" + + " date_col DATE,\n" + + " timestamp_col TIMESTAMP,\n" + + " bpchar_col BPCHAR(10),\n" + + " age INT NOT null\n" + + ");"; + + private static final String[] fieldNames = + new String[] { + "gid", + "uuid_col", + "text_col", + "varchar_col", + "char_col", + "boolean_col", + "smallint_col", + "integer_col", + "bigint_col", + "decimal_col", + "numeric_col", + "real_col", + "double_precision_col", + "smallserial_col", + "bigserial_col", + "date_col", + "timestamp_col", + "bpchar_col", + "age" + }; + + @TestContainerExtension + protected final ContainerExtendedFactory extendedFactory = + container -> { + Container.ExecResult extraCommands = + container.execInContainer( + "bash", + "-c", + "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd /tmp/seatunnel/plugins/Jdbc/lib && curl -O " + + driverUrl()); + Assertions.assertEquals(0, extraCommands.getExitCode(), extraCommands.getStderr()); + }; + + @Test + @Override + public void testCatalog() { + if (catalog == null) { + return; + } + TablePath sourceTablePath = + new TablePath( + jdbcCase.getDatabase(), jdbcCase.getSchema(), jdbcCase.getSourceTable()); + TablePath targetTablePath = + new TablePath( + jdbcCase.getCatalogDatabase(), + jdbcCase.getCatalogSchema(), + jdbcCase.getCatalogTable()); + + CatalogTable catalogTable = catalog.getTable(sourceTablePath); + catalog.createTable(targetTablePath, catalogTable, false); + Assertions.assertTrue(catalog.tableExists(targetTablePath)); + + catalog.dropTable(targetTablePath, false); + Assertions.assertFalse(catalog.tableExists(targetTablePath)); + } + + @Test + public void testCreateIndex() { + String schema = "public"; + String databaseName = jdbcCase.getDatabase(); + TablePath sourceTablePath = TablePath.of(databaseName, "public", "gs_e2e_source_table"); + TablePath targetTablePath = TablePath.of(databaseName, "public", "gs_ide_sink_table_2"); + OpenGaussCatalog openGaussCatalog = (OpenGaussCatalog) catalog; + CatalogTable catalogTable = openGaussCatalog.getTable(sourceTablePath); + dropTableWithAssert(openGaussCatalog, targetTablePath, true); + // not create index + createIndexOrNot(openGaussCatalog, targetTablePath, catalogTable, false); + Assertions.assertFalse(hasIndex(openGaussCatalog, targetTablePath)); + + dropTableWithAssert(openGaussCatalog, targetTablePath, true); + // create index + createIndexOrNot(openGaussCatalog, targetTablePath, catalogTable, true); + Assertions.assertTrue(hasIndex(openGaussCatalog, targetTablePath)); + + dropTableWithAssert(openGaussCatalog, targetTablePath, true); + } + + protected boolean hasIndex(Catalog catalog, TablePath targetTablePath) { + TableSchema tableSchema = catalog.getTable(targetTablePath).getTableSchema(); + PrimaryKey primaryKey = tableSchema.getPrimaryKey(); + List constraintKeys = tableSchema.getConstraintKeys(); + if (primaryKey != null && StringUtils.isNotBlank(primaryKey.getPrimaryKey())) { + return true; + } + if (!constraintKeys.isEmpty()) { + return true; + } + return false; + } + + private void dropTableWithAssert( + OpenGaussCatalog openGaussCatalog, + TablePath targetTablePath, + boolean ignoreIfNotExists) { + openGaussCatalog.dropTable(targetTablePath, ignoreIfNotExists); + Assertions.assertFalse(openGaussCatalog.tableExists(targetTablePath)); + } + + private void createIndexOrNot( + OpenGaussCatalog openGaussCatalog, + TablePath targetTablePath, + CatalogTable catalogTable, + boolean createIndex) { + openGaussCatalog.createTable(targetTablePath, catalogTable, false, createIndex); + Assertions.assertTrue(openGaussCatalog.tableExists(targetTablePath)); + } + + @Override + JdbcCase getJdbcCase() { + Map containerEnv = new HashMap<>(); + containerEnv.put("OPEN_GAUSS_PASSWORD", PASSWORD); + containerEnv.put("APP_USER", USERNAME); + containerEnv.put("APP_USER_PASSWORD", PASSWORD); + String jdbcUrl = String.format(OPEN_GAUSS_URL, OPEN_GAUSS_PORT, DATABASE); + Pair> testDataSet = initTestData(); + String[] fieldNames = testDataSet.getKey(); + + String insertSql = insertTable(SCHEMA, SOURCE_TABLE, fieldNames); + + return JdbcCase.builder() + .dockerImage(OPENGAUSS_IMAGE) + .networkAliases(OPEN_GAUSS_ALIASES) + .containerEnv(containerEnv) + .driverClass(DRIVER_CLASS) + .host(HOST) + .port(OPEN_GAUSS_PORT) + .localPort(OPEN_GAUSS_PORT) + .jdbcTemplate(OPEN_GAUSS_URL) + .jdbcUrl(jdbcUrl) + .userName(USERNAME) + .password(PASSWORD) + .database(DATABASE) + .schema(SCHEMA) + .sourceTable(SOURCE_TABLE) + .sinkTable(SINK_TABLE) + .catalogDatabase(DATABASE) + .catalogSchema(SCHEMA) + .catalogTable(CATALOG_TABLE) + .createSql(CREATE_SQL) + .configFile(CONFIG_FILE) + .insertSql(insertSql) + .testData(testDataSet) + .build(); + } + + @Override + String driverUrl() { + return "https://repo1.maven.org/maven2/org/opengauss/opengauss-jdbc/5.1.0-og/opengauss-jdbc-5.1.0-og.jar"; + } + + @Override + protected Class loadDriverClass() { + return super.loadDriverClassFromUrl(); + } + + @Override + Pair> initTestData() { + List rows = new ArrayList<>(); + for (Integer i = 0; i < GEN_ROWS; i++) { + SeaTunnelRow row = + new SeaTunnelRow( + new Object[] { + i, + UUID.randomUUID(), + String.valueOf(i), + String.valueOf(i), + String.valueOf(i), + i % 2 == 0, + i, + i, + Long.valueOf(i), + BigDecimal.valueOf(i * 10.0), + BigDecimal.valueOf(i * 0.01), + Float.parseFloat("1.1"), + Double.parseDouble("1.111"), + i, + Long.valueOf(i), + LocalDate.of(2022, 1, 1).atStartOfDay(), + LocalDateTime.of(2022, 1, 1, 10, 0), + "Testing", + i + }); + rows.add(row); + } + + return Pair.of(fieldNames, rows); + } + + @Override + public String quoteIdentifier(String field) { + return "\"" + field + "\""; + } + + @Override + protected void clearTable(String database, String schema, String table) { + clearTable(schema, table); + } + + @Override + protected String buildTableInfoWithSchema(String database, String schema, String table) { + return buildTableInfoWithSchema(schema, table); + } + + @Override + GenericContainer initContainer() { + GenericContainer container = + new GenericContainer<>(OPENGAUSS_IMAGE) + .withNetwork(NETWORK) + .withNetworkAliases(OPEN_GAUSS_ALIASES) + .withEnv("GS_PASSWORD", PASSWORD) + .withLogConsumer( + new Slf4jLogConsumer( + DockerLoggerFactory.getLogger(OPENGAUSS_IMAGE))); + container.setPortBindings( + Lists.newArrayList(String.format("%s:%s", OPEN_GAUSS_PORT, OPEN_GAUSS_PORT))); + + return container; + } + + @Override + protected void initCatalog() { + String jdbcUrl = jdbcCase.getJdbcUrl().replace(HOST, dbServer.getHost()); + catalog = + new OpenGaussCatalog( + DatabaseIdentifier.OPENGAUSS, + jdbcCase.getUserName(), + jdbcCase.getPassword(), + JdbcUrlUtil.getUrlInfo(jdbcUrl), + SCHEMA); + // set connection + ((OpenGaussCatalog) catalog).setConnection(jdbcUrl, connection); + catalog.open(); + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/resources/jdbc_opengauss_source_and_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/resources/jdbc_opengauss_source_and_sink.conf new file mode 100644 index 00000000000..224fcb24037 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/resources/jdbc_opengauss_source_and_sink.conf @@ -0,0 +1,67 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + jdbc { + url = "jdbc:opengauss://e2e_OpenGauss:5432/postgres?loggerLevel=OFF" + driver = "org.opengauss.Driver" + connection_check_timeout_sec = 100 + user = "gaussdb" + password = "openGauss@123" + table_path = "postgres.public.gs_e2e_source_table" + query = "select * from public.gs_e2e_source_table" + split.size = 10 + } +} + +transform { +} + +sink { + jdbc { + url = "jdbc:opengauss://e2e_OpenGauss:5432/postgres?loggerLevel=OFF&stringtype=unspecified" + driver = "org.opengauss.Driver" + user = "gaussdb" + password = "openGauss@123" + database = "postgres" + table = "public.gs_e2e_sink_table" + compatible_mode = "postgresLow" + generate_sink_sql = true + } +}