diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/FetchTask.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/FetchTask.java index 3f4c386db24..1cc61bd7734 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/FetchTask.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/FetchTask.java @@ -66,5 +66,7 @@ interface Context { void rewriteOutputBuffer(Map outputBuffer, SourceRecord changeRecord); List formatMessageTimestamp(Collection snapshotRecords); + + void close(); } } diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java index 11e265823c0..479a717b551 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java @@ -192,6 +192,9 @@ private void checkReadException() { @Override public void close() { try { + if (taskContext != null) { + taskContext.close(); + } if (snapshotSplitReadTask != null) { snapshotSplitReadTask.shutdown(); } diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java index 5d9d4177c13..40a67ecfcad 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java @@ -121,6 +121,9 @@ private void checkReadException() { @Override public void close() { try { + if (taskContext != null) { + taskContext.close(); + } if (streamFetchTask != null) { streamFetchTask.shutdown(); } diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlDialect.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlDialect.java index 1352a78e0b1..15d3b6bf7ff 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlDialect.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlDialect.java @@ -33,8 +33,6 @@ import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlSchema; import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.TableDiscoveryUtils; -import com.github.shyiko.mysql.binlog.BinaryLogClient; -import io.debezium.connector.mysql.MySqlConnection; import io.debezium.jdbc.JdbcConnection; import io.debezium.relational.TableId; import io.debezium.relational.history.TableChanges; @@ -42,8 +40,6 @@ import java.sql.SQLException; import java.util.List; -import static org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils.createBinaryClient; -import static org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils.createMySqlConnection; import static org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils.isTableIdCaseSensitive; /** The {@link JdbcDataSourceDialect} implementation for MySQL datasource. */ @@ -104,12 +100,7 @@ public TableChanges.TableChange queryTableSchema(JdbcConnection jdbc, TableId ta @Override public MySqlSourceFetchTaskContext createFetchTaskContext( SourceSplitBase sourceSplitBase, JdbcSourceConfig taskSourceConfig) { - final MySqlConnection jdbcConnection = - createMySqlConnection(taskSourceConfig.getDbzConfiguration()); - final BinaryLogClient binaryLogClient = - createBinaryClient(taskSourceConfig.getDbzConfiguration()); - return new MySqlSourceFetchTaskContext( - taskSourceConfig, this, jdbcConnection, binaryLogClient); + return new MySqlSourceFetchTaskContext(taskSourceConfig, this); } @Override diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java index b32f62f6b9d..c848c50dd98 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java @@ -18,6 +18,7 @@ package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.utils.ExceptionUtils; import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig; import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect; import org.apache.seatunnel.connectors.cdc.base.relational.JdbcSourceEventDispatcher; @@ -62,15 +63,21 @@ import io.debezium.schema.DataCollectionId; import io.debezium.schema.TopicSelector; import io.debezium.util.Collect; +import lombok.extern.slf4j.Slf4j; +import java.io.IOException; +import java.sql.SQLException; import java.time.Instant; import java.util.ArrayList; import java.util.List; import java.util.Map; import static org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.offset.BinlogOffset.BINLOG_FILENAME_OFFSET_KEY; +import static org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils.createBinaryClient; +import static org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils.createMySqlConnection; /** The context for fetch task that fetching data of snapshot split from MySQL data source. */ +@Slf4j public class MySqlSourceFetchTaskContext extends JdbcSourceFetchTaskContext { private static final Logger LOG = LoggerFactory.getLogger(MySqlSourceFetchTaskContext.class); @@ -89,13 +96,10 @@ public class MySqlSourceFetchTaskContext extends JdbcSourceFetchTaskContext { private MySqlErrorHandler errorHandler; public MySqlSourceFetchTaskContext( - JdbcSourceConfig sourceConfig, - JdbcDataSourceDialect dataSourceDialect, - MySqlConnection connection, - BinaryLogClient binaryLogClient) { + JdbcSourceConfig sourceConfig, JdbcDataSourceDialect dataSourceDialect) { super(sourceConfig, dataSourceDialect); - this.connection = connection; - this.binaryLogClient = binaryLogClient; + this.connection = createMySqlConnection(sourceConfig.getDbzConfiguration()); + this.binaryLogClient = createBinaryClient(sourceConfig.getDbzConfiguration()); this.metadataProvider = new MySqlEventMetadataProvider(); } @@ -159,6 +163,18 @@ public void configure(SourceSplitBase sourceSplitBase) { this.errorHandler = new MySqlErrorHandler(connectorConfig.getLogicalName(), queue); } + @Override + public void close() { + try { + this.connection.close(); + this.binaryLogClient.disconnect(); + } catch (SQLException e) { + log.warn("Failed to close connection, {}", ExceptionUtils.getMessage(e)); + } catch (IOException e) { + log.warn("Failed to close binaryLogClient, {}", ExceptionUtils.getMessage(e)); + } + } + @Override public MySqlSourceConfig getSourceConfig() { return (MySqlSourceConfig) sourceConfig; diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java index 1f2fb09736d..b9ffe60349d 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java @@ -339,6 +339,7 @@ public static String quote(TableId tableId) { private static PreparedStatement initStatement(JdbcConnection jdbc, String sql, int fetchSize) throws SQLException { final Connection connection = jdbc.connection(); + // Add MySQL metadata locks to prevent modification of table structure. connection.setAutoCommit(false); final PreparedStatement statement = connection.prepareStatement( diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerDialect.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerDialect.java index cdef6311950..0494cd98e1c 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerDialect.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerDialect.java @@ -34,7 +34,6 @@ import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.SqlServerSchema; import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.TableDiscoveryUtils; -import io.debezium.connector.sqlserver.SqlServerConnection; import io.debezium.jdbc.JdbcConnection; import io.debezium.relational.TableId; import io.debezium.relational.history.TableChanges; @@ -104,13 +103,8 @@ public TableChanges.TableChange queryTableSchema(JdbcConnection jdbc, TableId ta @Override public SqlServerSourceFetchTaskContext createFetchTaskContext( SourceSplitBase sourceSplitBase, JdbcSourceConfig taskSourceConfig) { - final SqlServerConnection jdbcConnection = - createSqlServerConnection(taskSourceConfig.getDbzConfiguration()); - final SqlServerConnection metaDataConnection = - createSqlServerConnection(taskSourceConfig.getDbzConfiguration()); - return new SqlServerSourceFetchTaskContext( - taskSourceConfig, this, jdbcConnection, metaDataConnection); + return new SqlServerSourceFetchTaskContext(taskSourceConfig, this); } @Override diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/SqlServerSourceFetchTaskContext.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/SqlServerSourceFetchTaskContext.java index 6dd8e7d401f..d480cb7a693 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/SqlServerSourceFetchTaskContext.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/SqlServerSourceFetchTaskContext.java @@ -18,6 +18,7 @@ package org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source.reader.fetch; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.utils.ExceptionUtils; import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig; import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect; import org.apache.seatunnel.connectors.cdc.base.relational.JdbcSourceEventDispatcher; @@ -58,13 +59,18 @@ import io.debezium.schema.DataCollectionId; import io.debezium.schema.TopicSelector; import io.debezium.util.Collect; +import lombok.extern.slf4j.Slf4j; +import java.sql.SQLException; import java.time.Instant; import java.util.ArrayList; import java.util.List; import java.util.Map; +import static org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.SqlServerConnectionUtils.createSqlServerConnection; + /** The context for fetch task that fetching data of snapshot split from MySQL data source. */ +@Slf4j public class SqlServerSourceFetchTaskContext extends JdbcSourceFetchTaskContext { private final SqlServerConnection dataConnection; @@ -83,13 +89,11 @@ public class SqlServerSourceFetchTaskContext extends JdbcSourceFetchTaskContext private SnapshotChangeEventSourceMetrics snapshotChangeEventSourceMetrics; public SqlServerSourceFetchTaskContext( - JdbcSourceConfig sourceConfig, - JdbcDataSourceDialect dataSourceDialect, - SqlServerConnection dataConnection, - SqlServerConnection metadataConnection) { + JdbcSourceConfig sourceConfig, JdbcDataSourceDialect dataSourceDialect) { super(sourceConfig, dataSourceDialect); - this.dataConnection = dataConnection; - this.metadataConnection = metadataConnection; + + this.dataConnection = createSqlServerConnection(sourceConfig.getDbzConfiguration()); + this.metadataConnection = createSqlServerConnection(sourceConfig.getDbzConfiguration()); this.metadataProvider = new SqlServerEventMetadataProvider(); } @@ -158,6 +162,16 @@ public void configure(SourceSplitBase sourceSplitBase) { this.errorHandler = new SqlServerErrorHandler(connectorConfig.getLogicalName(), queue); } + @Override + public void close() { + try { + this.dataConnection.close(); + this.metadataConnection.close(); + } catch (SQLException e) { + log.warn("Failed to close connection, {}", ExceptionUtils.getMessage(e)); + } + } + @Override public SqlServerSourceConfig getSourceConfig() { return (SqlServerSourceConfig) sourceConfig;