Skip to content

Commit

Permalink
add close FetchTask.Context
Browse files Browse the repository at this point in the history
  • Loading branch information
ic4y committed Mar 13, 2023
1 parent ef4384b commit fc918b9
Show file tree
Hide file tree
Showing 8 changed files with 53 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,5 +66,7 @@ interface Context {
void rewriteOutputBuffer(Map<Struct, SourceRecord> outputBuffer, SourceRecord changeRecord);

List<SourceRecord> formatMessageTimestamp(Collection<SourceRecord> snapshotRecords);

void close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,9 @@ private void checkReadException() {
@Override
public void close() {
try {
if (taskContext != null) {
taskContext.close();
}
if (snapshotSplitReadTask != null) {
snapshotSplitReadTask.shutdown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ private void checkReadException() {
@Override
public void close() {
try {
if (taskContext != null) {
taskContext.close();
}
if (streamFetchTask != null) {
streamFetchTask.shutdown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,13 @@
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlSchema;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.TableDiscoveryUtils;

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import io.debezium.connector.mysql.MySqlConnection;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;

import java.sql.SQLException;
import java.util.List;

import static org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils.createBinaryClient;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils.createMySqlConnection;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils.isTableIdCaseSensitive;

/** The {@link JdbcDataSourceDialect} implementation for MySQL datasource. */
Expand Down Expand Up @@ -104,12 +100,7 @@ public TableChanges.TableChange queryTableSchema(JdbcConnection jdbc, TableId ta
@Override
public MySqlSourceFetchTaskContext createFetchTaskContext(
SourceSplitBase sourceSplitBase, JdbcSourceConfig taskSourceConfig) {
final MySqlConnection jdbcConnection =
createMySqlConnection(taskSourceConfig.getDbzConfiguration());
final BinaryLogClient binaryLogClient =
createBinaryClient(taskSourceConfig.getDbzConfiguration());
return new MySqlSourceFetchTaskContext(
taskSourceConfig, this, jdbcConnection, binaryLogClient);
return new MySqlSourceFetchTaskContext(taskSourceConfig, this);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch;

import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
import org.apache.seatunnel.connectors.cdc.base.relational.JdbcSourceEventDispatcher;
Expand Down Expand Up @@ -62,15 +63,21 @@
import io.debezium.schema.DataCollectionId;
import io.debezium.schema.TopicSelector;
import io.debezium.util.Collect;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.sql.SQLException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.offset.BinlogOffset.BINLOG_FILENAME_OFFSET_KEY;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils.createBinaryClient;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils.createMySqlConnection;

/** The context for fetch task that fetching data of snapshot split from MySQL data source. */
@Slf4j
public class MySqlSourceFetchTaskContext extends JdbcSourceFetchTaskContext {

private static final Logger LOG = LoggerFactory.getLogger(MySqlSourceFetchTaskContext.class);
Expand All @@ -89,13 +96,10 @@ public class MySqlSourceFetchTaskContext extends JdbcSourceFetchTaskContext {
private MySqlErrorHandler errorHandler;

public MySqlSourceFetchTaskContext(
JdbcSourceConfig sourceConfig,
JdbcDataSourceDialect dataSourceDialect,
MySqlConnection connection,
BinaryLogClient binaryLogClient) {
JdbcSourceConfig sourceConfig, JdbcDataSourceDialect dataSourceDialect) {
super(sourceConfig, dataSourceDialect);
this.connection = connection;
this.binaryLogClient = binaryLogClient;
this.connection = createMySqlConnection(sourceConfig.getDbzConfiguration());
this.binaryLogClient = createBinaryClient(sourceConfig.getDbzConfiguration());
this.metadataProvider = new MySqlEventMetadataProvider();
}

Expand Down Expand Up @@ -159,6 +163,18 @@ public void configure(SourceSplitBase sourceSplitBase) {
this.errorHandler = new MySqlErrorHandler(connectorConfig.getLogicalName(), queue);
}

@Override
public void close() {
try {
this.connection.close();
this.binaryLogClient.disconnect();
} catch (SQLException e) {
log.warn("Failed to close connection, {}", ExceptionUtils.getMessage(e));
} catch (IOException e) {
log.warn("Failed to close binaryLogClient, {}", ExceptionUtils.getMessage(e));
}
}

@Override
public MySqlSourceConfig getSourceConfig() {
return (MySqlSourceConfig) sourceConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ public static String quote(TableId tableId) {
private static PreparedStatement initStatement(JdbcConnection jdbc, String sql, int fetchSize)
throws SQLException {
final Connection connection = jdbc.connection();
// Add MySQL metadata locks to prevent modification of table structure.
connection.setAutoCommit(false);
final PreparedStatement statement =
connection.prepareStatement(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.SqlServerSchema;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.TableDiscoveryUtils;

import io.debezium.connector.sqlserver.SqlServerConnection;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;
Expand Down Expand Up @@ -104,13 +103,8 @@ public TableChanges.TableChange queryTableSchema(JdbcConnection jdbc, TableId ta
@Override
public SqlServerSourceFetchTaskContext createFetchTaskContext(
SourceSplitBase sourceSplitBase, JdbcSourceConfig taskSourceConfig) {
final SqlServerConnection jdbcConnection =
createSqlServerConnection(taskSourceConfig.getDbzConfiguration());
final SqlServerConnection metaDataConnection =
createSqlServerConnection(taskSourceConfig.getDbzConfiguration());

return new SqlServerSourceFetchTaskContext(
taskSourceConfig, this, jdbcConnection, metaDataConnection);
return new SqlServerSourceFetchTaskContext(taskSourceConfig, this);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source.reader.fetch;

import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
import org.apache.seatunnel.connectors.cdc.base.relational.JdbcSourceEventDispatcher;
Expand Down Expand Up @@ -58,13 +59,18 @@
import io.debezium.schema.DataCollectionId;
import io.debezium.schema.TopicSelector;
import io.debezium.util.Collect;
import lombok.extern.slf4j.Slf4j;

import java.sql.SQLException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.SqlServerConnectionUtils.createSqlServerConnection;

/** The context for fetch task that fetching data of snapshot split from MySQL data source. */
@Slf4j
public class SqlServerSourceFetchTaskContext extends JdbcSourceFetchTaskContext {

private final SqlServerConnection dataConnection;
Expand All @@ -83,13 +89,11 @@ public class SqlServerSourceFetchTaskContext extends JdbcSourceFetchTaskContext
private SnapshotChangeEventSourceMetrics snapshotChangeEventSourceMetrics;

public SqlServerSourceFetchTaskContext(
JdbcSourceConfig sourceConfig,
JdbcDataSourceDialect dataSourceDialect,
SqlServerConnection dataConnection,
SqlServerConnection metadataConnection) {
JdbcSourceConfig sourceConfig, JdbcDataSourceDialect dataSourceDialect) {
super(sourceConfig, dataSourceDialect);
this.dataConnection = dataConnection;
this.metadataConnection = metadataConnection;

this.dataConnection = createSqlServerConnection(sourceConfig.getDbzConfiguration());
this.metadataConnection = createSqlServerConnection(sourceConfig.getDbzConfiguration());
this.metadataProvider = new SqlServerEventMetadataProvider();
}

Expand Down Expand Up @@ -158,6 +162,16 @@ public void configure(SourceSplitBase sourceSplitBase) {
this.errorHandler = new SqlServerErrorHandler(connectorConfig.getLogicalName(), queue);
}

@Override
public void close() {
try {
this.dataConnection.close();
this.metadataConnection.close();
} catch (SQLException e) {
log.warn("Failed to close connection, {}", ExceptionUtils.getMessage(e));
}
}

@Override
public SqlServerSourceConfig getSourceConfig() {
return (SqlServerSourceConfig) sourceConfig;
Expand Down

0 comments on commit fc918b9

Please sign in to comment.