From daa9b4fa9d6e9e81127d87cc3ab5b0397424a6fc Mon Sep 17 00:00:00 2001 From: liuli Date: Fri, 10 Mar 2023 18:57:11 +0800 Subject: [PATCH 1/7] [bugfix][cdc-base] Fix cdc base shutdown thread not cleared --- .../cdc/base/source/reader/external/FetchTask.java | 3 +++ .../source/reader/external/IncrementalSourceScanFetcher.java | 3 +++ .../reader/external/IncrementalSourceStreamFetcher.java | 5 ++++- .../source/reader/fetch/binlog/MySqlBinlogFetchTask.java | 5 +++++ .../source/reader/fetch/scan/MySqlSnapshotFetchTask.java | 5 +++++ .../source/reader/fetch/scan/SqlServerSnapshotFetchTask.java | 5 +++++ .../transactionlog/SqlServerTransactionLogFetchTask.java | 5 +++++ 7 files changed, 30 insertions(+), 1 deletion(-) diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/FetchTask.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/FetchTask.java index 7a8ba5fe342..3f4c386db24 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/FetchTask.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/FetchTask.java @@ -41,6 +41,9 @@ public interface FetchTask { /** Returns current task is running or not. */ boolean isRunning(); + /** Close this task */ + void shutdown(); + /** Returns the split that the task used. */ Split getSplit(); diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java index 25bf3542976..6f34513b503 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java @@ -192,6 +192,9 @@ private void checkReadException() { @Override public void close() { try { + if (snapshotSplitReadTask != null) { + snapshotSplitReadTask.shutdown(); + } if (executorService != null) { executorService.shutdown(); if (executorService.awaitTermination( diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java index f14eac745bb..65cd3db1fd2 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java @@ -121,9 +121,12 @@ private void checkReadException() { @Override public void close() { try { + if (streamFetchTask != null) { + streamFetchTask.shutdown(); + } if (executorService != null) { executorService.shutdown(); - if (executorService.awaitTermination( + if (!executorService.awaitTermination( READER_CLOSE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) { log.warn( "Failed to close the stream fetcher in {} seconds.", diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/binlog/MySqlBinlogFetchTask.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/binlog/MySqlBinlogFetchTask.java index d59070a2f6b..f858864e78a 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/binlog/MySqlBinlogFetchTask.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/binlog/MySqlBinlogFetchTask.java @@ -81,6 +81,11 @@ public boolean isRunning() { return taskRunning; } + @Override + public void shutdown() { + taskRunning = false; + } + @Override public SourceSplitBase getSplit() { return split; diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotFetchTask.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotFetchTask.java index 6c0bea13e58..ac997f7ed44 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotFetchTask.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotFetchTask.java @@ -155,6 +155,11 @@ public boolean isRunning() { return taskRunning; } + @Override + public void shutdown() { + taskRunning = false; + } + @Override public SourceSplitBase getSplit() { return split; diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/scan/SqlServerSnapshotFetchTask.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/scan/SqlServerSnapshotFetchTask.java index a8246fa2434..82d43971e54 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/scan/SqlServerSnapshotFetchTask.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/scan/SqlServerSnapshotFetchTask.java @@ -166,6 +166,11 @@ public boolean isRunning() { return taskRunning; } + @Override + public void shutdown() { + taskRunning = false; + } + @Override public SourceSplitBase getSplit() { return split; diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/transactionlog/SqlServerTransactionLogFetchTask.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/transactionlog/SqlServerTransactionLogFetchTask.java index 21a6af3a5c5..f6968b62134 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/transactionlog/SqlServerTransactionLogFetchTask.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/transactionlog/SqlServerTransactionLogFetchTask.java @@ -78,6 +78,11 @@ public boolean isRunning() { return taskRunning; } + @Override + public void shutdown() { + taskRunning = false; + } + @Override public SourceSplitBase getSplit() { return split; From ef4384bffb78cba01a1a13f7dd969f41c1705c6e Mon Sep 17 00:00:00 2001 From: liuli Date: Mon, 13 Mar 2023 12:23:49 +0800 Subject: [PATCH 2/7] [bugfix][cdc-base] Fix cdc base shutdown thread not cleared --- .../source/reader/external/IncrementalSourceScanFetcher.java | 5 +++-- .../reader/external/IncrementalSourceStreamFetcher.java | 3 ++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java index 6f34513b503..11e265823c0 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java @@ -197,11 +197,12 @@ public void close() { } if (executorService != null) { executorService.shutdown(); - if (executorService.awaitTermination( + if (!executorService.awaitTermination( READER_CLOSE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) { log.warn( - "Failed to close the scan fetcher in {} seconds.", + "Failed to close the scan fetcher in {} seconds. will execute force close(ExecutorService.shutdownNow)", READER_CLOSE_TIMEOUT_SECONDS); + executorService.shutdownNow(); } } } catch (Exception e) { diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java index 65cd3db1fd2..5d9d4177c13 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java @@ -129,8 +129,9 @@ public void close() { if (!executorService.awaitTermination( READER_CLOSE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) { log.warn( - "Failed to close the stream fetcher in {} seconds.", + "Failed to close the stream fetcher in {} seconds. will execute force close(ExecutorService.shutdownNow)", READER_CLOSE_TIMEOUT_SECONDS); + executorService.shutdownNow(); } } } catch (Exception e) { From fc918b960815fce270d259352f6211c2a7d191bc Mon Sep 17 00:00:00 2001 From: liuli Date: Mon, 13 Mar 2023 18:02:20 +0800 Subject: [PATCH 3/7] add close FetchTask.Context --- .../source/reader/external/FetchTask.java | 2 ++ .../IncrementalSourceScanFetcher.java | 3 ++ .../IncrementalSourceStreamFetcher.java | 3 ++ .../cdc/mysql/source/MySqlDialect.java | 11 +------- .../fetch/MySqlSourceFetchTaskContext.java | 28 +++++++++++++++---- .../seatunnel/cdc/mysql/utils/MySqlUtils.java | 1 + .../source/source/SqlServerDialect.java | 8 +----- .../SqlServerSourceFetchTaskContext.java | 26 +++++++++++++---- 8 files changed, 53 insertions(+), 29 deletions(-) diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/FetchTask.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/FetchTask.java index 3f4c386db24..1cc61bd7734 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/FetchTask.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/FetchTask.java @@ -66,5 +66,7 @@ interface Context { void rewriteOutputBuffer(Map outputBuffer, SourceRecord changeRecord); List formatMessageTimestamp(Collection snapshotRecords); + + void close(); } } diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java index 11e265823c0..479a717b551 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java @@ -192,6 +192,9 @@ private void checkReadException() { @Override public void close() { try { + if (taskContext != null) { + taskContext.close(); + } if (snapshotSplitReadTask != null) { snapshotSplitReadTask.shutdown(); } diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java index 5d9d4177c13..40a67ecfcad 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java @@ -121,6 +121,9 @@ private void checkReadException() { @Override public void close() { try { + if (taskContext != null) { + taskContext.close(); + } if (streamFetchTask != null) { streamFetchTask.shutdown(); } diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlDialect.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlDialect.java index 1352a78e0b1..15d3b6bf7ff 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlDialect.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlDialect.java @@ -33,8 +33,6 @@ import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlSchema; import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.TableDiscoveryUtils; -import com.github.shyiko.mysql.binlog.BinaryLogClient; -import io.debezium.connector.mysql.MySqlConnection; import io.debezium.jdbc.JdbcConnection; import io.debezium.relational.TableId; import io.debezium.relational.history.TableChanges; @@ -42,8 +40,6 @@ import java.sql.SQLException; import java.util.List; -import static org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils.createBinaryClient; -import static org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils.createMySqlConnection; import static org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils.isTableIdCaseSensitive; /** The {@link JdbcDataSourceDialect} implementation for MySQL datasource. */ @@ -104,12 +100,7 @@ public TableChanges.TableChange queryTableSchema(JdbcConnection jdbc, TableId ta @Override public MySqlSourceFetchTaskContext createFetchTaskContext( SourceSplitBase sourceSplitBase, JdbcSourceConfig taskSourceConfig) { - final MySqlConnection jdbcConnection = - createMySqlConnection(taskSourceConfig.getDbzConfiguration()); - final BinaryLogClient binaryLogClient = - createBinaryClient(taskSourceConfig.getDbzConfiguration()); - return new MySqlSourceFetchTaskContext( - taskSourceConfig, this, jdbcConnection, binaryLogClient); + return new MySqlSourceFetchTaskContext(taskSourceConfig, this); } @Override diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java index b32f62f6b9d..c848c50dd98 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java @@ -18,6 +18,7 @@ package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.utils.ExceptionUtils; import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig; import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect; import org.apache.seatunnel.connectors.cdc.base.relational.JdbcSourceEventDispatcher; @@ -62,15 +63,21 @@ import io.debezium.schema.DataCollectionId; import io.debezium.schema.TopicSelector; import io.debezium.util.Collect; +import lombok.extern.slf4j.Slf4j; +import java.io.IOException; +import java.sql.SQLException; import java.time.Instant; import java.util.ArrayList; import java.util.List; import java.util.Map; import static org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.offset.BinlogOffset.BINLOG_FILENAME_OFFSET_KEY; +import static org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils.createBinaryClient; +import static org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils.createMySqlConnection; /** The context for fetch task that fetching data of snapshot split from MySQL data source. */ +@Slf4j public class MySqlSourceFetchTaskContext extends JdbcSourceFetchTaskContext { private static final Logger LOG = LoggerFactory.getLogger(MySqlSourceFetchTaskContext.class); @@ -89,13 +96,10 @@ public class MySqlSourceFetchTaskContext extends JdbcSourceFetchTaskContext { private MySqlErrorHandler errorHandler; public MySqlSourceFetchTaskContext( - JdbcSourceConfig sourceConfig, - JdbcDataSourceDialect dataSourceDialect, - MySqlConnection connection, - BinaryLogClient binaryLogClient) { + JdbcSourceConfig sourceConfig, JdbcDataSourceDialect dataSourceDialect) { super(sourceConfig, dataSourceDialect); - this.connection = connection; - this.binaryLogClient = binaryLogClient; + this.connection = createMySqlConnection(sourceConfig.getDbzConfiguration()); + this.binaryLogClient = createBinaryClient(sourceConfig.getDbzConfiguration()); this.metadataProvider = new MySqlEventMetadataProvider(); } @@ -159,6 +163,18 @@ public void configure(SourceSplitBase sourceSplitBase) { this.errorHandler = new MySqlErrorHandler(connectorConfig.getLogicalName(), queue); } + @Override + public void close() { + try { + this.connection.close(); + this.binaryLogClient.disconnect(); + } catch (SQLException e) { + log.warn("Failed to close connection, {}", ExceptionUtils.getMessage(e)); + } catch (IOException e) { + log.warn("Failed to close binaryLogClient, {}", ExceptionUtils.getMessage(e)); + } + } + @Override public MySqlSourceConfig getSourceConfig() { return (MySqlSourceConfig) sourceConfig; diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java index 1f2fb09736d..b9ffe60349d 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java @@ -339,6 +339,7 @@ public static String quote(TableId tableId) { private static PreparedStatement initStatement(JdbcConnection jdbc, String sql, int fetchSize) throws SQLException { final Connection connection = jdbc.connection(); + // Add MySQL metadata locks to prevent modification of table structure. connection.setAutoCommit(false); final PreparedStatement statement = connection.prepareStatement( diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerDialect.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerDialect.java index cdef6311950..0494cd98e1c 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerDialect.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerDialect.java @@ -34,7 +34,6 @@ import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.SqlServerSchema; import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.TableDiscoveryUtils; -import io.debezium.connector.sqlserver.SqlServerConnection; import io.debezium.jdbc.JdbcConnection; import io.debezium.relational.TableId; import io.debezium.relational.history.TableChanges; @@ -104,13 +103,8 @@ public TableChanges.TableChange queryTableSchema(JdbcConnection jdbc, TableId ta @Override public SqlServerSourceFetchTaskContext createFetchTaskContext( SourceSplitBase sourceSplitBase, JdbcSourceConfig taskSourceConfig) { - final SqlServerConnection jdbcConnection = - createSqlServerConnection(taskSourceConfig.getDbzConfiguration()); - final SqlServerConnection metaDataConnection = - createSqlServerConnection(taskSourceConfig.getDbzConfiguration()); - return new SqlServerSourceFetchTaskContext( - taskSourceConfig, this, jdbcConnection, metaDataConnection); + return new SqlServerSourceFetchTaskContext(taskSourceConfig, this); } @Override diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/SqlServerSourceFetchTaskContext.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/SqlServerSourceFetchTaskContext.java index 6dd8e7d401f..d480cb7a693 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/SqlServerSourceFetchTaskContext.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/SqlServerSourceFetchTaskContext.java @@ -18,6 +18,7 @@ package org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source.reader.fetch; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.utils.ExceptionUtils; import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig; import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect; import org.apache.seatunnel.connectors.cdc.base.relational.JdbcSourceEventDispatcher; @@ -58,13 +59,18 @@ import io.debezium.schema.DataCollectionId; import io.debezium.schema.TopicSelector; import io.debezium.util.Collect; +import lombok.extern.slf4j.Slf4j; +import java.sql.SQLException; import java.time.Instant; import java.util.ArrayList; import java.util.List; import java.util.Map; +import static org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.SqlServerConnectionUtils.createSqlServerConnection; + /** The context for fetch task that fetching data of snapshot split from MySQL data source. */ +@Slf4j public class SqlServerSourceFetchTaskContext extends JdbcSourceFetchTaskContext { private final SqlServerConnection dataConnection; @@ -83,13 +89,11 @@ public class SqlServerSourceFetchTaskContext extends JdbcSourceFetchTaskContext private SnapshotChangeEventSourceMetrics snapshotChangeEventSourceMetrics; public SqlServerSourceFetchTaskContext( - JdbcSourceConfig sourceConfig, - JdbcDataSourceDialect dataSourceDialect, - SqlServerConnection dataConnection, - SqlServerConnection metadataConnection) { + JdbcSourceConfig sourceConfig, JdbcDataSourceDialect dataSourceDialect) { super(sourceConfig, dataSourceDialect); - this.dataConnection = dataConnection; - this.metadataConnection = metadataConnection; + + this.dataConnection = createSqlServerConnection(sourceConfig.getDbzConfiguration()); + this.metadataConnection = createSqlServerConnection(sourceConfig.getDbzConfiguration()); this.metadataProvider = new SqlServerEventMetadataProvider(); } @@ -158,6 +162,16 @@ public void configure(SourceSplitBase sourceSplitBase) { this.errorHandler = new SqlServerErrorHandler(connectorConfig.getLogicalName(), queue); } + @Override + public void close() { + try { + this.dataConnection.close(); + this.metadataConnection.close(); + } catch (SQLException e) { + log.warn("Failed to close connection, {}", ExceptionUtils.getMessage(e)); + } + } + @Override public SqlServerSourceConfig getSourceConfig() { return (SqlServerSourceConfig) sourceConfig; From 72eb248416e21eed1f1e1d6bfa81700b97291916 Mon Sep 17 00:00:00 2001 From: ic4y <83933160+ic4y@users.noreply.github.com> Date: Mon, 13 Mar 2023 20:22:48 +0800 Subject: [PATCH 4/7] Update seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java Co-authored-by: TaoZex <45089228+TaoZex@users.noreply.github.com> --- .../source/reader/external/IncrementalSourceStreamFetcher.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java index 40a67ecfcad..f3715e710a8 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java @@ -132,7 +132,7 @@ public void close() { if (!executorService.awaitTermination( READER_CLOSE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) { log.warn( - "Failed to close the stream fetcher in {} seconds. will execute force close(ExecutorService.shutdownNow)", + "Failed to close the stream fetcher in {} seconds. Service will execute force close(ExecutorService.shutdownNow)", READER_CLOSE_TIMEOUT_SECONDS); executorService.shutdownNow(); } From 54ec1110202e357ed6aa5737da631d718aea7200 Mon Sep 17 00:00:00 2001 From: ic4y <83933160+ic4y@users.noreply.github.com> Date: Mon, 13 Mar 2023 20:22:54 +0800 Subject: [PATCH 5/7] Update seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java Co-authored-by: TaoZex <45089228+TaoZex@users.noreply.github.com> --- .../source/reader/external/IncrementalSourceScanFetcher.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java index 479a717b551..56f59ba087a 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java @@ -203,7 +203,7 @@ public void close() { if (!executorService.awaitTermination( READER_CLOSE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) { log.warn( - "Failed to close the scan fetcher in {} seconds. will execute force close(ExecutorService.shutdownNow)", + "Failed to close the scan fetcher in {} seconds. Service will execute force close(ExecutorService.shutdownNow)", READER_CLOSE_TIMEOUT_SECONDS); executorService.shutdownNow(); } From 2dbb4867fb8f752d1e127d00e9b8bfd42e05fc81 Mon Sep 17 00:00:00 2001 From: liuli Date: Tue, 14 Mar 2023 14:13:57 +0800 Subject: [PATCH 6/7] Optimize log output --- .../source/reader/fetch/MySqlSourceFetchTaskContext.java | 4 ++-- .../source/reader/fetch/SqlServerSourceFetchTaskContext.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java index c848c50dd98..f4e142667f9 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java @@ -169,9 +169,9 @@ public void close() { this.connection.close(); this.binaryLogClient.disconnect(); } catch (SQLException e) { - log.warn("Failed to close connection, {}", ExceptionUtils.getMessage(e)); + log.warn("Failed to close connection", e); } catch (IOException e) { - log.warn("Failed to close binaryLogClient, {}", ExceptionUtils.getMessage(e)); + log.warn("Failed to close binaryLogClient", e); } } diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/SqlServerSourceFetchTaskContext.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/SqlServerSourceFetchTaskContext.java index d480cb7a693..10a5ac6d522 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/SqlServerSourceFetchTaskContext.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/SqlServerSourceFetchTaskContext.java @@ -168,7 +168,7 @@ public void close() { this.dataConnection.close(); this.metadataConnection.close(); } catch (SQLException e) { - log.warn("Failed to close connection, {}", ExceptionUtils.getMessage(e)); + log.warn("Failed to close connection", e); } } From 1de8da8121361f4b0adeb4f407afc05265d7e804 Mon Sep 17 00:00:00 2001 From: liuli Date: Tue, 14 Mar 2023 14:26:43 +0800 Subject: [PATCH 7/7] Optimize log output --- .../mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java | 1 - .../source/reader/fetch/SqlServerSourceFetchTaskContext.java | 1 - 2 files changed, 2 deletions(-) diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java index f4e142667f9..c6aebc8aad7 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java @@ -18,7 +18,6 @@ package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; -import org.apache.seatunnel.common.utils.ExceptionUtils; import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig; import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect; import org.apache.seatunnel.connectors.cdc.base.relational.JdbcSourceEventDispatcher; diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/SqlServerSourceFetchTaskContext.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/SqlServerSourceFetchTaskContext.java index 10a5ac6d522..200806f39b3 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/SqlServerSourceFetchTaskContext.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/SqlServerSourceFetchTaskContext.java @@ -18,7 +18,6 @@ package org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source.reader.fetch; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; -import org.apache.seatunnel.common.utils.ExceptionUtils; import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig; import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect; import org.apache.seatunnel.connectors.cdc.base.relational.JdbcSourceEventDispatcher;