Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[bugfix][cdc-base] Fix cdc base shutdown thread not cleared #4327

Merged
merged 7 commits into from
Mar 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ public interface FetchTask<Split> {
/** Returns current task is running or not. */
boolean isRunning();

/** Close this task */
void shutdown();

/** Returns the split that the task used. */
Split getSplit();

Expand All @@ -63,5 +66,7 @@ interface Context {
void rewriteOutputBuffer(Map<Struct, SourceRecord> outputBuffer, SourceRecord changeRecord);

List<SourceRecord> formatMessageTimestamp(Collection<SourceRecord> snapshotRecords);

void close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -192,13 +192,20 @@ private void checkReadException() {
@Override
public void close() {
try {
if (taskContext != null) {
taskContext.close();
}
if (snapshotSplitReadTask != null) {
snapshotSplitReadTask.shutdown();
}
if (executorService != null) {
executorService.shutdown();
if (executorService.awaitTermination(
if (!executorService.awaitTermination(
READER_CLOSE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
log.warn(
"Failed to close the scan fetcher in {} seconds.",
"Failed to close the scan fetcher in {} seconds. Service will execute force close(ExecutorService.shutdownNow)",
READER_CLOSE_TIMEOUT_SECONDS);
executorService.shutdownNow();
}
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,13 +121,20 @@ private void checkReadException() {
@Override
public void close() {
try {
if (taskContext != null) {
taskContext.close();
}
if (streamFetchTask != null) {
streamFetchTask.shutdown();
}
if (executorService != null) {
executorService.shutdown();
if (executorService.awaitTermination(
if (!executorService.awaitTermination(
READER_CLOSE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
log.warn(
"Failed to close the stream fetcher in {} seconds.",
"Failed to close the stream fetcher in {} seconds. Service will execute force close(ExecutorService.shutdownNow)",
READER_CLOSE_TIMEOUT_SECONDS);
executorService.shutdownNow();
}
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,13 @@
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlSchema;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.TableDiscoveryUtils;

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import io.debezium.connector.mysql.MySqlConnection;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;

import java.sql.SQLException;
import java.util.List;

import static org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils.createBinaryClient;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils.createMySqlConnection;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils.isTableIdCaseSensitive;

/** The {@link JdbcDataSourceDialect} implementation for MySQL datasource. */
Expand Down Expand Up @@ -104,12 +100,7 @@ public TableChanges.TableChange queryTableSchema(JdbcConnection jdbc, TableId ta
@Override
public MySqlSourceFetchTaskContext createFetchTaskContext(
SourceSplitBase sourceSplitBase, JdbcSourceConfig taskSourceConfig) {
final MySqlConnection jdbcConnection =
createMySqlConnection(taskSourceConfig.getDbzConfiguration());
final BinaryLogClient binaryLogClient =
createBinaryClient(taskSourceConfig.getDbzConfiguration());
return new MySqlSourceFetchTaskContext(
taskSourceConfig, this, jdbcConnection, binaryLogClient);
return new MySqlSourceFetchTaskContext(taskSourceConfig, this);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,21 @@
import io.debezium.schema.DataCollectionId;
import io.debezium.schema.TopicSelector;
import io.debezium.util.Collect;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.sql.SQLException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.offset.BinlogOffset.BINLOG_FILENAME_OFFSET_KEY;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils.createBinaryClient;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils.createMySqlConnection;

/** The context for fetch task that fetching data of snapshot split from MySQL data source. */
@Slf4j
public class MySqlSourceFetchTaskContext extends JdbcSourceFetchTaskContext {

private static final Logger LOG = LoggerFactory.getLogger(MySqlSourceFetchTaskContext.class);
Expand All @@ -89,13 +95,10 @@ public class MySqlSourceFetchTaskContext extends JdbcSourceFetchTaskContext {
private MySqlErrorHandler errorHandler;

public MySqlSourceFetchTaskContext(
JdbcSourceConfig sourceConfig,
JdbcDataSourceDialect dataSourceDialect,
MySqlConnection connection,
BinaryLogClient binaryLogClient) {
JdbcSourceConfig sourceConfig, JdbcDataSourceDialect dataSourceDialect) {
super(sourceConfig, dataSourceDialect);
this.connection = connection;
this.binaryLogClient = binaryLogClient;
this.connection = createMySqlConnection(sourceConfig.getDbzConfiguration());
this.binaryLogClient = createBinaryClient(sourceConfig.getDbzConfiguration());
this.metadataProvider = new MySqlEventMetadataProvider();
}

Expand Down Expand Up @@ -159,6 +162,18 @@ public void configure(SourceSplitBase sourceSplitBase) {
this.errorHandler = new MySqlErrorHandler(connectorConfig.getLogicalName(), queue);
}

@Override
public void close() {
try {
this.connection.close();
this.binaryLogClient.disconnect();
} catch (SQLException e) {
log.warn("Failed to close connection", e);
} catch (IOException e) {
log.warn("Failed to close binaryLogClient", e);
}
}

@Override
public MySqlSourceConfig getSourceConfig() {
return (MySqlSourceConfig) sourceConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ public boolean isRunning() {
return taskRunning;
}

@Override
public void shutdown() {
taskRunning = false;
}

@Override
public SourceSplitBase getSplit() {
return split;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,11 @@ public boolean isRunning() {
return taskRunning;
}

@Override
public void shutdown() {
taskRunning = false;
}

@Override
public SourceSplitBase getSplit() {
return split;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ public static String quote(TableId tableId) {
private static PreparedStatement initStatement(JdbcConnection jdbc, String sql, int fetchSize)
throws SQLException {
final Connection connection = jdbc.connection();
// Add MySQL metadata locks to prevent modification of table structure.
connection.setAutoCommit(false);
final PreparedStatement statement =
connection.prepareStatement(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.SqlServerSchema;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.TableDiscoveryUtils;

import io.debezium.connector.sqlserver.SqlServerConnection;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;
Expand Down Expand Up @@ -104,13 +103,8 @@ public TableChanges.TableChange queryTableSchema(JdbcConnection jdbc, TableId ta
@Override
public SqlServerSourceFetchTaskContext createFetchTaskContext(
SourceSplitBase sourceSplitBase, JdbcSourceConfig taskSourceConfig) {
final SqlServerConnection jdbcConnection =
createSqlServerConnection(taskSourceConfig.getDbzConfiguration());
final SqlServerConnection metaDataConnection =
createSqlServerConnection(taskSourceConfig.getDbzConfiguration());

return new SqlServerSourceFetchTaskContext(
taskSourceConfig, this, jdbcConnection, metaDataConnection);
return new SqlServerSourceFetchTaskContext(taskSourceConfig, this);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,18 @@
import io.debezium.schema.DataCollectionId;
import io.debezium.schema.TopicSelector;
import io.debezium.util.Collect;
import lombok.extern.slf4j.Slf4j;

import java.sql.SQLException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.SqlServerConnectionUtils.createSqlServerConnection;

/** The context for fetch task that fetching data of snapshot split from MySQL data source. */
@Slf4j
public class SqlServerSourceFetchTaskContext extends JdbcSourceFetchTaskContext {

private final SqlServerConnection dataConnection;
Expand All @@ -83,13 +88,11 @@ public class SqlServerSourceFetchTaskContext extends JdbcSourceFetchTaskContext
private SnapshotChangeEventSourceMetrics snapshotChangeEventSourceMetrics;

public SqlServerSourceFetchTaskContext(
JdbcSourceConfig sourceConfig,
JdbcDataSourceDialect dataSourceDialect,
SqlServerConnection dataConnection,
SqlServerConnection metadataConnection) {
JdbcSourceConfig sourceConfig, JdbcDataSourceDialect dataSourceDialect) {
super(sourceConfig, dataSourceDialect);
this.dataConnection = dataConnection;
this.metadataConnection = metadataConnection;

this.dataConnection = createSqlServerConnection(sourceConfig.getDbzConfiguration());
this.metadataConnection = createSqlServerConnection(sourceConfig.getDbzConfiguration());
this.metadataProvider = new SqlServerEventMetadataProvider();
}

Expand Down Expand Up @@ -158,6 +161,16 @@ public void configure(SourceSplitBase sourceSplitBase) {
this.errorHandler = new SqlServerErrorHandler(connectorConfig.getLogicalName(), queue);
}

@Override
public void close() {
try {
this.dataConnection.close();
this.metadataConnection.close();
} catch (SQLException e) {
log.warn("Failed to close connection", e);
}
}

@Override
public SqlServerSourceConfig getSourceConfig() {
return (SqlServerSourceConfig) sourceConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,11 @@ public boolean isRunning() {
return taskRunning;
}

@Override
public void shutdown() {
taskRunning = false;
}

@Override
public SourceSplitBase getSplit() {
return split;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ public boolean isRunning() {
return taskRunning;
}

@Override
public void shutdown() {
taskRunning = false;
}

@Override
public SourceSplitBase getSplit() {
return split;
Expand Down