Skip to content

Commit

Permalink
[postgres] close replication connection when close incremental Source…
Browse files Browse the repository at this point in the history
…SplitReader(apache#2425)
  • Loading branch information
loserwang1024 committed Aug 23, 2023
1 parent 54df7e9 commit 454928f
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,9 @@ public PostgresConnection openJdbcConnection() {
return (PostgresConnection) openJdbcConnection(sourceConfig);
}

public PostgresReplicationConnection openPostgresReplicationConnection() {
public PostgresReplicationConnection openPostgresReplicationConnection(
PostgresConnection jdbcConnection) {
try {
PostgresConnection jdbcConnection =
(PostgresConnection) openJdbcConnection(sourceConfig);
PostgresConnectorConfig pgConnectorConfig = sourceConfig.getDbzConnectorConfig();
TopicSelector<TableId> topicSelector = PostgresTopicSelector.create(pgConnectorConfig);
PostgresConnection.PostgresValueConverterBuilder valueConverterBuilder =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public abstract class PostgresTestBase extends AbstractTestBase {
// use newer version of postgresql image to support pgoutput plugin
// when testing postgres 13, only 13-alpine supports both amd64 and arm64
protected static final DockerImageName PG_IMAGE =
DockerImageName.parse("debezium/postgres:9.6").asCompatibleSubstituteFor("postgres");
DockerImageName.parse("debezium/postgres:10").asCompatibleSubstituteFor("postgres");

public static final PostgreSQLContainer<?> POSTGRES_CONTAINER =
new PostgreSQLContainer<>(PG_IMAGE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.sql.SQLException;
import java.util.ArrayList;
Expand All @@ -62,6 +64,7 @@
import static org.junit.Assert.assertTrue;

/** IT tests for {@link PostgresSourceBuilder.PostgresIncrementalSource}. */
@RunWith(Parameterized.class)
public class PostgresSourceITCase extends PostgresTestBase {

private static final String DEFAULT_SCAN_STARTUP_MODE = "initial";
Expand All @@ -71,6 +74,8 @@ public class PostgresSourceITCase extends PostgresTestBase {
private static final String DB_NAME_PREFIX = "postgres";
private static final String SCHEMA_NAME = "customer";

private final String scanStartupMode;

@Rule public final Timeout timeoutPerTest = Timeout.seconds(300);

@Rule
Expand Down Expand Up @@ -101,6 +106,15 @@ public class PostgresSourceITCase extends PostgresTestBase {
"-U[103, user_3, Hangzhou, 123567891234]",
"+U[103, user_3, Shanghai, 123567891234]");

public PostgresSourceITCase(String scanStartupMode) {
this.scanStartupMode = scanStartupMode;
}

@Parameterized.Parameters(name = "scanStartupMode: {0}")
public static Object[] parameters() {
return new Object[][] {new Object[] {"initial"}, new Object[] {"latest-offset"}};
}

/** Second part stream events, which is made by {@link #makeSecondPartStreamEvents}. */
private final List<String> secondPartStreamEvents =
Arrays.asList(
Expand Down Expand Up @@ -178,23 +192,33 @@ public void testJobManagerFailoverSingleParallelism() throws Exception {
}

@Test
public void testConsumingTableWithoutPrimaryKey() {
try {
public void testConsumingTableWithoutPrimaryKey() throws Exception {
if(scanStartupMode == DEFAULT_SCAN_STARTUP_MODE) {
try {
testPostgresParallelSource(
1,
scanStartupMode,
FailoverType.NONE,
FailoverPhase.NEVER,
new String[]{"customers_no_pk"},
RestartStrategies.noRestart());
} catch (Exception e) {
assertTrue(
ExceptionUtils.findThrowableWithMessage(
e,
String.format(
"Incremental snapshot for tables requires primary key, but table %s doesn't have primary key",
SCHEMA_NAME + ".customers_no_pk"))
.isPresent());
}
}else{
testPostgresParallelSource(
1,
DEFAULT_SCAN_STARTUP_MODE,
scanStartupMode,
FailoverType.NONE,
FailoverPhase.NEVER,
new String[] {"customers_no_pk"},
new String[]{"customers_no_pk"},
RestartStrategies.noRestart());
} catch (Exception e) {
assertTrue(
ExceptionUtils.findThrowableWithMessage(
e,
String.format(
"Incremental snapshot for tables requires primary key, but table %s doesn't have primary key",
SCHEMA_NAME + ".customers_no_pk"))
.isPresent());
}
}

Expand All @@ -213,7 +237,7 @@ private void testPostgresParallelSource(
throws Exception {
testPostgresParallelSource(
parallelism,
DEFAULT_SCAN_STARTUP_MODE,
scanStartupMode,
failoverType,
failoverPhase,
captureCustomerTables,
Expand Down Expand Up @@ -255,6 +279,7 @@ private void testPostgresParallelSource(
+ " 'table-name' = '%s',"
+ " 'scan.startup.mode' = '%s',"
+ " 'scan.incremental.snapshot.chunk.size' = '100',"
+" 'decoding.plugin.name' = 'pgoutput',"
+ " 'slot.name' = '%s'"
+ ")",
customDatabase.getHost(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
Expand Down Expand Up @@ -155,10 +154,6 @@ public void createAndInitialize() {
}
}

public Connection getJdbcConnection() throws SQLException {
return DriverManager.getConnection(container.getJdbcUrl(), username, password);
}

private String convertSQL(final String sql) {
return sql.replace("$DBNAME$", schemaName);
}
Expand Down

0 comments on commit 454928f

Please sign in to comment.