diff --git a/config/plugin_config b/config/plugin_config
index e3ac0f1d046a..7ed45eed8e7c 100644
--- a/config/plugin_config
+++ b/config/plugin_config
@@ -88,4 +88,5 @@ connector-web3j
connector-milvus
connector-activemq
connector-sls
---end--
\ No newline at end of file
+connector-cdc-opengauss
+--end--
diff --git a/docs/en/connector-v2/source/Opengauss-CDC.md b/docs/en/connector-v2/source/Opengauss-CDC.md
new file mode 100644
index 000000000000..00886050a697
--- /dev/null
+++ b/docs/en/connector-v2/source/Opengauss-CDC.md
@@ -0,0 +1,170 @@
+# Opengauss CDC
+
+> Opengauss CDC source connector
+
+## Support Those Engines
+
+> SeaTunnel Zeta
+> Flink
+
+## Key features
+
+- [ ] [batch](../../concept/connector-v2-features.md)
+- [x] [stream](../../concept/connector-v2-features.md)
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+- [ ] [column projection](../../concept/connector-v2-features.md)
+- [x] [parallelism](../../concept/connector-v2-features.md)
+- [x] [support user-defined split](../../concept/connector-v2-features.md)
+
+## Description
+
+The Opengauss CDC connector allows reading snapshot data and incremental data from the Opengauss database. This document
+describes how to set up the Opengauss CDC connector to run SQL queries against Opengauss databases.
+
+## Usage Steps
+
+> Here are the steps to enable CDC (Change Data Capture) in Opengauss:
+
+1. Ensure that `wal_level` is set to `logical`. You can modify the configuration directly with the following SQL commands:
+
+```sql
+ALTER SYSTEM SET wal_level TO 'logical';
+SELECT pg_reload_conf();
+```
+
+2. Change the REPLICA IDENTITY of the specified table to FULL
+
+```sql
+ALTER TABLE your_table_name REPLICA IDENTITY FULL;
+```
+
+If you have multiple tables, you can use the result of the following SQL to change the REPLICA IDENTITY of all tables under a schema to FULL:
+
+```sql
+SELECT 'ALTER TABLE ' || schemaname || '.' || tablename || ' REPLICA IDENTITY FULL;' FROM pg_tables WHERE schemaname = 'YourTableSchema';
+```
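+
+If you would rather apply the generated statements programmatically than paste the query result back into a SQL client, a small JDBC sketch such as the one below can do it. This is an illustrative example rather than part of the connector; the connection URL, credentials, and the `inventory` schema name are placeholders.
+
+```java
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+
+public class ReplicaIdentityBatchFix {
+    public static void main(String[] args) throws Exception {
+        // Placeholder connection settings; replace them with your own instance and schema.
+        String url = "jdbc:postgresql://localhost:5432/opengauss_cdc";
+        try (Connection conn = DriverManager.getConnection(url, "gaussdb", "openGauss@123");
+                Statement stmt = conn.createStatement()) {
+            List<String> alterStatements = new ArrayList<>();
+            // Generate one ALTER TABLE ... REPLICA IDENTITY FULL statement per table in the schema.
+            try (ResultSet rs =
+                    stmt.executeQuery(
+                            "SELECT 'ALTER TABLE ' || schemaname || '.' || tablename"
+                                    + " || ' REPLICA IDENTITY FULL'"
+                                    + " FROM pg_tables WHERE schemaname = 'inventory'")) {
+                while (rs.next()) {
+                    alterStatements.add(rs.getString(1));
+                }
+            }
+            // Execute the generated statements once the result set has been fully read.
+            for (String alterSql : alterStatements) {
+                stmt.execute(alterSql);
+            }
+        }
+    }
+}
+```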
+
+## Data Type Mapping
+
+| Opengauss Data type | SeaTunnel Data type |
+|-----------------------------------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------|
+| BOOL | BOOLEAN |
+| BYTEA | BYTES |
+| INT2 SMALLSERIAL INT4 SERIAL | INT |
+| INT8 BIGSERIAL | BIGINT |
+| FLOAT4 | FLOAT |
+| FLOAT8 | DOUBLE |
+| NUMERIC(precision > 0)                                                                    | DECIMAL(precision, scale), where precision and scale are taken from the column definition                                                       |
+| NUMERIC(precision not available)                                                          | DECIMAL(38, 18)                                                                                                                                  |
+| BPCHAR CHARACTER VARCHAR TEXT GEOMETRY GEOGRAPHY JSON JSONB | STRING |
+| TIMESTAMP | TIMESTAMP |
+| TIME | TIME |
+| DATE | DATE |
+| OTHER DATA TYPES | NOT SUPPORTED YET |
+
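+The two `NUMERIC` rows above boil down to a simple precision/scale rule: when the column reports a positive precision, it maps to `DECIMAL(precision, scale)`; otherwise it falls back to `DECIMAL(38, 18)`. The following is a minimal illustrative sketch of that rule, not the connector's actual code:
+
+```java
+public class NumericMappingSketch {
+
+    static String mapNumeric(int precision, int scale) {
+        if (precision > 0) {
+            // Precision and scale are known, keep them.
+            return "DECIMAL(" + precision + ", " + scale + ")";
+        }
+        // Precision cannot be determined, fall back to a wide default.
+        return "DECIMAL(38, 18)";
+    }
+
+    public static void main(String[] args) {
+        System.out.println(mapNumeric(10, 2)); // NUMERIC(10,2) -> DECIMAL(10, 2)
+        System.out.println(mapNumeric(0, 0));  // NUMERIC       -> DECIMAL(38, 18)
+    }
+}
+```
+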
+## Source Options
+
+| Name | Type | Required | Default | Description |
+|------------------------------------------------|----------|----------|----------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| base-url                                        | String   | Yes      | -        | The URL of the JDBC connection. For example: `jdbc:postgresql://localhost:5432/postgres_cdc?loggerLevel=OFF`.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          |
+| username | String | Yes | - | Username of the database to use when connecting to the database server. |
+| password | String | Yes | - | Password to use when connecting to the database server. |
+| database-names | List | No | - | Database name of the database to monitor. |
+| table-names | List | Yes | - | Table name of the database to monitor. The table name needs to include the database name, for example: `database_name.table_name` |
+| table-names-config | List | No | - | Table config list. for example: [{"table": "db1.schema1.table1","primaryKeys":["key1"]}] |
+| startup.mode | Enum | No | INITIAL | Optional startup mode for Opengauss CDC consumer, valid enumerations are `initial`, `earliest`, `latest` and `specific`. `initial`: Synchronize historical data at startup, and then synchronize incremental data. `earliest`: Startup from the earliest offset possible. `latest`: Startup from the latest offset. `specific`: Startup from user-supplied specific offsets. |
+| snapshot.split.size                              | Integer  | No       | 8096     | The split size (number of rows) of a table snapshot; captured tables are split into multiple splits when reading the snapshot of the table.                                                                                                                                                                                                                                                                                                                                                                                                                                                                           |
+| snapshot.fetch.size                              | Integer  | No       | 1024     | The maximum fetch size per poll when reading a table snapshot.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         |
+| slot.name | String | No | - | The name of the Opengauss logical decoding slot that was created for streaming changes from a particular plug-in for a particular database/schema. The server uses this slot to stream events to the connector that you are configuring. Default is seatunnel. |
+| decoding.plugin.name                             | String   | No       | pgoutput | The name of the Postgres logical decoding plug-in installed on the server. Supported values are decoderbufs, wal2json, wal2json_rds, wal2json_streaming, wal2json_rds_streaming and pgoutput.                                                                                                                                                                                                                                                                                                                                                                                                                          |
+| server-time-zone | String | No | UTC | The session time zone in database server. If not set, then ZoneId.systemDefault() is used to determine the server time zone. |
+| connect.timeout.ms | Duration | No | 30000 | The maximum time that the connector should wait after trying to connect to the database server before timing out. |
+| connect.max-retries                              | Integer  | No       | 3        | The maximum number of retries the connector should attempt when building a database server connection.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                |
+| connection.pool.size | Integer | No | 20 | The jdbc connection pool size. |
+| chunk-key.even-distribution.factor.upper-bound | Double | No | 100 | The upper bound of the chunk key distribution factor. This factor is used to determine whether the table data is evenly distributed. If the distribution factor is calculated to be less than or equal to this upper bound (i.e., (MAX(id) - MIN(id) + 1) / row count), the table chunks would be optimized for even distribution. Otherwise, if the distribution factor is greater, the table will be considered as unevenly distributed and the sampling-based sharding strategy will be used if the estimated shard count exceeds the value specified by `sample-sharding.threshold`. The default value is 100.0. |
+| chunk-key.even-distribution.factor.lower-bound | Double | No | 0.05 | The lower bound of the chunk key distribution factor. This factor is used to determine whether the table data is evenly distributed. If the distribution factor is calculated to be greater than or equal to this lower bound (i.e., (MAX(id) - MIN(id) + 1) / row count), the table chunks would be optimized for even distribution. Otherwise, if the distribution factor is less, the table will be considered as unevenly distributed and the sampling-based sharding strategy will be used if the estimated shard count exceeds the value specified by `sample-sharding.threshold`. The default value is 0.05. |
+| sample-sharding.threshold | Integer | No | 1000 | This configuration specifies the threshold of estimated shard count to trigger the sample sharding strategy. When the distribution factor is outside the bounds specified by `chunk-key.even-distribution.factor.upper-bound` and `chunk-key.even-distribution.factor.lower-bound`, and the estimated shard count (calculated as approximate row count / chunk size) exceeds this threshold, the sample sharding strategy will be used. This can help to handle large datasets more efficiently. The default value is 1000 shards. |
+| inverse-sampling.rate | Integer | No | 1000 | The inverse of the sampling rate used in the sample sharding strategy. For example, if this value is set to 1000, it means a 1/1000 sampling rate is applied during the sampling process. This option provides flexibility in controlling the granularity of the sampling, thus affecting the final number of shards. It's especially useful when dealing with very large datasets where a lower sampling rate is preferred. The default value is 1000. |
+| exactly_once | Boolean | No | false | Enable exactly once semantic. |
+| format | Enum | No | DEFAULT | Optional output format for Opengauss CDC, valid enumerations are `DEFAULT`, `COMPATIBLE_DEBEZIUM_JSON`. |
+| debezium | Config | No | - | Pass-through [Debezium's properties](https://github.com/debezium/debezium/blob/v1.9.8.Final/documentation/modules/ROOT/pages/connectors/postgresql.adoc#connector-configuration-properties) to Debezium Embedded Engine which is used to capture data changes from Opengauss server. |
+| common-options                                   |          | No       | -        | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                              |
+
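+The split-related options above (`snapshot.split.size`, `chunk-key.even-distribution.factor.upper-bound`, `chunk-key.even-distribution.factor.lower-bound`, `sample-sharding.threshold` and `inverse-sampling.rate`) interact when the connector decides how to split a table snapshot into chunks. The sketch below is a simplified, illustrative model of that decision, not the connector's actual implementation; the class and method names are made up for the example.
+
+```java
+public class SnapshotSplitDecisionSketch {
+
+    /** Distribution factor as defined above: (MAX(id) - MIN(id) + 1) / approximate row count. */
+    static double distributionFactor(long min, long max, long approximateRowCount) {
+        return (max - min + 1) / (double) approximateRowCount;
+    }
+
+    static String chooseStrategy(
+            long min,
+            long max,
+            long approximateRowCount,
+            int splitSize, // snapshot.split.size
+            double upperBound, // chunk-key.even-distribution.factor.upper-bound
+            double lowerBound, // chunk-key.even-distribution.factor.lower-bound
+            int sampleShardingThreshold) { // sample-sharding.threshold
+        double factor = distributionFactor(min, max, approximateRowCount);
+        long estimatedShardCount = approximateRowCount / splitSize;
+        if (factor >= lowerBound && factor <= upperBound) {
+            // Evenly distributed: chunks are derived directly from the chunk key range.
+            return "evenly sized chunks over the chunk key range";
+        } else if (estimatedShardCount > sampleShardingThreshold) {
+            // Unevenly distributed and large: sample at a 1/inverse-sampling.rate rate to pick boundaries.
+            return "sampling-based sharding";
+        } else {
+            // Unevenly distributed but small enough: split without sampling.
+            return "uneven split without sampling";
+        }
+    }
+
+    public static void main(String[] args) {
+        // Example: ids 1..9,000,000 over ~8,500,000 rows -> factor is about 1.06, within [0.05, 100].
+        System.out.println(chooseStrategy(1, 9_000_000, 8_500_000, 8096, 100.0, 0.05, 1000));
+    }
+}
+```
+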
+## Task Example
+
+### Simple
+
+> Supports multi-table reading
+
+```
+
+env {
+ # You can set engine configuration here
+ execution.parallelism = 1
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
+ read_limit.bytes_per_second=7000000
+ read_limit.rows_per_second=400
+}
+
+source {
+ Opengauss-CDC {
+ result_table_name = "customers_opengauss_cdc"
+ username = "gaussdb"
+ password = "openGauss@123"
+ database-names = ["opengauss_cdc"]
+ schema-names = ["inventory"]
+ table-names = ["opengauss_cdc.inventory.opengauss_cdc_table_1","opengauss_cdc.inventory.opengauss_cdc_table_2"]
+ base-url = "jdbc:postgresql://opengauss_cdc_e2e:5432/opengauss_cdc"
+ decoding.plugin.name = "pgoutput"
+ }
+}
+
+transform {
+
+}
+
+sink {
+ jdbc {
+ source_table_name = "customers_opengauss_cdc"
+ url = "jdbc:postgresql://opengauss_cdc_e2e:5432/opengauss_cdc"
+ driver = "org.postgresql.Driver"
+ user = "dailai"
+ password = "openGauss@123"
+
+ compatible_mode="postgresLow"
+ generate_sink_sql = true
+ # You need to configure both database and table
+ database = "opengauss_cdc"
+ schema = "inventory"
+ tablePrefix = "sink_"
+ primary_keys = ["id"]
+ }
+}
+
+```
+
+### Support a custom primary key for a table
+
+```
+source {
+ Opengauss-CDC {
+ result_table_name = "customers_opengauss_cdc"
+ username = "gaussdb"
+ password = "openGauss@123"
+ database-names = ["opengauss_cdc"]
+ schema-names = ["inventory"]
+ table-names = ["opengauss_cdc.inventory.full_types_no_primary_key"]
+ base-url = "jdbc:postgresql://opengauss_cdc_e2e:5432/opengauss_cdc?loggerLevel=OFF"
+ decoding.plugin.name = "pgoutput"
+ exactly_once = true
+ table-names-config = [
+ {
+ table = "opengauss_cdc.inventory.full_types_no_primary_key"
+ primaryKeys = ["id"]
+ }
+ ]
+ }
+}
+```
+
diff --git a/docs/zh/connector-v2/source/Opengauss-CDC.md b/docs/zh/connector-v2/source/Opengauss-CDC.md
new file mode 100644
index 000000000000..092249bb63be
--- /dev/null
+++ b/docs/zh/connector-v2/source/Opengauss-CDC.md
@@ -0,0 +1,169 @@
+# Opengauss CDC
+
+> Opengauss CDC源连接器
+
+## 支持这些引擎
+
+> SeaTunnel Zeta
+> Flink
+
+## 主要功能
+
+- [ ] [批处理](../../concept/connector-v2-features.md)
+- [x] [流处理](../../concept/connector-v2-features.md)
+- [x] [精确一次](../../concept/connector-v2-features.md)
+- [ ] [列投影](../../concept/connector-v2-features.md)
+- [x] [并行度](../../concept/connector-v2-features.md)
+- [x] [支持用户定义的拆分](../../concept/connector-v2-features.md)
+
+## 描述
+
+Opengauss CDC连接器允许从Opengauss数据库读取快照数据和增量数据。这个文档描述如何设置Opengauss CDC连接器以在Opengauss database中运行SQL查询。
+
+## 使用步骤
+
+> 这里是启用Opengauss CDC的步骤:
+
+1. 确保wal_level被设置为logical, 你可以直接使用SQL命令来修改这个配置:
+
+```sql
+ALTER SYSTEM SET wal_level TO 'logical';
+SELECT pg_reload_conf();
+```
+
+2. 改变指定表的REPLICA策略为FULL
+
+```sql
+ALTER TABLE your_table_name REPLICA IDENTITY FULL;
+```
+
+如果你有很多表,你可以使用下面SQL的结果集来改变所有表的REPLICA策略
+
+```sql
+select 'ALTER TABLE ' || schemaname || '.' || tablename || ' REPLICA IDENTITY FULL;' from pg_tables where schemaname = 'YourTableSchema'
+```
+
+## 数据类型映射
+
+| Opengauss Data type | SeaTunnel Data type |
+|-----------------------------------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------|
+| BOOL | BOOLEAN |
+| BYTEA | BYTES |
+| INT2 SMALLSERIAL INT4 SERIAL | INT |
+| INT8 BIGSERIAL | BIGINT |
+| FLOAT4 | FLOAT |
+| FLOAT8 | DOUBLE |
+| NUMERIC(precision > 0)                                                                    | DECIMAL(precision, scale), where precision and scale are taken from the column definition                                                       |
+| NUMERIC(precision not available)                                                          | DECIMAL(38, 18)                                                                                                                                  |
+| BPCHAR CHARACTER VARCHAR TEXT GEOMETRY GEOGRAPHY JSON JSONB | STRING |
+| TIMESTAMP | TIMESTAMP |
+| TIME | TIME |
+| DATE | DATE |
+| OTHER DATA TYPES | NOT SUPPORTED YET |
+
+## 源端可选项
+
+| Name | Type | Required | Default | Description |
+|------------------------------------------------|------|----------|----------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| base-url | 字符串 | 是 | - | JDBC连接的URL. 参考: `jdbc:postgresql://localhost:5432/postgres_cdc?loggerLevel=OFF`. |
+| username | 字符串 | 是 | - | 连接数据库的用户名 |
+| password | 字符串 | 是 | - | 连接数据库的密码 |
+| database-names | 列表 | 否 | - | 监控的数据库名称 |
+| table-names | 列表 | 是 | - | 监控的数据表名称. 表名需要包含数据库名称, 例如: `database_name.table_name` |
+| table-names-config | 列表 | 否 | - | 表配置的列表集合. 例如: [{"table": "db1.schema1.table1","primaryKeys":["key1"]}] |
+| startup.mode | 枚举 | 否 | INITIAL | Opengauss CDC消费者的可选启动模式, 有效的枚举是`initial`, `earliest`, `latest` and `specific`. `initial`: 启动时同步历史数据,然后同步增量数据 `earliest`: 从可能的最早偏移量启动 `latest`: 从最近的偏移量启动 `specific`: 从用户指定的偏移量启动 |
+| snapshot.split.size | 整型 | 否 | 8096 | 表快照的分割大小(行数),在读取表的快照时,捕获的表被分割成多个split |
+| snapshot.fetch.size | 整型 | 否 | 1024 | 读取表快照时,每次轮询的最大读取大小 |
+| slot.name | 字符串 | 否 | - | Opengauss逻辑解码插槽的名称,该插槽是为特定数据库/模式的特定插件的流式更改而创建的。服务器使用此插槽将事件流传输到正在配置的连接器。默认值为seatunnel |
+| decoding.plugin.name | 字符串 | 否 | pgoutput | 安装在服务器上的Postgres逻辑解码插件的名称,支持的值是decoderbufs、wal2json、wal2json_rds、wal2json_streaming、wal2json_rds_streaming和pgoutput |
+| server-time-zone | 字符串 | 否 | UTC | 数据库服务器中的会话时区。如果没有设置,则使用ZoneId.systemDefault()来确定服务器的时区 |
+| connect.timeout.ms | 时间间隔 | 否 | 30000 | 在尝试连接数据库服务器之后,连接器在超时之前应该等待的最大时间 |
+| connect.max-retries | 整型 | 否 | 3 | 连接器在建立数据库服务器连接时应该重试的最大次数 |
+| connection.pool.size | 整型 | 否 | 20 | jdbc连接池的大小 |
+| chunk-key.even-distribution.factor.upper-bound | 双浮点型 | 否 | 100 | chunk的key分布因子的上界。该因子用于确定表数据是否均匀分布。如果分布因子被计算为小于或等于这个上界(即(MAX(id) - MIN(id) + 1) /行数),表的所有chunk将被优化以达到均匀分布。否则,如果分布因子更大,则认为表分布不均匀,如果估计的分片数量超过`sample-sharding.threshold`指定的值,则将使用基于采样的分片策略。默认值为100.0。 |
+| chunk-key.even-distribution.factor.lower-bound | 双浮点型 | 否 | 0.05 | chunk的key分布因子的下界。该因子用于确定表数据是否均匀分布。如果分布因子的计算结果大于或等于这个下界(即(MAX(id) - MIN(id) + 1) /行数),那么表的所有块将被优化以达到均匀分布。否则,如果分布因子较小,则认为表分布不均匀,如果估计的分片数量超过`sample-sharding.threshold`指定的值,则使用基于采样的分片策略。缺省值为0.05。 |
+| sample-sharding.threshold | 整型 | 否 | 1000 | 此配置指定了用于触发采样分片策略的估计分片数的阈值。当分布因子超出了由`chunk-key.even-distribution.factor.upper-bound `和`chunk-key.even-distribution.factor.lower-bound`,并且估计的分片计数(以近似的行数/块大小计算)超过此阈值,则将使用样本分片策略。这有助于更有效地处理大型数据集。默认值为1000个分片。 |
+| inverse-sampling.rate | 整型 | 否 | 1000 | 采样分片策略中使用的采样率的倒数。例如,如果该值设置为1000,则意味着在采样过程中应用了1/1000的采样率。该选项提供了控制采样粒度的灵活性,从而影响最终的分片数量。当处理非常大的数据集时,它特别有用,其中首选较低的采样率。缺省值为1000。 |
+| exactly_once | 布尔 | 否 | false | 启用exactly once语义 |
+| format | 枚举 | 否 | DEFAULT | Opengauss CDC可选的输出格式, 有效的枚举是`DEFAULT`, `COMPATIBLE_DEBEZIUM_JSON`. |
+| debezium | 配置 | 否 | - | 将 [Debezium的属性](https://github.com/debezium/debezium/blob/v1.9.8.Final/documentation/modules/ROOT/pages/connectors/postgresql.adoc#connector-configuration-properties) 传递到Debezium嵌入式引擎,该引擎用于捕获来自Opengauss服务的数据更改 |
+| common-options | | 否 | - | 源码插件通用参数, 请参考[Source Common Options](common-options.md)获取详情 |
+
+## 任务示例
+
+### 简单
+
+> 支持多表读
+
+```
+
+env {
+ # You can set engine configuration here
+ execution.parallelism = 1
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
+ read_limit.bytes_per_second=7000000
+ read_limit.rows_per_second=400
+}
+
+source {
+ Opengauss-CDC {
+ result_table_name = "customers_opengauss_cdc"
+ username = "gaussdb"
+ password = "openGauss@123"
+ database-names = ["opengauss_cdc"]
+ schema-names = ["inventory"]
+ table-names = ["opengauss_cdc.inventory.opengauss_cdc_table_1","opengauss_cdc.inventory.opengauss_cdc_table_2"]
+ base-url = "jdbc:postgresql://opengauss_cdc_e2e:5432/opengauss_cdc"
+ decoding.plugin.name = "pgoutput"
+ }
+}
+
+transform {
+
+}
+
+sink {
+ jdbc {
+ source_table_name = "customers_opengauss_cdc"
+ url = "jdbc:postgresql://opengauss_cdc_e2e:5432/opengauss_cdc"
+ driver = "org.postgresql.Driver"
+ user = "dailai"
+ password = "openGauss@123"
+
+ compatible_mode="postgresLow"
+ generate_sink_sql = true
+ # You need to configure both database and table
+ database = "opengauss_cdc"
+ schema = "inventory"
+ tablePrefix = "sink_"
+ primary_keys = ["id"]
+ }
+}
+
+```
+
+### 支持自定义主键
+
+```
+source {
+ Opengauss-CDC {
+ result_table_name = "customers_opengauss_cdc"
+ username = "gaussdb"
+ password = "openGauss@123"
+ database-names = ["opengauss_cdc"]
+ schema-names = ["inventory"]
+ table-names = ["opengauss_cdc.inventory.full_types_no_primary_key"]
+ base-url = "jdbc:postgresql://opengauss_cdc_e2e:5432/opengauss_cdc?loggerLevel=OFF"
+ decoding.plugin.name = "pgoutput"
+ exactly_once = true
+ table-names-config = [
+ {
+ table = "opengauss_cdc.inventory.full_types_no_primary_key"
+ primaryKeys = ["id"]
+ }
+ ]
+ }
+}
+```
+
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index a74b9e1223ef..d77b70e5e841 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -131,6 +131,7 @@ seatunnel.source.Milvus = connector-milvus
seatunnel.sink.Milvus = connector-milvus
seatunnel.sink.ActiveMQ = connector-activemq
seatunnel.source.Sls = connector-sls
+seatunnel.source.Opengauss-CDC = connector-cdc-opengauss
seatunnel.transform.Sql = seatunnel-transforms-v2
seatunnel.transform.FieldMapper = seatunnel-transforms-v2
@@ -142,3 +143,4 @@ seatunnel.transform.Split = seatunnel-transforms-v2
seatunnel.transform.Copy = seatunnel-transforms-v2
seatunnel.transform.DynamicCompile = seatunnel-transforms-v2
seatunnel.transform.LLM = seatunnel-transforms-v2
+
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-opengauss/pom.xml b/seatunnel-connectors-v2/connector-cdc/connector-cdc-opengauss/pom.xml
new file mode 100644
index 000000000000..098c60370d41
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-opengauss/pom.xml
@@ -0,0 +1,91 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>connector-cdc</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>connector-cdc-opengauss</artifactId>
+    <name>SeaTunnel : Connectors V2 : CDC : Opengauss</name>
+
+    <properties>
+        <opengauss.version>5.1.0</opengauss.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.opengauss</groupId>
+            <artifactId>opengauss-jdbc</artifactId>
+            <version>${opengauss.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-cdc-postgres</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <phase>package</phase>
+                        <configuration>
+                            <createSourcesJar>false</createSourcesJar>
+                            <shadeSourcesContent>true</shadeSourcesContent>
+                            <shadedArtifactAttached>false</shadedArtifactAttached>
+                            <createDependencyReducedPom>false</createDependencyReducedPom>
+                            <filters>
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                            <relocations>
+                                <relocation>
+                                    <pattern>org.postgresql</pattern>
+                                    <shadedPattern>${seatunnel.shade.package}.org.postgresql</shadedPattern>
+                                </relocation>
+                            </relocations>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-opengauss/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-opengauss/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java
new file mode 100644
index 000000000000..57c393acfaf9
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-opengauss/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java
@@ -0,0 +1,815 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.debezium.connector.postgresql.connection;
+
+import org.apache.kafka.connect.errors.ConnectException;
+
+import org.postgresql.core.BaseConnection;
+import org.postgresql.jdbc.PgConnection;
+import org.postgresql.jdbc.TimestampUtils;
+import org.postgresql.replication.LogSequenceNumber;
+import org.postgresql.util.PGmoney;
+import org.postgresql.util.PSQLState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.debezium.DebeziumException;
+import io.debezium.annotation.VisibleForTesting;
+import io.debezium.config.Configuration;
+import io.debezium.connector.postgresql.PgOid;
+import io.debezium.connector.postgresql.PostgresConnectorConfig;
+import io.debezium.connector.postgresql.PostgresSchema;
+import io.debezium.connector.postgresql.PostgresType;
+import io.debezium.connector.postgresql.PostgresValueConverter;
+import io.debezium.connector.postgresql.TypeRegistry;
+import io.debezium.connector.postgresql.spi.SlotState;
+import io.debezium.data.SpecialValueDecimal;
+import io.debezium.jdbc.JdbcConfiguration;
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.relational.Column;
+import io.debezium.relational.ColumnEditor;
+import io.debezium.relational.Table;
+import io.debezium.relational.TableId;
+import io.debezium.relational.Tables;
+import io.debezium.schema.DatabaseSchema;
+import io.debezium.util.Clock;
+import io.debezium.util.Metronome;
+
+import java.nio.charset.Charset;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Copied from Debezium 1.9.8.Final. {@link JdbcConnection} connection extension used for connecting
+ * to Postgres instances.
+ *
+ *
+ * <p>Line 616 : skip validateServerVersion because openGauss is based on a PostgreSQL version below 9.4
+ */
+public class PostgresConnection extends JdbcConnection {
+
+ public static final String CONNECTION_STREAMING = "Debezium Streaming";
+ public static final String CONNECTION_SLOT_INFO = "Debezium Slot Info";
+ public static final String CONNECTION_DROP_SLOT = "Debezium Drop Slot";
+ public static final String CONNECTION_VALIDATE_CONNECTION = "Debezium Validate Connection";
+ public static final String CONNECTION_HEARTBEAT = "Debezium Heartbeat";
+ public static final String CONNECTION_GENERAL = "Debezium General";
+
+ private static Logger LOGGER = LoggerFactory.getLogger(PostgresConnection.class);
+
+ private static final String URL_PATTERN =
+ "jdbc:postgresql://${"
+ + JdbcConfiguration.HOSTNAME
+ + "}:${"
+ + JdbcConfiguration.PORT
+ + "}/${"
+ + JdbcConfiguration.DATABASE
+ + "}";
+ protected static final ConnectionFactory FACTORY =
+ JdbcConnection.patternBasedFactory(
+ URL_PATTERN,
+ org.postgresql.Driver.class.getName(),
+ PostgresConnection.class.getClassLoader(),
+ JdbcConfiguration.PORT.withDefault(
+ PostgresConnectorConfig.PORT.defaultValueAsString()));
+
+ /**
+ * Obtaining a replication slot may fail if there's a pending transaction. We're retrying to get
+ * a slot for 30 min.
+ */
+ private static final int MAX_ATTEMPTS_FOR_OBTAINING_REPLICATION_SLOT = 900;
+
+ private static final Duration PAUSE_BETWEEN_REPLICATION_SLOT_RETRIEVAL_ATTEMPTS =
+ Duration.ofSeconds(2);
+
+ private final TypeRegistry typeRegistry;
+ private final PostgresDefaultValueConverter defaultValueConverter;
+
+ /**
+ * Creates a Postgres connection using the supplied configuration. If necessary this connection
+ * is able to resolve data type mappings. Such a connection requires a {@link
+ * PostgresValueConverter}, and will provide its own {@link TypeRegistry}. Usually only one such
+ * connection per connector is needed.
+ *
+ * @param config {@link Configuration} instance, may not be null.
+ * @param valueConverterBuilder supplies a configured {@link PostgresValueConverter} for a given
+ * {@link TypeRegistry}
+ * @param connectionUsage a symbolic name of the connection to be tracked in monitoring tools
+ */
+ public PostgresConnection(
+ JdbcConfiguration config,
+ PostgresValueConverterBuilder valueConverterBuilder,
+ String connectionUsage) {
+ super(
+ addDefaultSettings(config, connectionUsage),
+ FACTORY,
+ PostgresConnection::validateServerVersion,
+ null,
+ "\"",
+ "\"");
+
+ if (Objects.isNull(valueConverterBuilder)) {
+ this.typeRegistry = null;
+ this.defaultValueConverter = null;
+ } else {
+ this.typeRegistry = new TypeRegistry(this);
+
+ final PostgresValueConverter valueConverter =
+ valueConverterBuilder.build(this.typeRegistry);
+ this.defaultValueConverter =
+ new PostgresDefaultValueConverter(valueConverter, this.getTimestampUtils());
+ }
+ }
+
+ /**
+ * Create a Postgres connection using the supplied configuration and {@link TypeRegistry}
+ *
+ * @param config {@link Configuration} instance, may not be null.
+ * @param typeRegistry an existing/already-primed {@link TypeRegistry} instance
+ * @param connectionUsage a symbolic name of the connection to be tracked in monitoring tools
+ */
+ public PostgresConnection(
+ PostgresConnectorConfig config, TypeRegistry typeRegistry, String connectionUsage) {
+ super(
+ addDefaultSettings(config.getJdbcConfig(), connectionUsage),
+ FACTORY,
+ PostgresConnection::validateServerVersion,
+ null,
+ "\"",
+ "\"");
+ if (Objects.isNull(typeRegistry)) {
+ this.typeRegistry = null;
+ this.defaultValueConverter = null;
+ } else {
+ this.typeRegistry = typeRegistry;
+ final PostgresValueConverter valueConverter =
+ PostgresValueConverter.of(config, this.getDatabaseCharset(), typeRegistry);
+ this.defaultValueConverter =
+ new PostgresDefaultValueConverter(valueConverter, this.getTimestampUtils());
+ }
+ }
+
+ /**
+ * Creates a Postgres connection using the supplied configuration. The connector is the regular
+ * one without datatype resolution capabilities.
+ *
+ * @param config {@link Configuration} instance, may not be null.
+ * @param connectionUsage a symbolic name of the connection to be tracked in monitoring tools
+ */
+ public PostgresConnection(JdbcConfiguration config, String connectionUsage) {
+ this(config, null, connectionUsage);
+ }
+
+ static JdbcConfiguration addDefaultSettings(
+ JdbcConfiguration configuration, String connectionUsage) {
+ // we require Postgres 9.4 as the minimum server version since that's where logical
+ // replication was first introduced
+ return JdbcConfiguration.adapt(
+ configuration
+ .edit()
+ .with("assumeMinServerVersion", "9.4")
+ .with("ApplicationName", connectionUsage)
+ .build());
+ }
+
+ /**
+ * Returns a JDBC connection string for the current configuration.
+ *
+ * @return a {@code String} where the variables in {@code urlPattern} are replaced with values
+ * from the configuration
+ */
+ public String connectionString() {
+ return connectionString(URL_PATTERN);
+ }
+
+ /**
+ * Prints out information about the REPLICA IDENTITY status of a table. This in turn determines
+ * how much information is available for UPDATE and DELETE operations for logical replication.
+ *
+ * @param tableId the identifier of the table
+ * @return the replica identity information; never null
+ * @throws SQLException if there is a problem obtaining the replica identity information for the
+ * given table
+ */
+ public ServerInfo.ReplicaIdentity readReplicaIdentityInfo(TableId tableId) throws SQLException {
+ String statement =
+ "SELECT relreplident FROM pg_catalog.pg_class c "
+ + "LEFT JOIN pg_catalog.pg_namespace n ON c.relnamespace=n.oid "
+ + "WHERE n.nspname=? and c.relname=?";
+ String schema =
+ tableId.schema() != null && tableId.schema().length() > 0
+ ? tableId.schema()
+ : "public";
+ StringBuilder replIdentity = new StringBuilder();
+ prepareQuery(
+ statement,
+ stmt -> {
+ stmt.setString(1, schema);
+ stmt.setString(2, tableId.table());
+ },
+ rs -> {
+ if (rs.next()) {
+ replIdentity.append(rs.getString(1));
+ } else {
+ LOGGER.warn(
+ "Cannot determine REPLICA IDENTITY information for table '{}'",
+ tableId);
+ }
+ });
+ return ServerInfo.ReplicaIdentity.parseFromDB(replIdentity.toString());
+ }
+
+ /**
+ * Returns the current state of the replication slot
+ *
+ * @param slotName the name of the slot
+ * @param pluginName the name of the plugin used for the desired slot
+ * @return the {@link SlotState} or null, if no slot state is found
+ * @throws SQLException
+ */
+ public SlotState getReplicationSlotState(String slotName, String pluginName)
+ throws SQLException {
+ ServerInfo.ReplicationSlot slot;
+ try {
+ slot = readReplicationSlotInfo(slotName, pluginName);
+ if (slot.equals(ServerInfo.ReplicationSlot.INVALID)) {
+ return null;
+ } else {
+ return slot.asSlotState();
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new ConnectException(
+ "Interrupted while waiting for valid replication slot info", e);
+ }
+ }
+
+ /**
+ * Fetches the state of a replication stage given a slot name and plugin name
+ *
+ * @param slotName the name of the slot
+ * @param pluginName the name of the plugin used for the desired slot
+ * @return the {@link ServerInfo.ReplicationSlot} object or a {@link
+ * ServerInfo.ReplicationSlot#INVALID} if the slot is not valid
+ * @throws SQLException is thrown by the underlying JDBC
+ */
+ private ServerInfo.ReplicationSlot fetchReplicationSlotInfo(String slotName, String pluginName)
+ throws SQLException {
+ final String database = database();
+ final ServerInfo.ReplicationSlot slot =
+ queryForSlot(
+ slotName,
+ database,
+ pluginName,
+ rs -> {
+ if (rs.next()) {
+ boolean active = rs.getBoolean("active");
+ final Lsn confirmedFlushedLsn =
+ parseConfirmedFlushLsn(slotName, pluginName, database, rs);
+ if (confirmedFlushedLsn == null) {
+ return null;
+ }
+ Lsn restartLsn =
+ parseRestartLsn(slotName, pluginName, database, rs);
+ if (restartLsn == null) {
+ return null;
+ }
+ final Long xmin = rs.getLong("catalog_xmin");
+ return new ServerInfo.ReplicationSlot(
+ active, confirmedFlushedLsn, restartLsn, xmin);
+ } else {
+ LOGGER.debug(
+ "No replication slot '{}' is present for plugin '{}' and database '{}'",
+ slotName,
+ pluginName,
+ database);
+ return ServerInfo.ReplicationSlot.INVALID;
+ }
+ });
+ return slot;
+ }
+
+ /**
+ * Fetches a replication slot, repeating the query until either the slot is created or until the
+ * max number of attempts has been reached
+ *
+ *
+ * <p>To fetch the slot without the retries, use the {@link
+ * PostgresConnection#fetchReplicationSlotInfo} call
+ *
+ * @param slotName the slot name
+ * @param pluginName the name of the plugin
+ * @return the {@link ServerInfo.ReplicationSlot} object or a {@link
+ * ServerInfo.ReplicationSlot#INVALID} if the slot is not valid
+ * @throws SQLException is thrown by the underlying JDBC driver
+ * @throws InterruptedException is thrown if we don't return an answer within the set number of
+ * retries
+ */
+ @VisibleForTesting
+ ServerInfo.ReplicationSlot readReplicationSlotInfo(String slotName, String pluginName)
+ throws SQLException, InterruptedException {
+ final String database = database();
+ final Metronome metronome =
+ Metronome.parker(PAUSE_BETWEEN_REPLICATION_SLOT_RETRIEVAL_ATTEMPTS, Clock.SYSTEM);
+
+ for (int attempt = 1; attempt <= MAX_ATTEMPTS_FOR_OBTAINING_REPLICATION_SLOT; attempt++) {
+ final ServerInfo.ReplicationSlot slot = fetchReplicationSlotInfo(slotName, pluginName);
+ if (slot != null) {
+ LOGGER.info("Obtained valid replication slot {}", slot);
+ return slot;
+ }
+ LOGGER.warn(
+ "Cannot obtain valid replication slot '{}' for plugin '{}' and database '{}' [during attempt {} out of {}, concurrent tx probably blocks taking snapshot.",
+ slotName,
+ pluginName,
+ database,
+ attempt,
+ MAX_ATTEMPTS_FOR_OBTAINING_REPLICATION_SLOT);
+ metronome.pause();
+ }
+
+ throw new ConnectException(
+ "Unable to obtain valid replication slot. "
+ + "Make sure there are no long-running transactions running in parallel as they may hinder the allocation of the replication slot when starting this connector");
+ }
+
+ protected ServerInfo.ReplicationSlot queryForSlot(
+ String slotName,
+ String database,
+ String pluginName,
+ ResultSetMapper<ServerInfo.ReplicationSlot> map)
+ throws SQLException {
+ return prepareQueryAndMap(
+ "select * from pg_replication_slots where slot_name = ? and database = ? and plugin = ?",
+ statement -> {
+ statement.setString(1, slotName);
+ statement.setString(2, database);
+ statement.setString(3, pluginName);
+ },
+ map);
+ }
+
+ /**
+ * Obtains the LSN to resume streaming from. On PG 9.5 there is no confirmed_flushed_lsn yet, so
+ * restart_lsn will be read instead. This may result in more records to be re-read after a
+ * restart.
+ */
+ private Lsn parseConfirmedFlushLsn(
+ String slotName, String pluginName, String database, ResultSet rs) {
+ Lsn confirmedFlushedLsn = null;
+
+ try {
+ confirmedFlushedLsn =
+ tryParseLsn(slotName, pluginName, database, rs, "confirmed_flush_lsn");
+ } catch (SQLException e) {
+ LOGGER.info("unable to find confirmed_flushed_lsn, falling back to restart_lsn");
+ try {
+ confirmedFlushedLsn =
+ tryParseLsn(slotName, pluginName, database, rs, "restart_lsn");
+ } catch (SQLException e2) {
+ throw new ConnectException(
+ "Neither confirmed_flush_lsn nor restart_lsn could be found");
+ }
+ }
+
+ return confirmedFlushedLsn;
+ }
+
+ private Lsn parseRestartLsn(String slotName, String pluginName, String database, ResultSet rs) {
+ Lsn restartLsn = null;
+ try {
+ restartLsn = tryParseLsn(slotName, pluginName, database, rs, "restart_lsn");
+ } catch (SQLException e) {
+ throw new ConnectException("restart_lsn could be found");
+ }
+
+ return restartLsn;
+ }
+
+ private Lsn tryParseLsn(
+ String slotName, String pluginName, String database, ResultSet rs, String column)
+ throws ConnectException, SQLException {
+ Lsn lsn = null;
+
+ String lsnStr = rs.getString(column);
+ if (lsnStr == null) {
+ return null;
+ }
+ try {
+ lsn = Lsn.valueOf(lsnStr);
+ } catch (Exception e) {
+ throw new ConnectException(
+ "Value "
+ + column
+ + " in the pg_replication_slots table for slot = '"
+ + slotName
+ + "', plugin = '"
+ + pluginName
+ + "', database = '"
+ + database
+ + "' is not valid. This is an abnormal situation and the database status should be checked.");
+ }
+ if (!lsn.isValid()) {
+ throw new ConnectException("Invalid LSN returned from database");
+ }
+ return lsn;
+ }
+
+ /**
+ * Drops a replication slot that was created on the DB
+ *
+ * @param slotName the name of the replication slot, may not be null
+ * @return {@code true} if the slot was dropped, {@code false} otherwise
+ */
+ public boolean dropReplicationSlot(String slotName) {
+ final int ATTEMPTS = 3;
+ for (int i = 0; i < ATTEMPTS; i++) {
+ try {
+ execute("select pg_drop_replication_slot('" + slotName + "')");
+ return true;
+ } catch (SQLException e) {
+ // slot is active
+ if (PSQLState.OBJECT_IN_USE.getState().equals(e.getSQLState())) {
+ if (i < ATTEMPTS - 1) {
+ LOGGER.debug(
+ "Cannot drop replication slot '{}' because it's still in use",
+ slotName);
+ } else {
+ LOGGER.warn(
+ "Cannot drop replication slot '{}' because it's still in use",
+ slotName);
+ return false;
+ }
+ } else if (PSQLState.UNDEFINED_OBJECT.getState().equals(e.getSQLState())) {
+ LOGGER.debug("Replication slot {} has already been dropped", slotName);
+ return false;
+ } else {
+ LOGGER.error("Unexpected error while attempting to drop replication slot", e);
+ return false;
+ }
+ }
+ try {
+ Metronome.parker(Duration.ofSeconds(1), Clock.system()).pause();
+ } catch (InterruptedException e) {
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Drops the debezium publication that was created.
+ *
+ * @param publicationName the publication name, may not be null
+ * @return {@code true} if the publication was dropped, {@code false} otherwise
+ */
+ public boolean dropPublication(String publicationName) {
+ try {
+ LOGGER.debug("Dropping publication '{}'", publicationName);
+ execute("DROP PUBLICATION " + publicationName);
+ return true;
+ } catch (SQLException e) {
+ if (PSQLState.UNDEFINED_OBJECT.getState().equals(e.getSQLState())) {
+ LOGGER.debug("Publication {} has already been dropped", publicationName);
+ } else {
+ LOGGER.error("Unexpected error while attempting to drop publication", e);
+ }
+ return false;
+ }
+ }
+
+ @Override
+ public synchronized void close() {
+ try {
+ super.close();
+ } catch (SQLException e) {
+ LOGGER.error("Unexpected error while closing Postgres connection", e);
+ }
+ }
+
+ /**
+ * Returns the PG id of the current active transaction
+ *
+ * @return a PG transaction identifier, or null if no tx is active
+ * @throws SQLException if anything fails.
+ */
+ public Long currentTransactionId() throws SQLException {
+ AtomicLong txId = new AtomicLong(0);
+ query(
+ "select (case pg_is_in_recovery() when 't' then 0 else txid_current() end) AS pg_current_txid",
+ rs -> {
+ if (rs.next()) {
+ txId.compareAndSet(0, rs.getLong(1));
+ }
+ });
+ long value = txId.get();
+ return value > 0 ? value : null;
+ }
+
+ /**
+ * Returns the current position in the server tx log.
+ *
+ * @return a long value, never negative
+ * @throws SQLException if anything unexpected fails.
+ */
+ public long currentXLogLocation() throws SQLException {
+ AtomicLong result = new AtomicLong(0);
+ int majorVersion = connection().getMetaData().getDatabaseMajorVersion();
+ query(
+ majorVersion >= 10
+ ? "select (case pg_is_in_recovery() when 't' then pg_last_wal_receive_lsn() else pg_current_wal_lsn() end) AS pg_current_wal_lsn"
+ : "select * from pg_current_xlog_location()",
+ rs -> {
+ if (!rs.next()) {
+ throw new IllegalStateException(
+ "there should always be a valid xlog position");
+ }
+ result.compareAndSet(0, LogSequenceNumber.valueOf(rs.getString(1)).asLong());
+ });
+ return result.get();
+ }
+
+ /**
+ * Returns information about the PG server to which this instance is connected.
+ *
+ * @return a {@link ServerInfo} instance, never {@code null}
+ * @throws SQLException if anything fails
+ */
+ public ServerInfo serverInfo() throws SQLException {
+ ServerInfo serverInfo = new ServerInfo();
+ query(
+ "SELECT version(), current_user, current_database()",
+ rs -> {
+ if (rs.next()) {
+ serverInfo
+ .withServer(rs.getString(1))
+ .withUsername(rs.getString(2))
+ .withDatabase(rs.getString(3));
+ }
+ });
+ String username = serverInfo.username();
+ if (username != null) {
+ query(
+ "SELECT oid, rolname, rolsuper, rolinherit, rolcreaterole, rolcreatedb, rolcanlogin, rolreplication FROM pg_roles "
+ + "WHERE pg_has_role('"
+ + username
+ + "', oid, 'member')",
+ rs -> {
+ while (rs.next()) {
+ String roleInfo =
+ "superuser: "
+ + rs.getBoolean(3)
+ + ", replication: "
+ + rs.getBoolean(8)
+ + ", inherit: "
+ + rs.getBoolean(4)
+ + ", create role: "
+ + rs.getBoolean(5)
+ + ", create db: "
+ + rs.getBoolean(6)
+ + ", can log in: "
+ + rs.getBoolean(7);
+ String roleName = rs.getString(2);
+ serverInfo.addRole(roleName, roleInfo);
+ }
+ });
+ }
+ return serverInfo;
+ }
+
+ public Charset getDatabaseCharset() {
+ try {
+ return Charset.forName(((BaseConnection) connection()).getEncoding().name());
+ } catch (SQLException e) {
+ throw new DebeziumException("Couldn't obtain encoding for database " + database(), e);
+ }
+ }
+
+ public TimestampUtils getTimestampUtils() {
+ try {
+ return ((PgConnection) this.connection()).getTimestampUtils();
+ } catch (SQLException e) {
+ throw new DebeziumException(
+ "Couldn't get timestamp utils from underlying connection", e);
+ }
+ }
+
+ private static void validateServerVersion(Statement statement) throws SQLException {}
+
+ @Override
+ public String quotedColumnIdString(String columnName) {
+ if (columnName.contains("\"")) {
+ columnName = columnName.replaceAll("\"", "\"\"");
+ }
+
+ return super.quotedColumnIdString(columnName);
+ }
+
+ @Override
+ protected int resolveNativeType(String typeName) {
+ return getTypeRegistry().get(typeName).getRootType().getOid();
+ }
+
+ @Override
+ protected int resolveJdbcType(int metadataJdbcType, int nativeType) {
+ // Special care needs to be taken for columns that use user-defined domain type data types
+ // where resolution of the column's JDBC type needs to be that of the root type instead of
+ // the actual column to properly influence schema building and value conversion.
+ return getTypeRegistry().get(nativeType).getRootType().getJdbcId();
+ }
+
+ @Override
+ protected Optional<ColumnEditor> readTableColumn(
+ ResultSet columnMetadata, TableId tableId, Tables.ColumnNameFilter columnFilter)
+ throws SQLException {
+ return doReadTableColumn(columnMetadata, tableId, columnFilter);
+ }
+
+ public Optional<Column> readColumnForDecoder(
+ ResultSet columnMetadata, TableId tableId, Tables.ColumnNameFilter columnNameFilter)
+ throws SQLException {
+ return doReadTableColumn(columnMetadata, tableId, columnNameFilter)
+ .map(ColumnEditor::create);
+ }
+
+ private Optional<ColumnEditor> doReadTableColumn(
+ ResultSet columnMetadata, TableId tableId, Tables.ColumnNameFilter columnFilter)
+ throws SQLException {
+ final String columnName = columnMetadata.getString(4);
+ if (columnFilter == null
+ || columnFilter.matches(
+ tableId.catalog(), tableId.schema(), tableId.table(), columnName)) {
+ final ColumnEditor column = Column.editor().name(columnName);
+ column.type(columnMetadata.getString(6));
+
+ // first source the length/scale from the column metadata provided by the driver
+ // this may be overridden below if the column type is a user-defined domain type
+ column.length(columnMetadata.getInt(7));
+ if (columnMetadata.getObject(9) != null) {
+ column.scale(columnMetadata.getInt(9));
+ }
+
+ column.optional(isNullable(columnMetadata.getInt(11)));
+ column.position(columnMetadata.getInt(17));
+ column.autoIncremented("YES".equalsIgnoreCase(columnMetadata.getString(23)));
+
+ String autogenerated = null;
+ try {
+ autogenerated = columnMetadata.getString(24);
+ } catch (SQLException e) {
+ // ignore, some drivers don't have this index - e.g. Postgres
+ }
+ column.generated("YES".equalsIgnoreCase(autogenerated));
+
+ // Lookup the column type from the TypeRegistry
+ // For all types, we need to set the Native and Jdbc types by using the root-type
+ final PostgresType nativeType = getTypeRegistry().get(column.typeName());
+ column.nativeType(nativeType.getRootType().getOid());
+ column.jdbcType(nativeType.getRootType().getJdbcId());
+
+ // For domain types, the postgres driver is unable to traverse a nested unbounded
+ // hierarchy of types and report the right length/scale of a given type. We use
+ // the TypeRegistry to accomplish this since it is capable of traversing the type
+ // hierarchy upward to resolve length/scale regardless of hierarchy depth.
+ if (TypeRegistry.DOMAIN_TYPE == nativeType.getJdbcId()) {
+ column.length(nativeType.getDefaultLength());
+ column.scale(nativeType.getDefaultScale());
+ }
+
+ final String defaultValueExpression = columnMetadata.getString(13);
+ if (defaultValueExpression != null
+ && getDefaultValueConverter().supportConversion(column.typeName())) {
+ column.defaultValueExpression(defaultValueExpression);
+ }
+
+ return Optional.of(column);
+ }
+
+ return Optional.empty();
+ }
+
+ public PostgresDefaultValueConverter getDefaultValueConverter() {
+ Objects.requireNonNull(
+ defaultValueConverter, "Connection does not provide default value converter");
+ return defaultValueConverter;
+ }
+
+ public TypeRegistry getTypeRegistry() {
+ Objects.requireNonNull(typeRegistry, "Connection does not provide type registry");
+ return typeRegistry;
+ }
+
+ @Override
+ public <T extends DatabaseSchema<TableId>> Object getColumnValue(
+ ResultSet rs, int columnIndex, Column column, Table table, T schema)
+ throws SQLException {
+ try {
+ final ResultSetMetaData metaData = rs.getMetaData();
+ final String columnTypeName = metaData.getColumnTypeName(columnIndex);
+ final PostgresType type =
+ ((PostgresSchema) schema).getTypeRegistry().get(columnTypeName);
+
+ LOGGER.trace("Type of incoming data is: {}", type.getOid());
+ LOGGER.trace("ColumnTypeName is: {}", columnTypeName);
+ LOGGER.trace("Type is: {}", type);
+
+ if (type.isArrayType()) {
+ return rs.getArray(columnIndex);
+ }
+
+ switch (type.getOid()) {
+ case PgOid.MONEY:
+ // TODO author=Horia Chiorean date=14/11/2016 description=workaround for
+ // https://github.com/pgjdbc/pgjdbc/issues/100
+ final String sMoney = rs.getString(columnIndex);
+ if (sMoney == null) {
+ return sMoney;
+ }
+ if (sMoney.startsWith("-")) {
+ // PGmoney expects negative values to be provided in the format of
+ // "($XXXXX.YY)"
+ final String negativeMoney = "(" + sMoney.substring(1) + ")";
+ return new PGmoney(negativeMoney).val;
+ }
+ return new PGmoney(sMoney).val;
+ case PgOid.BIT:
+ return rs.getString(columnIndex);
+ case PgOid.NUMERIC:
+ final String s = rs.getString(columnIndex);
+ if (s == null) {
+ return s;
+ }
+
+ Optional<SpecialValueDecimal> value = PostgresValueConverter.toSpecialValue(s);
+ return value.isPresent()
+ ? value.get()
+ : new SpecialValueDecimal(rs.getBigDecimal(columnIndex));
+ case PgOid.TIME:
+ // To handle time 24:00:00 supported by TIME columns, read the column as a
+ // string.
+ case PgOid.TIMETZ:
+ // In order to guarantee that we resolve TIMETZ columns with proper microsecond
+ // precision,
+ // read the column as a string instead and then re-parse inside the converter.
+ return rs.getString(columnIndex);
+ default:
+ Object x = rs.getObject(columnIndex);
+ if (x != null) {
+ LOGGER.trace(
+ "rs getobject returns class: {}; rs getObject value is: {}",
+ x.getClass(),
+ x);
+ }
+ return x;
+ }
+ } catch (SQLException e) {
+ // not a known type
+ return super.getColumnValue(rs, columnIndex, column, table, schema);
+ }
+ }
+
+ @Override
+ protected String[] supportedTableTypes() {
+ return new String[] {"VIEW", "MATERIALIZED VIEW", "TABLE", "PARTITIONED TABLE"};
+ }
+
+ @Override
+ protected boolean isTableType(String tableType) {
+ return "TABLE".equals(tableType) || "PARTITIONED TABLE".equals(tableType);
+ }
+
+ /**
+ * Retrieves all {@code TableId}s in a given database catalog, including partitioned tables.
+ *
+ * @param catalogName the catalog/database name
+ * @return set of all table ids for existing table objects
+ * @throws SQLException if a database exception occurred
+ */
+ public Set<TableId> getAllTableIds(String catalogName) throws SQLException {
+ return readTableNames(catalogName, null, null, new String[] {"TABLE", "PARTITIONED TABLE"});
+ }
+
+ @FunctionalInterface
+ public interface PostgresValueConverterBuilder {
+ PostgresValueConverter build(TypeRegistry registry);
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-opengauss/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-opengauss/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java
new file mode 100644
index 000000000000..c69a63c7136a
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-opengauss/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java
@@ -0,0 +1,928 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.debezium.connector.postgresql.connection;
+
+import org.apache.kafka.connect.errors.ConnectException;
+
+import org.postgresql.core.BaseConnection;
+import org.postgresql.core.ServerVersion;
+import org.postgresql.replication.PGReplicationStream;
+import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;
+import org.postgresql.util.PSQLException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.debezium.DebeziumException;
+import io.debezium.connector.postgresql.PostgresConnectorConfig;
+import io.debezium.connector.postgresql.PostgresSchema;
+import io.debezium.connector.postgresql.TypeRegistry;
+import io.debezium.connector.postgresql.spi.SlotCreationResult;
+import io.debezium.jdbc.JdbcConfiguration;
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.jdbc.JdbcConnectionException;
+import io.debezium.relational.RelationalTableFilters;
+import io.debezium.relational.TableId;
+import io.debezium.util.Clock;
+import io.debezium.util.Metronome;
+
+import java.nio.ByteBuffer;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.SQLWarning;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static java.lang.Math.toIntExact;
+
+/**
+ * Copied from Debezium 1.9.8.Final. Implementation of a {@link ReplicationConnection} for
+ * Postgresql. Note that replication connections in PG cannot execute regular statements but only a
+ * limited number of replication-related commands.
+ *
+ *
+ * <p>Line 179 : Modify the method named initPublication to use the regular - i.e. not a
+ * replication - connection to avoid the I/O error
+ *
+ *
+ * <p>Line 440: Modify the method named createReplicationSlot to add logic that creates the slot
+ * if it doesn't exist
+ */
+public class PostgresReplicationConnection extends JdbcConnection implements ReplicationConnection {
+
+ private static Logger LOGGER = LoggerFactory.getLogger(PostgresReplicationConnection.class);
+
+ private final String slotName;
+ private final String publicationName;
+ private final RelationalTableFilters tableFilter;
+ private final PostgresConnectorConfig.AutoCreateMode publicationAutocreateMode;
+ private final PostgresConnectorConfig.LogicalDecoder plugin;
+ private final boolean dropSlotOnClose;
+ private final PostgresConnectorConfig connectorConfig;
+ private final Duration statusUpdateInterval;
+ private final MessageDecoder messageDecoder;
+ private final PostgresConnection jdbcConnection;
+ private final TypeRegistry typeRegistry;
+ private final Properties streamParams;
+
+ private Lsn defaultStartingPos;
+ private SlotCreationResult slotCreationInfo;
+ private boolean hasInitedSlot;
+
+ /**
+ * Creates a new replication connection with the given params.
+ *
+ * @param config the JDBC configuration for the connection; may not be null
+ * @param slotName the name of the DB slot for logical replication; may not be null
+ * @param publicationName the name of the DB publication for logical replication; may not be
+ * null
+ * @param tableFilter the tables to watch of the DB publication for logical replication; may not
+ * be null
+ * @param publicationAutocreateMode the mode for publication autocreation; may not be null
+ * @param plugin decoder matching the server side plug-in used for streaming changes; may not be
+ * null
+ * @param dropSlotOnClose whether the replication slot should be dropped once the connection is
+ * closed
+ * @param statusUpdateInterval the interval at which the replication connection should
+ * periodically send status updates to the server
+ * @param doSnapshot whether the connector is doing snapshot
+ * @param jdbcConnection general PostgreSQL JDBC connection
+ * @param typeRegistry registry with PostgreSQL types
+ * @param streamParams additional parameters to pass to the replication stream
+ * @param schema the schema; must not be null
+ */
+ private PostgresReplicationConnection(
+ PostgresConnectorConfig config,
+ String slotName,
+ String publicationName,
+ RelationalTableFilters tableFilter,
+ PostgresConnectorConfig.AutoCreateMode publicationAutocreateMode,
+ PostgresConnectorConfig.LogicalDecoder plugin,
+ boolean dropSlotOnClose,
+ boolean doSnapshot,
+ Duration statusUpdateInterval,
+ PostgresConnection jdbcConnection,
+ TypeRegistry typeRegistry,
+ Properties streamParams,
+ PostgresSchema schema) {
+ super(
+ addDefaultSettings(config.getJdbcConfig()),
+ PostgresConnection.FACTORY,
+ null,
+ null,
+ "\"",
+ "\"");
+
+ this.connectorConfig = config;
+ this.slotName = slotName;
+ this.publicationName = publicationName;
+ this.tableFilter = tableFilter;
+ this.publicationAutocreateMode = publicationAutocreateMode;
+ this.plugin = plugin;
+ this.dropSlotOnClose = dropSlotOnClose;
+ this.statusUpdateInterval = statusUpdateInterval;
+ this.messageDecoder =
+ plugin.messageDecoder(new MessageDecoderContext(config, schema), jdbcConnection);
+ this.jdbcConnection = jdbcConnection;
+ this.typeRegistry = typeRegistry;
+ this.streamParams = streamParams;
+ this.slotCreationInfo = null;
+ this.hasInitedSlot = false;
+ }
+
+ private static JdbcConfiguration addDefaultSettings(JdbcConfiguration configuration) {
+ // first copy the parent's default settings...
+ // then set some additional replication specific settings
+ return JdbcConfiguration.adapt(
+ PostgresConnection.addDefaultSettings(
+ configuration, PostgresConnection.CONNECTION_STREAMING)
+ .edit()
+ .with("replication", "database")
+ .with(
+ "preferQueryMode",
+ "simple") // replication protocol only supports simple query mode
+ .build());
+ }
+
+ private ServerInfo.ReplicationSlot getSlotInfo() throws SQLException, InterruptedException {
+ try (PostgresConnection connection =
+ new PostgresConnection(
+ connectorConfig.getJdbcConfig(), PostgresConnection.CONNECTION_SLOT_INFO)) {
+ return connection.readReplicationSlotInfo(slotName, plugin.getPostgresPluginName());
+ }
+ }
+
+ protected void initPublication() {
+ String tableFilterString = null;
+ if (PostgresConnectorConfig.LogicalDecoder.PGOUTPUT.equals(plugin)) {
+ LOGGER.info("Initializing PgOutput logical decoder publication");
+ try {
+ PostgresConnection conn = jdbcConnection;
+ // Unless the autocommit is disabled the SELECT publication query will stay running
+ conn.setAutoCommit(false);
+
+ String selectPublication =
+ String.format(
+ "SELECT COUNT(1) FROM pg_publication WHERE pubname = '%s'",
+ publicationName);
+ conn.query(
+ selectPublication,
+ rs -> {
+ if (rs.next()) {
+ Long count = rs.getLong(1);
+ // Close eagerly as the transaction might stay running
+ if (count == 0L) {
+ LOGGER.info(
+ "Creating new publication '{}' for plugin '{}'",
+ publicationName,
+ plugin);
+ switch (publicationAutocreateMode) {
+ case DISABLED:
+ throw new ConnectException(
+ "Publication autocreation is disabled, please create one and restart the connector.");
+ case ALL_TABLES:
+ String createPublicationStmt =
+ String.format(
+ "CREATE PUBLICATION %s FOR ALL TABLES;",
+ publicationName);
+ LOGGER.info(
+ "Creating Publication with statement '{}'",
+ createPublicationStmt);
+ // Publication doesn't exist, create it.
+ conn.executeWithoutCommitting(createPublicationStmt);
+ break;
+ case FILTERED:
+ createOrUpdatePublicationModeFilterted(
+ tableFilterString, conn, false);
+ break;
+ }
+ } else {
+ switch (publicationAutocreateMode) {
+ case FILTERED:
+ createOrUpdatePublicationModeFilterted(
+ tableFilterString, conn, true);
+ break;
+ default:
+ LOGGER.trace(
+ "A logical publication named '{}' for plugin '{}' and database '{}' is already active on the server "
+ + "and will be used by the plugin",
+ publicationName,
+ plugin,
+ database());
+ }
+ }
+ }
+ });
+ conn.commit();
+ conn.setAutoCommit(true);
+ } catch (SQLException e) {
+ throw new JdbcConnectionException(e);
+ }
+ }
+ }
+
+ private void createOrUpdatePublicationModeFilterted(
+ String tableFilterString, PostgresConnection conn, boolean isUpdate) {
+ String createOrUpdatePublicationStmt;
+ try {
+ Set<TableId> tablesToCapture = determineCapturedTables();
+ tableFilterString =
+ tablesToCapture.stream()
+ .map(TableId::toDoubleQuotedString)
+ .collect(Collectors.joining(", "));
+ if (tableFilterString.isEmpty()) {
+ throw new DebeziumException(
+ String.format(
+ "No table filters found for filtered publication %s",
+ publicationName));
+ }
+ createOrUpdatePublicationStmt =
+ isUpdate
+ ? String.format(
+ "ALTER PUBLICATION %s SET TABLE %s;",
+ publicationName, tableFilterString)
+ : String.format(
+ "CREATE PUBLICATION %s FOR TABLE %s;",
+ publicationName, tableFilterString);
+ LOGGER.info(
+ isUpdate
+ ? "Updating Publication with statement '{}'"
+ : "Creating Publication with statement '{}'",
+ createOrUpdatePublicationStmt);
+ conn.execute(createOrUpdatePublicationStmt);
+ } catch (Exception e) {
+ throw new ConnectException(
+ String.format(
+ "Unable to %s filtered publication %s for %s",
+ isUpdate ? "update" : "create", publicationName, tableFilterString),
+ e);
+ }
+ }
+
+ private Set<TableId> determineCapturedTables() throws Exception {
+ Set<TableId> allTableIds = jdbcConnection.getAllTableIds(connectorConfig.databaseName());
+
+ Set<TableId> capturedTables = new HashSet<>();
+
+ for (TableId tableId : allTableIds) {
+ if (tableFilter.dataCollectionFilter().isIncluded(tableId)) {
+ LOGGER.trace("Adding table {} to the list of captured tables", tableId);
+ capturedTables.add(tableId);
+ } else {
+ LOGGER.trace(
+ "Ignoring table {} as it's not included in the filter configuration",
+ tableId);
+ }
+ }
+
+ return capturedTables.stream()
+ .sorted()
+ .collect(Collectors.toCollection(LinkedHashSet::new));
+ }
+
+ protected void initReplicationSlot() throws SQLException, InterruptedException {
+ ServerInfo.ReplicationSlot slotInfo = getSlotInfo();
+
+ boolean shouldCreateSlot = ServerInfo.ReplicationSlot.INVALID == slotInfo;
+ try {
+ // there's no info for this plugin and slot so create a new slot
+ if (shouldCreateSlot) {
+ this.createReplicationSlot();
+ }
+
+ // replication connection does not support parsing of SQL statements so we need to
+ // create
+ // the connection without executing on connect statements - see JDBC opt
+ // preferQueryMode=simple
+ pgConnection();
+ final String identifySystemStatement = "IDENTIFY_SYSTEM";
+ LOGGER.debug(
+ "running '{}' to validate replication connection", identifySystemStatement);
+ final Lsn xlogStart =
+ queryAndMap(
+ identifySystemStatement,
+ rs -> {
+ if (!rs.next()) {
+ throw new IllegalStateException(
+ "The DB connection is not a valid replication connection");
+ }
+ String xlogpos = rs.getString("xlogpos");
+ LOGGER.debug("received latest xlogpos '{}'", xlogpos);
+ return Lsn.valueOf(xlogpos);
+ });
+
+ if (slotCreationInfo != null) {
+ this.defaultStartingPos = slotCreationInfo.startLsn();
+ } else if (shouldCreateSlot || !slotInfo.hasValidFlushedLsn()) {
+ // this is a new slot or we weren't able to read a valid flush LSN pos, so we always
+ // start from the xlog pos that was reported
+ this.defaultStartingPos = xlogStart;
+ } else {
+ Lsn latestFlushedLsn = slotInfo.latestFlushedLsn();
+ this.defaultStartingPos =
+ latestFlushedLsn.compareTo(xlogStart) < 0 ? latestFlushedLsn : xlogStart;
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("found previous flushed LSN '{}'", latestFlushedLsn);
+ }
+ }
+ hasInitedSlot = true;
+ } catch (SQLException e) {
+ throw new JdbcConnectionException(e);
+ }
+ }
+
+ // Temporary replication slots are a new feature of PostgreSQL 10
+ private boolean useTemporarySlot() throws SQLException {
+ // Temporary replication slots cannot be used due to connection restart
+ // when finding WAL position
+ // return dropSlotOnClose && pgConnection().haveMinimumServerVersion(ServerVersion.v10);
+ return false;
+ }
+
+ /**
+ * creating a replication connection and starting to stream involves a few steps: 1. we create
+ * the connection and ensure that a. the slot exists b. the slot isn't currently being used 2.
+ * we query to get our potential start position in the slot (lsn) 3. we try and start streaming,
+ * depending on our options (such as in wal2json) this may fail, which can result in the
+ * connection being killed and we need to start the process over if we are using a temporary
+ * slot 4. actually start the streamer
+ *
+ *
+ * This method takes care of all of these and this method queries for a default starting
+ * position. If you know where you are starting from you should call {@link #startStreaming(Lsn,
+ * WalPositionLocator)}; this method delegates to that method
+ *
+ * @return
+ * @throws SQLException
+ * @throws InterruptedException
+ */
+ @Override
+ public ReplicationStream startStreaming(WalPositionLocator walPosition)
+ throws SQLException, InterruptedException {
+ return startStreaming(null, walPosition);
+ }
+
+ @Override
+ public ReplicationStream startStreaming(Lsn offset, WalPositionLocator walPosition)
+ throws SQLException, InterruptedException {
+ initConnection();
+
+ connect();
+ if (offset == null || !offset.isValid()) {
+ offset = defaultStartingPos;
+ }
+ Lsn lsn = offset;
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("starting streaming from LSN '{}'", lsn);
+ }
+
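+ // Retry stream creation up to maxRetries times, pausing retryDelay between attempts before giving up.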
+ final int maxRetries = connectorConfig.maxRetries();
+ final Duration delay = connectorConfig.retryDelay();
+ int tryCount = 0;
+ while (true) {
+ try {
+ return createReplicationStream(lsn, walPosition);
+ } catch (Exception e) {
+ String message = "Failed to start replication stream at " + lsn;
+ if (++tryCount > maxRetries) {
+ if (e.getMessage().matches(".*replication slot .* is active.*")) {
+ message +=
+ "; when setting up multiple connectors for the same database host, please make sure to use a distinct replication slot name for each.";
+ }
+ throw new DebeziumException(message, e);
+ } else {
+ LOGGER.warn(
+ message + ", waiting for {} ms and retrying, attempt number {} over {}",
+ delay,
+ tryCount,
+ maxRetries);
+ final Metronome metronome = Metronome.sleeper(delay, Clock.SYSTEM);
+ metronome.pause();
+ }
+ }
+ }
+ }
+
+ @Override
+ public void initConnection() throws SQLException, InterruptedException {
+ // See https://www.postgresql.org/docs/current/logical-replication-quick-setup.html
+ // For pgoutput specifically, the publication must be created before the slot.
+ initPublication();
+ if (!hasInitedSlot) {
+ initReplicationSlot();
+ }
+ }
+
+ @Override
+ public Optional<SlotCreationResult> createReplicationSlot() throws SQLException {
+ // note that some of these options are only supported in Postgres 9.4+, additionally
+ // the options are not yet exported by the jdbc api wrapper, therefore, we just do
+ // this ourselves but eventually this should be moved back to the jdbc API
+ // see https://github.com/pgjdbc/pgjdbc/issues/1305
+ ServerInfo.ReplicationSlot slotInfo;
+ try {
+ slotInfo = getSlotInfo();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+
+ boolean shouldCreateSlot = ServerInfo.ReplicationSlot.INVALID == slotInfo;
+
+ if (shouldCreateSlot) {
+ LOGGER.debug("Creating new replication slot '{}' for plugin '{}'", slotName, plugin);
+ String tempPart = "";
+ // Exported snapshots are supported in Postgres 9.4+
+ boolean canExportSnapshot = pgConnection().haveMinimumServerVersion(ServerVersion.v9_4);
+ if ((dropSlotOnClose) && !canExportSnapshot) {
+ LOGGER.warn(
+ "A slot marked as temporary or with an exported snapshot was created, "
+ + "but not on a supported version of Postgres, ignoring!");
+ }
+ if (useTemporarySlot()) {
+ tempPart = "TEMPORARY";
+ }
+
+ // See https://www.postgresql.org/docs/current/logical-replication-quick-setup.html
+ // For pgoutput specifically, the publication must be created prior to the slot.
+ initPublication();
+
+ try (Statement stmt = pgConnection().createStatement()) {
+ String createCommand =
+ String.format(
+ "CREATE_REPLICATION_SLOT \"%s\" %s LOGICAL %s",
+ slotName, tempPart, plugin.getPostgresPluginName());
+ LOGGER.info("Creating replication slot with command {}", createCommand);
+ stmt.execute(createCommand);
+ // when we are in Postgres 9.4+, we can parse the slot creation info,
+ // otherwise, it returns nothing
+ if (canExportSnapshot) {
+ this.slotCreationInfo = parseSlotCreation(stmt.getResultSet());
+ }
+ }
+ }
+ return Optional.ofNullable(slotCreationInfo);
+ }
+
+ protected BaseConnection pgConnection() throws SQLException {
+ return (BaseConnection) connection(false);
+ }
+
+ private SlotCreationResult parseSlotCreation(ResultSet rs) {
+ try {
+ if (rs.next()) {
+ String slotName = rs.getString("slot_name");
+ String startPoint = rs.getString("consistent_point");
+ String snapName = rs.getString("snapshot_name");
+ String pluginName = rs.getString("output_plugin");
+
+ return new SlotCreationResult(slotName, startPoint, snapName, pluginName);
+ } else {
+ throw new ConnectException("No replication slot found");
+ }
+ } catch (SQLException ex) {
+ throw new ConnectException("Unable to parse create_replication_slot response", ex);
+ }
+ }
+
+ private ReplicationStream createReplicationStream(
+ final Lsn startLsn, WalPositionLocator walPosition)
+ throws SQLException, InterruptedException {
+ PGReplicationStream s;
+
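+ // First attempt starts the stream with message metadata enabled (unless the plugin forces RDS mode);
+ // if the server rejects the options, the outer catch below falls back to a metadata-free stream.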
+ try {
+ try {
+ s =
+ startPgReplicationStream(
+ startLsn,
+ plugin.forceRds()
+ ? messageDecoder::optionsWithoutMetadata
+ : messageDecoder::optionsWithMetadata);
+ messageDecoder.setContainsMetadata(plugin.forceRds() ? false : true);
+ } catch (PSQLException e) {
+ LOGGER.debug(
+ "Could not register for streaming, retrying without optional options", e);
+
+ // re-init the slot after a failed start of slot, as this
+ // may have closed the slot
+ if (useTemporarySlot()) {
+ initReplicationSlot();
+ }
+
+ s =
+ startPgReplicationStream(
+ startLsn,
+ plugin.forceRds()
+ ? messageDecoder::optionsWithoutMetadata
+ : messageDecoder::optionsWithMetadata);
+ messageDecoder.setContainsMetadata(plugin.forceRds() ? false : true);
+ }
+ } catch (PSQLException e) {
+ if (e.getMessage().matches("(?s)ERROR: option .* is unknown.*")) {
+ // It is possible we are connecting to an old wal2json plug-in
+ LOGGER.warn(
+ "Could not register for streaming with metadata in messages, falling back to messages without metadata");
+
+ // re-init the slot after a failed start of slot, as this
+ // may have closed the slot
+ if (useTemporarySlot()) {
+ initReplicationSlot();
+ }
+
+ s = startPgReplicationStream(startLsn, messageDecoder::optionsWithoutMetadata);
+ messageDecoder.setContainsMetadata(false);
+ } else if (e.getMessage()
+ .matches("(?s)ERROR: requested WAL segment .* has already been removed.*")) {
+ LOGGER.error("Cannot rewind to last processed WAL position", e);
+ throw new ConnectException(
+ "The offset to start reading from has been removed from the database write-ahead log. Create a new snapshot and consider setting of PostgreSQL parameter wal_keep_segments = 0.");
+ } else {
+ throw e;
+ }
+ }
+
+ final PGReplicationStream stream = s;
+
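+ // Wrap the pgjdbc stream in the ReplicationStream abstraction, adding warning draining,
+ // LSN flushing and an optional keep-alive thread.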
+ return new ReplicationStream() {
+
+ private static final int CHECK_WARNINGS_AFTER_COUNT = 100;
+ private int warningCheckCounter = CHECK_WARNINGS_AFTER_COUNT;
+ private ExecutorService keepAliveExecutor = null;
+ private AtomicBoolean keepAliveRunning;
+ private final Metronome metronome =
+ Metronome.sleeper(statusUpdateInterval, Clock.SYSTEM);
+
+ // make sure this is volatile since multiple threads may be interested in this value
+ private volatile Lsn lastReceivedLsn;
+
+ @Override
+ public void read(ReplicationMessageProcessor processor)
+ throws SQLException, InterruptedException {
+ processWarnings(false);
+ ByteBuffer read = stream.read();
+ final Lsn lastReceiveLsn = Lsn.valueOf(stream.getLastReceiveLSN());
+ LOGGER.trace(
+ "Streaming requested from LSN {}, received LSN {}",
+ startLsn,
+ lastReceiveLsn);
+ if (messageDecoder.shouldMessageBeSkipped(
+ read, lastReceiveLsn, startLsn, walPosition)) {
+ return;
+ }
+ deserializeMessages(read, processor);
+ }
+
+ @Override
+ public boolean readPending(ReplicationMessageProcessor processor)
+ throws SQLException, InterruptedException {
+ processWarnings(false);
+ ByteBuffer read = stream.readPending();
+ final Lsn lastReceiveLsn = Lsn.valueOf(stream.getLastReceiveLSN());
+ LOGGER.trace(
+ "Streaming requested from LSN {}, received LSN {}",
+ startLsn,
+ lastReceiveLsn);
+
+ if (read == null) {
+ return false;
+ }
+
+ if (messageDecoder.shouldMessageBeSkipped(
+ read, lastReceiveLsn, startLsn, walPosition)) {
+ return true;
+ }
+
+ deserializeMessages(read, processor);
+
+ return true;
+ }
+
+ private void deserializeMessages(
+ ByteBuffer buffer, ReplicationMessageProcessor processor)
+ throws SQLException, InterruptedException {
+ lastReceivedLsn = Lsn.valueOf(stream.getLastReceiveLSN());
+ LOGGER.trace("Received message at LSN {}", lastReceivedLsn);
+ messageDecoder.processMessage(buffer, processor, typeRegistry);
+ }
+
+ @Override
+ public void close() throws SQLException {
+ processWarnings(true);
+ stream.close();
+ }
+
+ @Override
+ public void flushLsn(Lsn lsn) throws SQLException {
+ doFlushLsn(lsn);
+ }
+
+ private void doFlushLsn(Lsn lsn) throws SQLException {
+ stream.setFlushedLSN(lsn.asLogSequenceNumber());
+ stream.setAppliedLSN(lsn.asLogSequenceNumber());
+
+ stream.forceUpdateStatus();
+ }
+
+ @Override
+ public Lsn lastReceivedLsn() {
+ return lastReceivedLsn;
+ }
+
+ @Override
+ public void startKeepAlive(ExecutorService service) {
+ if (keepAliveExecutor == null) {
+ keepAliveExecutor = service;
+ keepAliveRunning = new AtomicBoolean(true);
+ keepAliveExecutor.submit(
+ () -> {
+ while (keepAliveRunning.get()) {
+ try {
+ LOGGER.trace(
+ "Forcing status update with replication stream");
+ stream.forceUpdateStatus();
+ metronome.pause();
+ } catch (Exception exp) {
+ throw new RuntimeException(
+ "received unexpected exception will perform keep alive",
+ exp);
+ }
+ }
+ });
+ }
+ }
+
+ @Override
+ public void stopKeepAlive() {
+ if (keepAliveExecutor != null) {
+ keepAliveRunning.set(false);
+ keepAliveExecutor.shutdownNow();
+ keepAliveExecutor = null;
+ }
+ }
+
+ private void processWarnings(final boolean forced) throws SQLException {
+ if (--warningCheckCounter == 0 || forced) {
+ warningCheckCounter = CHECK_WARNINGS_AFTER_COUNT;
+ for (SQLWarning w = connection().getWarnings();
+ w != null;
+ w = w.getNextWarning()) {
+ LOGGER.debug(
+ "Server-side message: '{}', state = {}, code = {}",
+ w.getMessage(),
+ w.getSQLState(),
+ w.getErrorCode());
+ }
+ connection().clearWarnings();
+ }
+ }
+
+ @Override
+ public Lsn startLsn() {
+ return startLsn;
+ }
+ };
+ }
+
+ private PGReplicationStream startPgReplicationStream(
+ final Lsn lsn,
+ BiFunction<
+ ChainedLogicalStreamBuilder,
+ Function<Integer, Boolean>,
+ ChainedLogicalStreamBuilder>
+ configurator)
+ throws SQLException {
+ assert lsn != null;
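+ // Build a logical replication stream on the slot starting at the given LSN; the configurator
+ // callback adds decoder-specific options to the builder.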
+ ChainedLogicalStreamBuilder streamBuilder =
+ pgConnection()
+ .getReplicationAPI()
+ .replicationStream()
+ .logical()
+ .withSlotName("\"" + slotName + "\"")
+ .withStartPosition(lsn.asLogSequenceNumber())
+ .withSlotOptions(streamParams);
+ streamBuilder = configurator.apply(streamBuilder, this::hasMinimumVersion);
+
+ if (statusUpdateInterval != null && statusUpdateInterval.toMillis() > 0) {
+ streamBuilder.withStatusInterval(
+ toIntExact(statusUpdateInterval.toMillis()), TimeUnit.MILLISECONDS);
+ }
+
+ PGReplicationStream stream = streamBuilder.start();
+
+ // TODO DBZ-508 get rid of this
+ // Needed by tests when connections are opened and closed in a fast sequence
+ try {
+ Thread.sleep(10);
+ } catch (Exception e) {
+ }
+ stream.forceUpdateStatus();
+ return stream;
+ }
+
+ private Boolean hasMinimumVersion(int version) {
+ try {
+ return pgConnection().haveMinimumServerVersion(version);
+ } catch (SQLException e) {
+ throw new DebeziumException(e);
+ }
+ }
+
+ @Override
+ public synchronized void close() {
+ close(true);
+ }
+
+ public synchronized void close(boolean dropSlot) {
+ try {
+ LOGGER.debug("Closing message decoder");
+ messageDecoder.close();
+ } catch (Throwable e) {
+ LOGGER.error("Unexpected error while closing message decoder", e);
+ }
+
+ try {
+ LOGGER.debug("Closing replication connection");
+ super.close();
+ } catch (Throwable e) {
+ LOGGER.error("Unexpected error while closing Postgres connection", e);
+ }
+ if (dropSlotOnClose && dropSlot) {
+ // we're dropping the replication slot via a regular - i.e. not a replication -
+ // connection
+ try (PostgresConnection connection =
+ new PostgresConnection(
+ connectorConfig.getJdbcConfig(),
+ PostgresConnection.CONNECTION_DROP_SLOT)) {
+ connection.dropReplicationSlot(slotName);
+ connection.dropPublication(publicationName);
+ } catch (Throwable e) {
+ LOGGER.error("Unexpected error while dropping replication slot", e);
+ }
+ }
+ }
+
+ @Override
+ public void reconnect() throws SQLException {
+ close(false);
+ // Don't re-execute initial commands on reconnection
+ connection(false);
+ }
+
+ protected static class ReplicationConnectionBuilder implements Builder {
+
+ private final PostgresConnectorConfig config;
+ private String slotName = DEFAULT_SLOT_NAME;
+ private String publicationName = DEFAULT_PUBLICATION_NAME;
+ private RelationalTableFilters tableFilter;
+ private PostgresConnectorConfig.AutoCreateMode publicationAutocreateMode =
+ PostgresConnectorConfig.AutoCreateMode.ALL_TABLES;
+ private PostgresConnectorConfig.LogicalDecoder plugin =
+ PostgresConnectorConfig.LogicalDecoder.DECODERBUFS;
+ private boolean dropSlotOnClose = DEFAULT_DROP_SLOT_ON_CLOSE;
+ private Duration statusUpdateIntervalVal;
+ private boolean doSnapshot;
+ private TypeRegistry typeRegistry;
+ private PostgresSchema schema;
+ private Properties slotStreamParams = new Properties();
+ private PostgresConnection jdbcConnection;
+
+ protected ReplicationConnectionBuilder(PostgresConnectorConfig config) {
+ assert config != null;
+ this.config = config;
+ }
+
+ @Override
+ public ReplicationConnectionBuilder withSlot(final String slotName) {
+ assert slotName != null;
+ this.slotName = slotName;
+ return this;
+ }
+
+ @Override
+ public Builder withPublication(String publicationName) {
+ assert publicationName != null;
+ this.publicationName = publicationName;
+ return this;
+ }
+
+ @Override
+ public Builder withTableFilter(RelationalTableFilters tableFilter) {
+ assert tableFilter != null;
+ this.tableFilter = tableFilter;
+ return this;
+ }
+
+ @Override
+ public Builder withPublicationAutocreateMode(
+ PostgresConnectorConfig.AutoCreateMode publicationAutocreateMode) {
+ assert publicationName != null;
+ this.publicationAutocreateMode = publicationAutocreateMode;
+ return this;
+ }
+
+ @Override
+ public ReplicationConnectionBuilder withPlugin(
+ final PostgresConnectorConfig.LogicalDecoder plugin) {
+ assert plugin != null;
+ this.plugin = plugin;
+ return this;
+ }
+
+ @Override
+ public ReplicationConnectionBuilder dropSlotOnClose(final boolean dropSlotOnClose) {
+ this.dropSlotOnClose = dropSlotOnClose;
+ return this;
+ }
+
+ @Override
+ public ReplicationConnectionBuilder streamParams(final String slotStreamParams) {
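+ // Slot options are passed as semicolon-separated key=value pairs, e.g. "key1=value1;key2=value2".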
+ if (slotStreamParams != null && !slotStreamParams.isEmpty()) {
+ this.slotStreamParams = new Properties();
+ String[] paramsWithValues = slotStreamParams.split(";");
+ for (String paramsWithValue : paramsWithValues) {
+ String[] paramAndValue = paramsWithValue.split("=");
+ if (paramAndValue.length == 2) {
+ this.slotStreamParams.setProperty(paramAndValue[0], paramAndValue[1]);
+ } else {
+ LOGGER.warn(
+ "The following STREAM_PARAMS value is invalid: {}",
+ paramsWithValue);
+ }
+ }
+ }
+ return this;
+ }
+
+ @Override
+ public ReplicationConnectionBuilder statusUpdateInterval(
+ final Duration statusUpdateInterval) {
+ this.statusUpdateIntervalVal = statusUpdateInterval;
+ return this;
+ }
+
+ @Override
+ public Builder doSnapshot(boolean doSnapshot) {
+ this.doSnapshot = doSnapshot;
+ return this;
+ }
+
+ @Override
+ public Builder jdbcMetadataConnection(PostgresConnection jdbcConnection) {
+ this.jdbcConnection = jdbcConnection;
+ return this;
+ }
+
+ @Override
+ public ReplicationConnection build() {
+ assert plugin != null : "Decoding plugin name is not set";
+ return new PostgresReplicationConnection(
+ config,
+ slotName,
+ publicationName,
+ tableFilter,
+ publicationAutocreateMode,
+ plugin,
+ dropSlotOnClose,
+ doSnapshot,
+ statusUpdateIntervalVal,
+ jdbcConnection,
+ typeRegistry,
+ slotStreamParams,
+ schema);
+ }
+
+ @Override
+ public Builder withTypeRegistry(TypeRegistry typeRegistry) {
+ this.typeRegistry = typeRegistry;
+ return this;
+ }
+
+ @Override
+ public Builder withSchema(PostgresSchema schema) {
+ this.schema = schema;
+ return this;
+ }
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-opengauss/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/opengauss/OpengaussIncrementalSourceFactory.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-opengauss/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/opengauss/OpengaussIncrementalSourceFactory.java
new file mode 100644
index 000000000000..e9f552db6c01
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-opengauss/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/opengauss/OpengaussIncrementalSourceFactory.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.opengauss;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.catalog.CatalogOptions;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.connector.TableSource;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceTableConfig;
+import org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
+import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
+import org.apache.seatunnel.connectors.cdc.base.utils.CatalogTableUtils;
+import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.option.PostgresOptions;
+import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.PostgresIncrementalSource;
+import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.PostgresSourceOptions;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions;
+
+import com.google.auto.service.AutoService;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Optional;
+
+@AutoService(Factory.class)
+public class OpengaussIncrementalSourceFactory implements TableSourceFactory {
+ private static final String IDENTIFIER = "Opengauss-CDC";
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return JdbcSourceOptions.getBaseRule()
+ .required(
+ JdbcSourceOptions.USERNAME,
+ JdbcSourceOptions.PASSWORD,
+ JdbcCatalogOptions.BASE_URL)
+ .exclusive(CatalogOptions.TABLE_NAMES, CatalogOptions.TABLE_PATTERN)
+ .optional(
+ JdbcSourceOptions.DATABASE_NAMES,
+ JdbcSourceOptions.SERVER_TIME_ZONE,
+ JdbcSourceOptions.CONNECT_TIMEOUT_MS,
+ JdbcSourceOptions.CONNECT_MAX_RETRIES,
+ JdbcSourceOptions.CONNECTION_POOL_SIZE,
+ PostgresOptions.DECODING_PLUGIN_NAME,
+ PostgresOptions.SLOT_NAME,
+ JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND,
+ JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND,
+ JdbcSourceOptions.SAMPLE_SHARDING_THRESHOLD,
+ JdbcSourceOptions.TABLE_NAMES_CONFIG)
+ .optional(PostgresSourceOptions.STARTUP_MODE, PostgresSourceOptions.STOP_MODE)
+ .conditional(
+ PostgresSourceOptions.STARTUP_MODE,
+ StartupMode.INITIAL,
+ JdbcSourceOptions.EXACTLY_ONCE)
+ .build();
+ }
+
+ @Override
+ public Class<? extends SeaTunnelSource> getSourceClass() {
+ return PostgresIncrementalSource.class;
+ }
+
+ @Override
+ public <T, SplitT extends SourceSplit, StateT extends Serializable>
+ TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
+ return () -> {
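+ // openGauss is PostgreSQL-compatible, so table discovery and the incremental source reuse
+ // the Postgres CDC implementation.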
+ List<CatalogTable> catalogTables =
+ CatalogTableUtil.getCatalogTables(
+ "Postgres", context.getOptions(), context.getClassLoader());
+ Optional<List<JdbcSourceTableConfig>> tableConfigs =
+ context.getOptions().getOptional(JdbcSourceOptions.TABLE_NAMES_CONFIG);
+ if (tableConfigs.isPresent()) {
+ catalogTables =
+ CatalogTableUtils.mergeCatalogTableConfig(
+ catalogTables, tableConfigs.get(), s -> TablePath.of(s, true));
+ }
+ SeaTunnelDataType<SeaTunnelRow> dataType =
+ CatalogTableUtil.convertToMultipleRowType(catalogTables);
+ return (SeaTunnelSource<T, SplitT, StateT>)
+ new PostgresIncrementalSource<>(context.getOptions(), dataType, catalogTables);
+ };
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-cdc/pom.xml b/seatunnel-connectors-v2/connector-cdc/pom.xml
index 44916d35caa2..a422f6406d3e 100644
--- a/seatunnel-connectors-v2/connector-cdc/pom.xml
+++ b/seatunnel-connectors-v2/connector-cdc/pom.xml
@@ -36,6 +36,7 @@
        <module>connector-cdc-mongodb</module>
        <module>connector-cdc-postgres</module>
        <module>connector-cdc-oracle</module>
+        <module>connector-cdc-opengauss</module>
diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml
index fb6935f08940..a633b64a1dd1 100644
--- a/seatunnel-dist/pom.xml
+++ b/seatunnel-dist/pom.xml
@@ -518,6 +518,12 @@
             <version>${project.version}</version>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-cdc-opengauss</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
             <artifactId>connector-tdengine</artifactId>
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-opengauss-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-opengauss-e2e/pom.xml
new file mode 100644
index 000000000000..f95e5cdb1a00
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-opengauss-e2e/pom.xml
@@ -0,0 +1,75 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements. See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License. You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-connector-v2-e2e</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>connector-cdc-opengauss-e2e</artifactId>
+    <name>SeaTunnel : E2E : Connector V2 : CDC Opengauss</name>
+
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>org.apache.seatunnel</groupId>
+                <artifactId>connector-jdbc</artifactId>
+                <version>${project.version}</version>
+                <type>pom</type>
+                <scope>import</scope>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-cdc-opengauss</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-jdbc</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>postgresql</artifactId>
+            <version>${testcontainer.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.postgresql</groupId>
+            <artifactId>postgresql</artifactId>
+            <version>42.5.1</version>
+        </dependency>
+    </dependencies>
+
+</project>
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-opengauss-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/OpengaussCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-opengauss-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/OpengaussCDCIT.java
new file mode 100644
index 000000000000..dc80a083a766
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-opengauss-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/OpengaussCDCIT.java
@@ -0,0 +1,727 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.postgres;
+
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.awaitility.Awaitility.await;
+import static org.awaitility.Awaitility.given;
+import static org.junit.Assert.assertNotNull;
+
+@Slf4j
+@DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK},
+ disabledReason = "Currently SPARK do not support cdc")
+public class OpengaussCDCIT extends TestSuiteBase implements TestResource {
+ private static final int OPENGAUSS_PORT = 5432;
+ private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$");
+ private static final String USERNAME = "gaussdb";
+ private static final String PASSWORD = "openGauss@123";
+ private static final String OPENGAUSSQL_DATABASE = "opengauss_cdc";
+ private static final String OPENGAUSSQL_DEFAULT_DATABASE = "postgres";
+ private static final String OPENGAUSS_SCHEMA = "inventory";
+
+ private static final String SOURCE_TABLE_1 = "opengauss_cdc_table_1";
+ private static final String SOURCE_TABLE_2 = "opengauss_cdc_table_2";
+ private static final String SOURCE_TABLE_3 = "opengauss_cdc_table_3";
+ private static final String SINK_TABLE_1 = "sink_opengauss_cdc_table_1";
+ private static final String SINK_TABLE_2 = "sink_opengauss_cdc_table_2";
+ private static final String SINK_TABLE_3 = "sink_opengauss_cdc_table_3";
+
+ private static final String SOURCE_TABLE_NO_PRIMARY_KEY = "full_types_no_primary_key";
+
+ private static final String OPENGAUSS_HOST = "opengauss_cdc_e2e";
+
+ protected static final DockerImageName OPENGAUSS_IMAGE =
+ DockerImageName.parse("opengauss/opengauss:5.0.0")
+ .asCompatibleSubstituteFor("postgres");
+
+ private static final String SOURCE_SQL_TEMPLATE = "select * from %s.%s order by id";
+
+ public static final GenericContainer<?> OPENGAUSS_CONTAINER =
+ new GenericContainer<>(OPENGAUSS_IMAGE)
+ .withNetwork(NETWORK)
+ .withNetworkAliases(OPENGAUSS_HOST)
+ .withEnv("GS_PASSWORD", PASSWORD)
+ .withLogConsumer(new Slf4jLogConsumer(log));
+
+ private String driverUrl() {
+ return "https://repo1.maven.org/maven2/org/postgresql/postgresql/42.5.1/postgresql-42.5.1.jar";
+ }
+
+ @TestContainerExtension
+ protected final ContainerExtendedFactory extendedFactory =
+ container -> {
+ Container.ExecResult extraCommands =
+ container.execInContainer(
+ "bash",
+ "-c",
+ "mkdir -p /tmp/seatunnel/plugins/JDBC/lib && cd /tmp/seatunnel/plugins/JDBC/lib && wget "
+ + driverUrl());
+ Assertions.assertEquals(0, extraCommands.getExitCode(), extraCommands.getStderr());
+ };
+
+ @BeforeAll
+ @Override
+ public void startUp() throws Exception {
+ log.info("The second stage: Starting opengauss containers...");
+ OPENGAUSS_CONTAINER.setPortBindings(
+ Lists.newArrayList(String.format("%s:%s", OPENGAUSS_PORT, OPENGAUSS_PORT)));
+ Startables.deepStart(Stream.of(OPENGAUSS_CONTAINER)).join();
+ log.info("Opengauss Containers are started");
+ given().ignoreExceptions()
+ .await()
+ .atLeast(100, TimeUnit.MILLISECONDS)
+ .pollInterval(2, TimeUnit.SECONDS)
+ .atMost(2, TimeUnit.MINUTES)
+ .untilAsserted(this::initializeOpengaussSql);
+
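+ // openGauss defaults to SHA256-only password hashing (password_encryption_type = 2); relax it to
+ // mode 1 and add an MD5 hba rule so the PostgreSQL JDBC driver can authenticate the sink user created below.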
+ String[] command1 = {
+ "/bin/sh",
+ "-c",
+ "sed -i 's/^#password_encryption_type = 2/password_encryption_type = 1/' /var/lib/opengauss/data/postgresql.conf"
+ };
+ Container.ExecResult result1 = OPENGAUSS_CONTAINER.execInContainer(command1);
+ Assertions.assertEquals(0, result1.getExitCode());
+
+ String[] command2 = {
+ "/bin/sh",
+ "-c",
+ "sed -i 's/host replication gaussdb 0.0.0.0\\/0 md5/host replication gaussdb 0.0.0.0\\/0 sha256/' /var/lib/opengauss/data/pg_hba.conf"
+ };
+ Container.ExecResult result2 = OPENGAUSS_CONTAINER.execInContainer(command2);
+ Assertions.assertEquals(0, result2.getExitCode());
+ String[] command3 = {
+ "/bin/sh",
+ "-c",
+ "echo \"host all dailai 0.0.0.0/0 md5\" >> /var/lib/opengauss/data/pg_hba.conf"
+ };
+ Container.ExecResult result3 = OPENGAUSS_CONTAINER.execInContainer(command3);
+ Assertions.assertEquals(0, result3.getExitCode());
+
+ reloadConf();
+
+ createNewUserForJdbcSink();
+ }
+
+ @TestTemplate
+ public void testOpengaussCdcCheckDataE2e(TestContainer container) {
+ try {
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ container.executeJob("/opengausscdc_to_opengauss.conf");
+ } catch (Exception e) {
+ log.error("Commit task exception :" + e.getMessage());
+ throw new RuntimeException(e);
+ }
+ return null;
+ });
+ await().atMost(60000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ Assertions.assertIterableEquals(
+ query(getQuerySQL(OPENGAUSS_SCHEMA, SOURCE_TABLE_1)),
+ query(getQuerySQL(OPENGAUSS_SCHEMA, SINK_TABLE_1)));
+ });
+
+ // insert update delete
+ upsertDeleteSourceTable(OPENGAUSS_SCHEMA, SOURCE_TABLE_1);
+
+ // stream stage
+ await().atMost(60000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ Assertions.assertIterableEquals(
+ query(getQuerySQL(OPENGAUSS_SCHEMA, SOURCE_TABLE_1)),
+ query(getQuerySQL(OPENGAUSS_SCHEMA, SINK_TABLE_1)));
+ });
+ } finally {
+ // Clear related content to ensure that multiple operations are not affected
+ clearTable(OPENGAUSS_SCHEMA, SOURCE_TABLE_1);
+ clearTable(OPENGAUSS_SCHEMA, SINK_TABLE_1);
+ }
+ }
+
+ @TestTemplate
+ @DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK, EngineType.FLINK},
+ disabledReason = "Currently SPARK and FLINK do not support multi table")
+ public void testOpengaussCdcMultiTableE2e(TestContainer container) {
+ try {
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ container.executeJob(
+ "/opengausscdc_to_opengauss_with_multi_table_mode_two_table.conf");
+ } catch (Exception e) {
+ log.error("Commit task exception :" + e.getMessage());
+ throw new RuntimeException(e);
+ }
+ return null;
+ });
+
+ // stream stage
+ await().atMost(60000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () ->
+ Assertions.assertAll(
+ () ->
+ Assertions.assertIterableEquals(
+ query(
+ getQuerySQL(
+ OPENGAUSS_SCHEMA,
+ SOURCE_TABLE_1)),
+ query(
+ getQuerySQL(
+ OPENGAUSS_SCHEMA,
+ SINK_TABLE_1))),
+ () ->
+ Assertions.assertIterableEquals(
+ query(
+ getQuerySQL(
+ OPENGAUSS_SCHEMA,
+ SOURCE_TABLE_2)),
+ query(
+ getQuerySQL(
+ OPENGAUSS_SCHEMA,
+ SINK_TABLE_2)))));
+
+ // insert update delete
+ upsertDeleteSourceTable(OPENGAUSS_SCHEMA, SOURCE_TABLE_1);
+ upsertDeleteSourceTable(OPENGAUSS_SCHEMA, SOURCE_TABLE_2);
+
+ // stream stage
+ await().atMost(60000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () ->
+ Assertions.assertAll(
+ () ->
+ Assertions.assertIterableEquals(
+ query(
+ getQuerySQL(
+ OPENGAUSS_SCHEMA,
+ SOURCE_TABLE_1)),
+ query(
+ getQuerySQL(
+ OPENGAUSS_SCHEMA,
+ SINK_TABLE_1))),
+ () ->
+ Assertions.assertIterableEquals(
+ query(
+ getQuerySQL(
+ OPENGAUSS_SCHEMA,
+ SOURCE_TABLE_2)),
+ query(
+ getQuerySQL(
+ OPENGAUSS_SCHEMA,
+ SINK_TABLE_2)))));
+ } finally {
+ // Clear related content to ensure that multiple operations are not affected
+ clearTable(OPENGAUSS_SCHEMA, SOURCE_TABLE_1);
+ clearTable(OPENGAUSS_SCHEMA, SINK_TABLE_1);
+ clearTable(OPENGAUSS_SCHEMA, SOURCE_TABLE_2);
+ clearTable(OPENGAUSS_SCHEMA, SINK_TABLE_2);
+ }
+ }
+
+ @TestTemplate
+ @DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK, EngineType.FLINK},
+ disabledReason = "Currently SPARK and FLINK do not support multi table")
+ public void testMultiTableWithRestore(TestContainer container)
+ throws IOException, InterruptedException {
+ try {
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ return container.executeJob(
+ "/opengausscdc_to_opengauss_with_multi_table_mode_one_table.conf");
+ } catch (Exception e) {
+ log.error("Commit task exception :" + e.getMessage());
+ throw new RuntimeException(e);
+ }
+ });
+
+ // insert update delete
+ upsertDeleteSourceTable(OPENGAUSS_SCHEMA, SOURCE_TABLE_1);
+
+ // stream stage
+ await().atMost(60000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () ->
+ Assertions.assertAll(
+ () ->
+ Assertions.assertIterableEquals(
+ query(
+ getQuerySQL(
+ OPENGAUSS_SCHEMA,
+ SOURCE_TABLE_1)),
+ query(
+ getQuerySQL(
+ OPENGAUSS_SCHEMA,
+ SINK_TABLE_1)))));
+
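+ // Capture the job id from the server logs so the job can be savepointed and later restored
+ // with a second table added to the configuration.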
+ Pattern jobIdPattern =
+ Pattern.compile(
+ ".*Init JobMaster for Job opengausscdc_to_opengauss_with_multi_table_mode_one_table.conf \\(([0-9]*)\\).*",
+ Pattern.DOTALL);
+ Matcher matcher = jobIdPattern.matcher(container.getServerLogs());
+ String jobId;
+ if (matcher.matches()) {
+ jobId = matcher.group(1);
+ } else {
+ throw new RuntimeException("Can not find jobId");
+ }
+
+ Assertions.assertEquals(0, container.savepointJob(jobId).getExitCode());
+
+ // Restore job with add a new table
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ container.restoreJob(
+ "/opengausscdc_to_opengauss_with_multi_table_mode_two_table.conf",
+ jobId);
+ } catch (Exception e) {
+ log.error("Commit task exception :" + e.getMessage());
+ throw new RuntimeException(e);
+ }
+ return null;
+ });
+
+ upsertDeleteSourceTable(OPENGAUSS_SCHEMA, SOURCE_TABLE_2);
+
+ // stream stage
+ await().atMost(60000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () ->
+ Assertions.assertAll(
+ () ->
+ Assertions.assertIterableEquals(
+ query(
+ getQuerySQL(
+ OPENGAUSS_SCHEMA,
+ SOURCE_TABLE_1)),
+ query(
+ getQuerySQL(
+ OPENGAUSS_SCHEMA,
+ SINK_TABLE_1))),
+ () ->
+ Assertions.assertIterableEquals(
+ query(
+ getQuerySQL(
+ OPENGAUSS_SCHEMA,
+ SOURCE_TABLE_2)),
+ query(
+ getQuerySQL(
+ OPENGAUSS_SCHEMA,
+ SINK_TABLE_2)))));
+
+ log.info("****************** container logs start ******************");
+ String containerLogs = container.getServerLogs();
+ log.info(containerLogs);
+ // pg cdc logs contain ERROR
+ // Assertions.assertFalse(containerLogs.contains("ERROR"));
+ log.info("****************** container logs end ******************");
+ } finally {
+ // Clear related content to ensure that multiple operations are not affected
+ clearTable(OPENGAUSS_SCHEMA, SOURCE_TABLE_1);
+ clearTable(OPENGAUSS_SCHEMA, SINK_TABLE_1);
+ clearTable(OPENGAUSS_SCHEMA, SOURCE_TABLE_2);
+ clearTable(OPENGAUSS_SCHEMA, SINK_TABLE_2);
+ }
+ }
+
+ @TestTemplate
+ @DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK, EngineType.FLINK},
+ disabledReason = "Currently SPARK and FLINK do not support multi table")
+ public void testAddFiledWithRestore(TestContainer container)
+ throws IOException, InterruptedException {
+ try {
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ return container.executeJob(
+ "/opengausscdc_to_opengauss_test_add_Filed.conf");
+ } catch (Exception e) {
+ log.error("Commit task exception :" + e.getMessage());
+ throw new RuntimeException(e);
+ }
+ });
+
+ // stream stage
+ await().atMost(60000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () ->
+ Assertions.assertAll(
+ () ->
+ Assertions.assertIterableEquals(
+ query(
+ getQuerySQL(
+ OPENGAUSS_SCHEMA,
+ SOURCE_TABLE_3)),
+ query(
+ getQuerySQL(
+ OPENGAUSS_SCHEMA,
+ SINK_TABLE_3)))));
+
+ Pattern jobIdPattern =
+ Pattern.compile(
+ ".*Init JobMaster for Job opengausscdc_to_opengauss_test_add_Filed.conf \\(([0-9]*)\\).*",
+ Pattern.DOTALL);
+ Matcher matcher = jobIdPattern.matcher(container.getServerLogs());
+ String jobId;
+ if (matcher.matches()) {
+ jobId = matcher.group(1);
+ } else {
+ throw new RuntimeException("Can not find jobId");
+ }
+
+ Assertions.assertEquals(0, container.savepointJob(jobId).getExitCode());
+
+ // add filed add insert source table data
+ addFieldsForTable(OPENGAUSS_SCHEMA, SOURCE_TABLE_3);
+ addFieldsForTable(OPENGAUSS_SCHEMA, SINK_TABLE_3);
+ insertSourceTableForAddFields(OPENGAUSS_SCHEMA, SOURCE_TABLE_3);
+
+ // Restore job
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ container.restoreJob(
+ "/opengausscdc_to_opengauss_test_add_Filed.conf", jobId);
+ } catch (Exception e) {
+ log.error("Commit task exception :" + e.getMessage());
+ throw new RuntimeException(e);
+ }
+ return null;
+ });
+
+ // stream stage
+ await().atMost(60000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () ->
+ Assertions.assertAll(
+ () ->
+ Assertions.assertIterableEquals(
+ query(
+ getQuerySQL(
+ OPENGAUSS_SCHEMA,
+ SOURCE_TABLE_3)),
+ query(
+ getQuerySQL(
+ OPENGAUSS_SCHEMA,
+ SINK_TABLE_3)))));
+ } finally {
+ // Clear related content to ensure that multiple operations are not affected
+ clearTable(OPENGAUSS_SCHEMA, SOURCE_TABLE_3);
+ clearTable(OPENGAUSS_SCHEMA, SINK_TABLE_3);
+ }
+ }
+
+ @TestTemplate
+ public void testOpengaussCdcCheckDataWithNoPrimaryKey(TestContainer container)
+ throws Exception {
+
+ try {
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ container.executeJob(
+ "/opengausscdc_to_opengauss_with_no_primary_key.conf");
+ } catch (Exception e) {
+ log.error("Commit task exception :" + e.getMessage());
+ throw new RuntimeException(e);
+ }
+ return null;
+ });
+
+ // snapshot stage
+ await().atMost(60000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ Assertions.assertIterableEquals(
+ query(
+ getQuerySQL(
+ OPENGAUSS_SCHEMA,
+ SOURCE_TABLE_NO_PRIMARY_KEY)),
+ query(getQuerySQL(OPENGAUSS_SCHEMA, SINK_TABLE_1)));
+ });
+
+ // insert update delete
+ upsertDeleteSourceTable(OPENGAUSS_SCHEMA, SOURCE_TABLE_NO_PRIMARY_KEY);
+
+ // stream stage
+ await().atMost(60000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ Assertions.assertIterableEquals(
+ query(
+ getQuerySQL(
+ OPENGAUSS_SCHEMA,
+ SOURCE_TABLE_NO_PRIMARY_KEY)),
+ query(getQuerySQL(OPENGAUSS_SCHEMA, SINK_TABLE_1)));
+ });
+ } finally {
+ clearTable(OPENGAUSS_SCHEMA, SOURCE_TABLE_NO_PRIMARY_KEY);
+ clearTable(OPENGAUSS_SCHEMA, SINK_TABLE_1);
+ }
+ }
+
+ @TestTemplate
+ public void testOpengaussCdcCheckDataWithCustomPrimaryKey(TestContainer container)
+ throws Exception {
+
+ try {
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ container.executeJob(
+ "/opengausscdc_to_opengauss_with_custom_primary_key.conf");
+ } catch (Exception e) {
+ log.error("Commit task exception :" + e.getMessage());
+ throw new RuntimeException(e);
+ }
+ return null;
+ });
+
+ // snapshot stage
+ await().atMost(60000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ Assertions.assertIterableEquals(
+ query(
+ getQuerySQL(
+ OPENGAUSS_SCHEMA,
+ SOURCE_TABLE_NO_PRIMARY_KEY)),
+ query(getQuerySQL(OPENGAUSS_SCHEMA, SINK_TABLE_1)));
+ });
+
+ // insert update delete
+ upsertDeleteSourceTable(OPENGAUSS_SCHEMA, SOURCE_TABLE_NO_PRIMARY_KEY);
+
+ // stream stage
+ await().atMost(60000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ Assertions.assertIterableEquals(
+ query(
+ getQuerySQL(
+ OPENGAUSS_SCHEMA,
+ SOURCE_TABLE_NO_PRIMARY_KEY)),
+ query(getQuerySQL(OPENGAUSS_SCHEMA, SINK_TABLE_1)));
+ });
+ } finally {
+ clearTable(OPENGAUSS_SCHEMA, SOURCE_TABLE_NO_PRIMARY_KEY);
+ clearTable(OPENGAUSS_SCHEMA, SINK_TABLE_1);
+ }
+ }
+
+ private void addFieldsForTable(String database, String tableName) {
+ executeSql("ALTER TABLE " + database + "." + tableName + " ADD COLUMN f_big BIGINT");
+ }
+
+ private void insertSourceTableForAddFields(String database, String tableName) {
+ executeSql(
+ "INSERT INTO "
+ + database
+ + "."
+ + tableName
+ + " VALUES (2, '2', 32767, 65535, 2147483647);");
+ }
+
+ private void clearTable(String database, String tableName) {
+ executeSql("truncate table " + database + "." + tableName);
+ }
+
+ private void upsertDeleteSourceTable(String database, String tableName) {
+
+ executeSql(
+ "INSERT INTO "
+ + database
+ + "."
+ + tableName
+ + " VALUES (2, '2', 32767, 65535, 2147483647, 5.5, 6.6, 123.12345, 404.4443, true,\n"
+ + " 'Hello World', 'a', 'abc', 'abcd..xyz', '2020-07-17 18:00:22.123', '2020-07-17 18:00:22.123456',\n"
+ + " '2020-07-17', '18:00:22', 500);");
+
+ executeSql(
+ "INSERT INTO "
+ + database
+ + "."
+ + tableName
+ + " VALUES (3, '2', 32767, 65535, 2147483647, 5.5, 6.6, 123.12345, 404.4443, true,\n"
+ + " 'Hello World', 'a', 'abc', 'abcd..xyz', '2020-07-17 18:00:22.123', '2020-07-17 18:00:22.123456',\n"
+ + " '2020-07-17', '18:00:22', 500);");
+
+ executeSql("DELETE FROM " + database + "." + tableName + " where id = 2;");
+
+ executeSql("UPDATE " + database + "." + tableName + " SET f_big = 10000 where id = 3;");
+ }
+
+ private void executeSql(String sql) {
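+ // All test statements run against the "inventory" schema of the opengauss_cdc database.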
+ try (Connection connection = getJdbcConnection(OPENGAUSSQL_DATABASE);
+ Statement statement = connection.createStatement()) {
+ statement.execute("SET search_path TO inventory;");
+ statement.execute(sql);
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private String getQuerySQL(String database, String tableName) {
+ return String.format(SOURCE_SQL_TEMPLATE, database, tableName);
+ }
+
+ private List<List<Object>> query(String sql) {
+ try (Connection connection = getJdbcConnection(OPENGAUSSQL_DATABASE)) {
+ ResultSet resultSet = connection.createStatement().executeQuery(sql);
+ List<List<Object>> result = new ArrayList<>();
+ int columnCount = resultSet.getMetaData().getColumnCount();
+ while (resultSet.next()) {
+ ArrayList<Object> objects = new ArrayList<>();