From fd451f5298f78e33b5229a4bd60aa3a15e8cc2f2 Mon Sep 17 00:00:00 2001 From: Hongshun Wang Date: Wed, 23 Aug 2023 10:27:43 +0800 Subject: [PATCH 1/2] [postgres] close jdbc connection when close replication connection(#2425) --- .../cdc/connectors/postgres/source/PostgresDialect.java | 5 ++--- .../postgres/source/enumerator/PostgresSourceEnumerator.java | 4 ++-- .../cdc/connectors/postgres/testutils/UniqueDatabase.java | 5 ----- 3 files changed, 4 insertions(+), 10 deletions(-) diff --git a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/PostgresDialect.java b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/PostgresDialect.java index f2f125a1e7e..3440bd125da 100644 --- a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/PostgresDialect.java +++ b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/PostgresDialect.java @@ -100,10 +100,9 @@ public PostgresConnection openJdbcConnection() { return (PostgresConnection) openJdbcConnection(sourceConfig); } - public PostgresReplicationConnection openPostgresReplicationConnection() { + public PostgresReplicationConnection openPostgresReplicationConnection( + PostgresConnection jdbcConnection) { try { - PostgresConnection jdbcConnection = - (PostgresConnection) openJdbcConnection(sourceConfig); PostgresConnectorConfig pgConnectorConfig = sourceConfig.getDbzConnectorConfig(); TopicSelector topicSelector = PostgresTopicSelector.create(pgConnectorConfig); PostgresConnection.PostgresValueConverterBuilder valueConverterBuilder = diff --git a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/enumerator/PostgresSourceEnumerator.java b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/enumerator/PostgresSourceEnumerator.java index 0c7f8a00d5b..7ee072c427f 100644 --- a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/enumerator/PostgresSourceEnumerator.java +++ b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/enumerator/PostgresSourceEnumerator.java @@ -78,9 +78,9 @@ private void createSlotForGlobalStreamSplit() { return; } - try { + try (PostgresConnection connection = postgresDialect.openJdbcConnection()) { PostgresReplicationConnection replicationConnection = - postgresDialect.openPostgresReplicationConnection(); + postgresDialect.openPostgresReplicationConnection(connection); replicationConnection.createReplicationSlot(); replicationConnection.close(false); } catch (Throwable t) { diff --git a/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/testutils/UniqueDatabase.java b/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/testutils/UniqueDatabase.java index 8064e7dcb22..9b83f05c809 100644 --- a/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/testutils/UniqueDatabase.java +++ b/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/testutils/UniqueDatabase.java @@ -23,7 +23,6 @@ import java.nio.file.Files; import java.nio.file.Paths; import java.sql.Connection; -import java.sql.DriverManager; import java.sql.SQLException; import java.sql.Statement; import java.util.Arrays; @@ -155,10 +154,6 @@ public void createAndInitialize() { } } - public Connection getJdbcConnection() throws SQLException { - return DriverManager.getConnection(container.getJdbcUrl(), username, password); - } - private String convertSQL(final String sql) { return sql.replace("$DBNAME$", schemaName); } From b8cf311a09800594bbe20e44604a92aec7cf7d5c Mon Sep 17 00:00:00 2001 From: Hongshun Wang Date: Thu, 7 Sep 2023 19:14:26 +0800 Subject: [PATCH 2/2] [postgres] JdbcConnectionPools contains multiple JdbcConnectionPoolFactory(#2449) --- .../enumerator/PostgresSourceEnumerator.java | 29 +++++++------------ 1 file changed, 10 insertions(+), 19 deletions(-) diff --git a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/enumerator/PostgresSourceEnumerator.java b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/enumerator/PostgresSourceEnumerator.java index 7ee072c427f..40164b9e462 100644 --- a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/enumerator/PostgresSourceEnumerator.java +++ b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/enumerator/PostgresSourceEnumerator.java @@ -28,8 +28,6 @@ import io.debezium.connector.postgresql.connection.PostgresReplicationConnection; import io.debezium.connector.postgresql.spi.SlotState; -import java.sql.SQLException; - /** * The Postgres source enumerator that enumerates receive the split request and assign the split to * source readers. @@ -60,32 +58,25 @@ public void start() { * reading the globalStreamSplit to catch all data changes. */ private void createSlotForGlobalStreamSplit() { - SlotState slotInfo = null; try (PostgresConnection connection = postgresDialect.openJdbcConnection()) { - slotInfo = + SlotState slotInfo = connection.getReplicationSlotState( postgresDialect.getSlotName(), postgresDialect.getPluginName()); - } catch (SQLException e) { - throw new RuntimeException( - String.format( - "Fail to get the replication slot info, the slot name is %s.", - postgresDialect.getSlotName()), - e); - } - - // skip creating the replication slot when the slot exists. - if (slotInfo != null) { - return; - } - - try (PostgresConnection connection = postgresDialect.openJdbcConnection()) { + // skip creating the replication slot when the slot exists. + if (slotInfo != null) { + return; + } PostgresReplicationConnection replicationConnection = postgresDialect.openPostgresReplicationConnection(connection); replicationConnection.createReplicationSlot(); replicationConnection.close(false); + } catch (Throwable t) { throw new FlinkRuntimeException( - "Create Slot For Global Stream Split failed due to ", t); + String.format( + "Fail to get or create slot for global stream split, the slot name is %s. Due to: ", + postgresDialect.getSlotName()), + t); } } }